FastAPI simuliert AIOKafkaConsumer in Tests

Post a reply

Smilies
:) :( :oops: :chelo: :roll: :wink: :muza: :sorry: :angel: :read: *x) :clever:
View more smilies

BBCode is ON
[img] is ON
[flash] is OFF
[url] is ON
Smilies are ON

Topic review
   

Expand view Topic review: FastAPI simuliert AIOKafkaConsumer in Tests

by Anonymous » 27 Dec 2024, 15:00

Ich habe eine FastApi-Anwendung, die das Kafka-Thema im asynchronen Modus abonniert. Ich muss Unittest für meine Anwendung erstellen. Sieht so aus, als müsste ich die Kafka-Funktionalität verspotten
Mein Code:

Code: Select all

def create_consumer() -> AIOKafkaConsumer:
"""Create AIOKafkaConsumer.

Returns:
AIOKafkaConsumer: The created AIOKafkaConsumer instance.
"""
return AIOKafkaConsumer(
settings.kafka_consumer_topic,
bootstrap_servers=f"{settings.kafka_consumer_host}:{settings.kafka_consumer_port}"
)

app = FastAPI()
consumer = create_consumer()

@app.on_event("startup")
async def startup_event():
"""Start up event for FastAPI application."""

log.info("Starting up...")
await consumer.start()
asyncio.create_task(consume())

async def consume(db: Session = next(get_db())):
"""Consume and print messages from Kafka."""
while True:
async for msg in consumer:
...
Ich verwende den Fastapi-Testclient

Code: Select all

import pytest

from fastapi.testclient import TestClient
from app.main import get_task_files
#from app.main import app

client = TestClient(app) # ERR: AIOKafkaConsumer The object should be created within an async function or provide loop directly.

Ich habe einen Fehler in der AIOKafkaConsumer-Erstellungszeile:

Das Objekt sollte innerhalb einer asynchronen Funktion oder Bereitstellungsschleife erstellt werden direkt.

Wie teste ich meine Anwendung richtig? (Sieht so aus, als müsste ich die Kafka-Funktionalität verspotten)

Top