Azure Container Apps Jobs mit Event-Hubs-Integration laufen endlos, obwohl es keine neuen Ereignisse gibt
Posted: 15 May 2025, 20:50
Ich verwende Azure Container Apps Jobs mit einem ereignisgesteuerten Trigger für Azure Event Hubs und Blob-Metadaten als Checkpoint-Strategie. Der Job wird wie erwartet ausgelöst, und der Checkpoint-Store wird vom Job wie vorgesehen aktualisiert. Das Problem ist, dass die Jobs unmittelbar nach Abschluss in einer Endlosschleife erneut ausgelöst werden, obwohl es keine neuen Ereignisse gibt. Das habe ich anhand der Job-Protokolle bestätigt: Beim ersten Lauf werden alle Ereignisse protokolliert, alle nachfolgenden Ausführungen enthalten überhaupt keine Ereignisprotokolle.
Der Job hängt von `azure-eventhub-checkpointstoreblob-aio` (1.2.0) und `azure-identity` (1.21.0) ab.
Code: Select all
eventTriggerConfig: {
  parallelism: 1
  replicaCompletionCount: 1
  scale: {
    rules: [
      {
        name: 'event-hub-trigger'
        type: 'azure-eventhub'
        auth: [
          {
            secretRef: 'event-hub-connection-string'
            triggerParameter: 'connection'
          }
          {
            secretRef: 'storage-account-connection-string'
            triggerParameter: 'storageConnection'
          }
        ]
        metadata: {
          // Must be the container the job's BlobCheckpointStore writes to.
          blobContainer: containerName
          // The KEDA azure-eventhub scaler reads the key 'checkpointStrategy'
          // (lower-case 'p'). The previous 'checkPointStrategy' spelling was
          // presumably ignored. The scaler then fell back to the legacy
          // checkpoint layout, could not see the blob-metadata checkpoints
          // written by the Python SDK, and kept reporting unprocessed events,
          // which re-triggered the job endlessly.
          checkpointStrategy: 'blobMetadata'
          consumerGroup: eventHubConsumerGroupName
          eventHubName: eventHubName
          // NOTE(review): these look redundant with the 'connection' /
          // 'storageConnection' auth entries above; verify they are needed.
          connectionFromEnv: 'EVENT_HUB_CONNECTION_STRING'
          storageConnectionFromEnv: 'STORAGE_ACCOUNT_CONNECTION_STRING'
          // Start a job only once at least one unprocessed event exists.
          activationUnprocessedEventThreshold: 1
          unprocessedEventThreshold: 5
        }
      }
    ]
  }
}
Hier ist die Python-basierte Job-Logik:
import asyncio
from datetime import datetime, timedelta, timezone
import logging
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential
# Deployment settings come from environment variables; a missing variable
# yields None.
BLOB_STORAGE_ACCOUNT_URL = os.environ.get("BLOB_STORAGE_ACCOUNT_URL")
BLOB_CONTAINER_NAME = os.environ.get("BLOB_CONTAINER_NAME")
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = os.environ.get("EVENT_HUB_FULLY_QUALIFIED_NAMESPACE")
EVENT_HUB_NAME = os.environ.get("EVENT_HUB_NAME")
EVENT_HUB_CONSUMER_GROUP = os.environ.get("EVENT_HUB_CONSUMER_GROUP")

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

# Single async credential, shared by the checkpoint store and the consumer
# client.
credential = DefaultAzureCredential()

# UTC timestamp of the most recently handled event; compared against
# WAIT_DURATION to decide when the job has been idle long enough to exit.
last_event_time = None
WAIT_DURATION = timedelta(seconds=30)
async def on_event(partition_context, event):
    """Handle one ``EventHubConsumerClient.receive`` callback.

    For a real event: print its body, record the current UTC time in the
    module-level ``last_event_time`` (used for idle detection), and
    checkpoint the event.

    :param partition_context: context of the source partition; provides
        ``partition_id`` and the awaitable ``update_checkpoint``.
    :param event: the received event, or ``None`` when the SDK reports that
        nothing arrived within ``max_wait_time`` (only if that option is set).
    """
    global last_event_time
    if event is None:
        # "Nothing arrived" must not reset the idle timer; otherwise the
        # job would never detect idleness. There is also nothing new to
        # checkpoint.
        print(f"Received a None event from partition ID: {partition_context.partition_id}")
        return
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )
    # Update the last event time.
    last_event_time = datetime.now(timezone.utc)
    await partition_context.update_checkpoint(event)
async def receive():
    """Consume events until none has arrived for ``WAIT_DURATION``, then return.

    Events are dispatched to ``on_event``, which checkpoints them via the
    blob checkpoint store. The consumer client and the checkpoint store are
    closed on every exit path. The module-level ``credential`` is closed
    before returning, so this coroutine is intended to run once per process.

    Any exception raised by ``client.receive`` is propagated.
    """
    global last_event_time
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )
    client = EventHubConsumerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        consumer_group=EVENT_HUB_CONSUMER_GROUP,
        checkpoint_store=checkpoint_store,
        credential=credential,
    )
    # Start the idle timer now, so a run that receives nothing still exits
    # after WAIT_DURATION.
    last_event_time = datetime.now(timezone.utc)
    try:
        # Entering both objects closes their network sessions on exit
        # (client first, then the store it depends on). Previously the
        # checkpoint store was never closed.
        async with checkpoint_store, client:
            # client.receive() runs until cancelled or until it fails, so it
            # runs as a background task on this event loop (not a thread)
            # while this coroutine polls for idleness.
            receive_task = asyncio.create_task(
                client.receive(
                    on_event=on_event,
                    # Used only for partitions that have no checkpoint yet.
                    starting_position="-1",
                )
            )
            try:
                # Also stop if the receive task ended on its own (e.g. an
                # auth error), so the failure surfaces immediately instead of
                # after WAIT_DURATION.
                while not receive_task.done():
                    await asyncio.sleep(1)
                    if datetime.now(timezone.utc) - last_event_time > WAIT_DURATION:
                        break
            finally:
                receive_task.cancel()
                try:
                    await receive_task
                except asyncio.CancelledError:
                    pass
    finally:
        # Close the credential even if receiving failed.
        await credential.close()
def run():
    """Synchronous entry point: run ``receive()`` to completion.

    ``asyncio.run`` creates a fresh event loop, runs the coroutine and closes
    the loop afterwards. Calling ``asyncio.get_event_loop()`` when no loop is
    running is deprecated and fails on newer Python versions.
    """
    asyncio.run(receive())