Ich habe eine FastAPI -Anwendung, die sich mit dem Topie von Kafka unter Verwendung asynchronous code abonniert (d. H. Async /
). Ich muss einen Unit-Test für meine Anwendung erstellen.
Code: Select all
def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)
app = FastAPI()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Startup event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())
async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Ich verwende den TestClient von Fastapi :
import pytest
from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app
client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
< /code>
Ich erhalte jedoch den folgenden Fehler bei AIOKAFKACONSUMER < /code> Instatiation: < /p>
Das Objekt sollte in einer asynchronisierten Funktion erstellt werden oder die Schleife direkt anbieten. Es sieht so aus, als müsste ich Kafka-Funktionalität verspotten.
Ich habe eine FastAPI -Anwendung, die sich mit dem Topie von Kafka unter Verwendung asynchronous code abonniert (d. H. Async /[code]await[/code]). Ich muss einen Unit-Test für meine Anwendung erstellen.[code]def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.
Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)
app = FastAPI()
consumer = create_consumer()
@app.on_event("startup")
async def startup_event():
"""Startup event for FastAPI application."""
log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())
async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
[/code]
Ich verwende den TestClient von Fastapi :
import pytest
from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app
client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.
< /code>
Ich erhalte jedoch den folgenden Fehler bei AIOKAFKACONSUMER < /code> Instatiation: < /p>
Das Objekt sollte in einer asynchronisierten Funktion erstellt werden oder die Schleife direkt anbieten. Es sieht so aus, als müsste ich Kafka-Funktionalität verspotten.