KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Zeitüberschreitung der Nachricht"}Python

Python-Programme
Guest
 KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Zeitüberschreitung der Nachricht"}

Post by Guest »

Code: Select all

KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

Beim Erstellen einer Nachricht zu einem Kafka-Thema von der Airflow-Aufgabe (Python-Operator) wurde ein Fehler empfangen.
Beim Erstellen derselben Nachricht zu demselben Thema Von einer einfachen Python-Produzentenanwendung (außerhalb des Luftstroms) funktioniert es einwandfrei. Bei der Produktion aus der Airflow-Aufgabe schlägt dies jedoch fehl.
Der Airflow-Cluster lauscht erfolgreich auf die Kafka-Broker.
Themenauthentifizierung und -autorisierung sind abgeschlossen.

Code: Select all

from airflow import DAG
from airflow.operators.python import PythonOperator
from confluent_kafka import Producer

def produce_to_kafka(**context):
# Kafka configuration
KAFKA_CONFIG = {
'bootstrap.servers': 'hostname:port',
'security.protocol': 'ssl',
"partitioner": "random",
'ssl.ca.location': 'path.pem',
'ssl.certificate.location':'path.cer',
'ssl.key.location':'path.key',
}
KAFKA_TOPIC = 'my_topic'

producer = Producer(KAFKA_CONFIG)

producer.produce(KAFKA_TOPIC, key='my_key', value='my_message', partition=1, callback=handle_error)
producer.flush()

with DAG(
dag_id='my_dag',
schedule_interval='0 0 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:

send_to_kafka = PythonOperator(
task_id='send_messages_to_kafka',
python_callable=produce_to_kafka,
provide_context=True,
)

send_to_kafka

Ich habe versucht, eine einfache Producer-Python-Anwendung außerhalb von Airflow mit denselben Producer-Konfigurationen zu erstellen, und es funktioniert einwandfrei.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post