So streamen Sie Audio aus Google Cloud -Speicher in Stücken und konvertieren Sie jeden Chunk in WAV für Whisper -TranskrPython

Python-Programme
Anonymous
 So streamen Sie Audio aus Google Cloud -Speicher in Stücken und konvertieren Sie jeden Chunk in WAV für Whisper -Transkr

Post by Anonymous »

Ich arbeite an einem Projekt, bei dem ich mit dem Flüstermodell von OpenAI in einem Google Cloud -Speicher -Bucket Audio transkribieren muss. Das Audio wird im Webm-Format mit OPUS-Codierung gespeichert. Aufgrund der Dateigröße stream ich das Audio in 30-Sekunden-Stücken. Der erste Chunk konvertiert erfolgreich, aber nachfolgende Stücke konvertieren nicht. Ich vermute, dies liegt daran, dass jedem Chunk den Header des Webm -Containers fehlt, den FFMPEG den Opus -Codec richtig interpretieren muss. von Webm/opus bis wav.

Code: Select all

async def handle_transcription_and_notify(
consultation_service: ConsultationService,
consultation_processor: ConsultationProcessor,
consultation: Consultation,
language: str,
notes: str,
clinic_id: str,
vet_email: str,
trace_id: str,
blob_path: str,
max_retries: int = 3,
retry_delay: int = 5,
max_concurrent_tasks: int = 3
):
"""
Handles the transcription process by streaming the file from GCS, converting to a compatible format,
and notifying the client via WebSocket.
"""
chunk_duration_sec = 30  # 30 seconds per chunk
logger.info(f"Starting transcription process for consultation {consultation.consultation_id}",
extra={'trace_id': trace_id})

# Initialize GCS client
service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
if not service_account_key:
logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
await send_discord_alert(
f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
)
return

try:
service_account_info = json.loads(service_account_key)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
except Exception as e:
logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
await send_discord_alert(
f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return

# Initialize GCS client
service_account_key = os.environ.get('SERVICE_ACCOUNT_KEY_BACKEND')
if not service_account_key:
logger.error("Service account key not found in environment variables", extra={'trace_id': trace_id})
await send_discord_alert(
f"Service account key not found for consultation {consultation.consultation_id}.\nTrace ID: {trace_id}"
)
return

try:
service_account_info = json.loads(service_account_key)
credentials = service_account.Credentials.from_service_account_info(service_account_info)
except Exception as e:
logger.error(f"Error loading service account credentials: {str(e)}", extra={'trace_id': trace_id})
await send_discord_alert(
f"Error loading service account credentials for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return

storage_client = storage.Client(credentials=credentials)
bucket_name = 'vetz_consultations'
blob = storage_client.bucket(bucket_name).get_blob(blob_path)
bytes_per_second = 16000 * 2  # 32,000 bytes per second
chunk_size_bytes = 30 * bytes_per_second
size = blob.size

async def stream_blob_in_chunks(blob, chunk_size):
loop = asyncio.get_running_loop()
start = 0
size = blob.size
while start <  size:
end = min(start + chunk_size - 1, size - 1)
try:
logger.info(f"Requesting chunk from {start} to {end}", extra={'trace_id': trace_id})
chunk = await loop.run_in_executor(
None, lambda: blob.download_as_bytes(start=start, end=end)
)
if not chunk:
break
logger.info(f"Yielding chunk from {start} to {end}, size: {len(chunk)} bytes",
extra={'trace_id': trace_id})
yield chunk
start += chunk_size
except Exception as e:
logger.error(f"Error downloading chunk from {start} to {end}: {str(e)}", exc_info=True,
extra={'trace_id': trace_id})
raise e

async def convert_to_wav(chunk_bytes, chunk_idx):
"""
Convert audio chunk to WAV format compatible with Whisper, ensuring it's 16 kHz, mono, and 16-bit PCM.
"""
try:
logger.debug(f"Processing chunk {chunk_idx}: size = {len(chunk_bytes)} bytes")

detected_format = await detect_audio_format(chunk_bytes)
logger.info(f"Detected audio format for chunk {chunk_idx}: {detected_format}")
input_io = io.BytesIO(chunk_bytes)
output_io = io.BytesIO()

# ffmpeg command to convert webm/opus to WAV with 16 kHz, mono, and 16-bit PCM

# ffmpeg command with debug information
ffmpeg_command = [
"ffmpeg",
"-loglevel", "debug",
"-f", "s16le",            # Treat input as raw PCM data
"-ar", "48000",           # Set input sample rate
"-ac", "1",               # Set input to mono
"-i", "pipe:0",
"-ar", "16000",           # Set output sample rate to 16 kHz
"-ac", "1",               # Ensure mono output
"-sample_fmt", "s16",     # Set output format to 16-bit PCM
"-f", "wav",              # Output as WAV format
"pipe:1"
]

process = subprocess.Popen(
ffmpeg_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)

stdout, stderr = process.communicate(input=input_io.read())

if process.returncode == 0:
logger.info(f"FFmpeg conversion completed successfully for chunk {chunk_idx}")
output_io.write(stdout)
output_io.seek(0)

# Save the WAV file locally for listening
output_dir = "converted_chunks"
os.makedirs(output_dir, exist_ok=True)
file_path = os.path.join(output_dir, f"chunk_{chunk_idx}.wav")

with open(file_path, "wb") as f:
f.write(stdout)
logger.info(f"Chunk {chunk_idx} saved to {file_path}")

return output_io
else:
logger.error(f"FFmpeg failed for chunk {chunk_idx} with return code {process.returncode}")
logger.error(f"Chunk {chunk_idx} - FFmpeg stderr: {stderr.decode()}")
return None

except Exception as e:
logger.error(f"Unexpected error in FFmpeg conversion for chunk {chunk_idx}: {str(e)}")
return None

async def transcribe_chunk(idx, chunk_bytes):
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Transcribing chunk {idx + 1} (attempt {attempt}).", extra={'trace_id':  trace_id})

# Convert to WAV format
wav_io = await convert_to_wav(chunk_bytes, idx)
if not wav_io:
logger.error(f"Failed to convert chunk {idx + 1} to WAV format.")
return ""

wav_io.name = "chunk.wav"
chunk_transcription = await consultation_processor.transcribe_audio_whisper(wav_io)
logger.info(f"Chunk {idx + 1} transcribed successfully.", extra={'trace_id': trace_id})
return chunk_transcription
except Exception as e:
logger.error(f"Error transcribing chunk {idx + 1} (attempt {attempt}): {str(e)}", exc_info=True,
extra={'trace_id': trace_id})
if attempt < max_retries:
await asyncio.sleep(retry_delay)
else:
await send_discord_alert(
f"Max retries reached for chunk {idx + 1} in consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return ""  # Return empty string for failed chunk

await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} is being transcribed.", vet_email
)

try:
idx = 0
full_transcription = []
async for chunk in stream_blob_in_chunks(blob, chunk_size_bytes):
transcription = await transcribe_chunk(idx, chunk)
if transcription:
full_transcription.append(transcription)
idx += 1

combined_transcription = " ".join(full_transcription)
consultation.full_transcript = (consultation.full_transcript or "") + " " + combined_transcription
consultation_service.save_consultation(clinic_id, vet_email, consultation)
logger.info(f"Transcription saved for consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})

except Exception as e:
logger.error(f"Error during transcription process: {str(e)}", exc_info=True, extra={'trace_id': trace_id})
await send_discord_alert(
f"Error during transcription process for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
return

await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} has been transcribed.", vet_email
)

try:
template_service = TemplateService()
medical_record_template = template_service.get_template_by_name(
consultation.medical_record_template_id).sections

sections = await consultation_processor.extract_structured_sections(
transcription=consultation.full_transcript,
notes=notes,
language=language,
template=medical_record_template,
)
consultation.sections = sections
consultation_service.save_consultation(clinic_id, vet_email, consultation)
logger.info(f"Sections processed for consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})
except Exception as e:
logger.error(f"Error processing sections for consultation {consultation.consultation_id}: {str(e)}",
exc_info=True, extra={'trace_id': trace_id})
await send_discord_alert(
f"Error processing sections for consultation {consultation.consultation_id}.\nError: {str(e)}\nTrace ID: {trace_id}"
)
raise e

await notification_manager.send_personal_message(
f"Consultation {consultation.consultation_id} is fully processed.", vet_email
)
logger.info(f"Successfully processed consultation {consultation.consultation_id}.",
extra={'trace_id': trace_id})

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post