Code: Select all
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Beim Erstellen derselben Nachricht zu demselben Thema Von einer einfachen Python-Produzentenanwendung (außerhalb des Luftstroms) funktioniert es einwandfrei. Bei der Produktion aus der Airflow-Aufgabe schlägt dies jedoch fehl.
Der Airflow-Cluster lauscht erfolgreich auf die Kafka-Broker.
Themenauthentifizierung und -autorisierung sind abgeschlossen.
Code: Select all
from airflow import DAG
from airflow.operators.python import PythonOperator
from confluent_kafka import Producer
def produce_to_kafka(**context):
# Kafka configuration
KAFKA_CONFIG = {
'bootstrap.servers': 'hostname:port',
'security.protocol': 'ssl',
"partitioner": "random",
'ssl.ca.location': 'path.pem',
'ssl.certificate.location':'path.cer',
'ssl.key.location':'path.key',
}
KAFKA_TOPIC = 'my_topic'
producer = Producer(KAFKA_CONFIG)
producer.produce(KAFKA_TOPIC, key='my_key', value='my_message', partition=1, callback=handle_error)
producer.flush()
with DAG(
dag_id='my_dag',
schedule_interval='0 0 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
send_to_kafka = PythonOperator(
task_id='send_messages_to_kafka',
python_callable=produce_to_kafka,
provide_context=True,
)
send_to_kafka