Das Problem, das ich habe, sind die FastAPIs request.stream() gibt einen asynchronen-Generator zurück, während der Speicherclient entweder Rohdaten als Bytes oder String oder etwas basierend auf io erwartet .IOBase-Klasse.
Also, wenn ich Folgendes aufrufe:
Code: Select all
async def upload(request: fastapi.Request, storage_client: gcloud.aio.storage.Storage):
upload_stream = request.stream()
await storage_client.upload(
"my_bucket",
"my_object",
upload_stream
)
Wie würde ich ein io bekommen? IOBase-abgeleitetes Stream-Objekt, das akzeptiert wird, ohne den gesamten Anforderungstext auf einmal zu lesen?