Der Aufruf von AIOKafkaConsumer über FastAPI löst den Fehler „Objekt sollte innerhalb einer asynchronen Funktion erstellPython

Python-Programme
Guest
 Der Aufruf von AIOKafkaConsumer über FastAPI löst den Fehler „Objekt sollte innerhalb einer asynchronen Funktion erstell

Post by Guest »

Ich habe eine FastAPI-Anwendung, die das Kafka-Thema im asynchronen-Modus abonniert. Ich muss einen Unittest für meine Anwendung erstellen.
Mein Code:

Code: Select all

def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.

Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)

app = FastAPI()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Startup event for FastAPI application."""

log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())

async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Ich verwende den TestClient von FastAPI:

Code: Select all

import pytest

from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app

client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
Allerdings erhalte ich bei der AIOKafkaConsumer-Instatiation die folgende Fehlermeldung:

Das Objekt sollte erstellt werden innerhalb einer asynchronen Funktion oder stellen Sie direkt eine Schleife bereit.

Wie teste ich meine Anwendung richtig? Es sieht so aus, als müsste ich die Kafka-Funktionalität verspotten.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post