FastAPI simuliert AIOKafkaConsumer in Tests
Posted: 27 Dec 2024, 15:00
Ich habe eine FastApi-Anwendung, die das Kafka-Thema im asynchronen Modus abonniert. Ich muss Unittest für meine Anwendung erstellen. Sieht so aus, als müsste ich die Kafka-Funktionalität verspotten
Mein Code:
Ich verwende den Fastapi-Testclient
Ich habe einen Fehler in der AIOKafkaConsumer-Erstellungszeile:
Das Objekt sollte innerhalb einer asynchronen Funktion oder Bereitstellungsschleife erstellt werden direkt.
Wie teste ich meine Anwendung richtig? (Sieht so aus, als müsste ich die Kafka-Funktionalität verspotten)
Mein Code:
Code: Select all
def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)
app = FastAPI()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())
async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Code: Select all
import pytest
from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app
client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
Das Objekt sollte innerhalb einer asynchronen Funktion oder Bereitstellungsschleife erstellt werden direkt.
Wie teste ich meine Anwendung richtig? (Sieht so aus, als müsste ich die Kafka-Funktionalität verspotten)