Ich verwende ein Spark-Streaming-Notebook aus meinem Data Engineering-Kurs und bin auf einen Fehler gestoßen, der vermutlich mit der Inkompatibilität der Spark- und Kafka-Versionen in Docker zusammenhängt.
Spark Streaming is running. Press Ctrl+C to stop...
---------------------------------------------------------------------------
StreamingQueryException Traceback (most recent call last)
Cell In[32], line 4
2 try:
3 print("Spark Streaming is running. Press Ctrl+C to stop...")
----> 4 query.awaitTermination(60) # Wait for 60 seconds (for demo)
5 print("Streaming completed or timeout reached.")
6 except KeyboardInterrupt:
File /usr/local/spark/python/pyspark/sql/streaming/query.py:199, in StreamingQuery.awaitTermination(self, timeout)
197 if not isinstance(timeout, (int, float)) or timeout < 0:
198 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
--> 199 return self._jsq.awaitTermination(int(timeout * 1000))
200 else:
201 return self._jsq.awaitTermination()
File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception..deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
StreamingQueryException: [STREAM_FAILED] Query [id = 24d56320-12a8-43fe-b96f-79f02ce2cf72, runId = 22ebd29e-03d2-4354-9ede-0b727506d247] terminated with exception: org/apache/spark/kafka010/KafkaConfigUpdater
Ich habe versucht, die Spark-Version 3.5.0 in Dockerfile.jupyter mit docker-compose.yml abzugleichen, bin mir aber nicht sicher, ob ich die richtigen Kafka- und MongoDB-Konnektoren für Spark hatte.
Ich verwende ein Spark-Streaming-Notebook aus meinem Data Engineering-Kurs und bin auf einen Fehler gestoßen, der vermutlich mit der Inkompatibilität der Spark- und Kafka-Versionen in Docker zusammenhängt. [code]game_monitoring.ipynb[/code] [code]import os import json import findspark from datetime import datetime, timedelta import time from pymongo import MongoClient
# Initialize Spark findspark.init()
# Import Spark after [url=viewtopic.php?t=25360]environment[/url] setup from pyspark.sql import SparkSession from pyspark.sql.functions import col, from_json, window, sum, expr, to_timestamp, current_timestamp from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# Wait for termination try: print("Spark Streaming is running. Press Ctrl+C to stop...") query.awaitTermination(60) # Wait for 60 seconds (for demo) print("Streaming completed or timeout reached.") except KeyboardInterrupt: print("Stopping the streaming query...") query.stop() console_query.stop() [/code] Fehlermeldung: [code]Spark Streaming is running. Press Ctrl+C to stop... --------------------------------------------------------------------------- StreamingQueryException Traceback (most recent call last) Cell In[32], line 4 2 try: 3 print("Spark Streaming is running. Press Ctrl+C to stop...") ----> 4 query.awaitTermination(60) # Wait for 60 seconds (for demo) 5 print("Streaming completed or timeout reached.") 6 except KeyboardInterrupt:
File /usr/local/spark/python/pyspark/sql/streaming/query.py:199, in StreamingQuery.awaitTermination(self, timeout) 197 if not isinstance(timeout, (int, float)) or timeout < 0: 198 raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) --> 199 return self._jsq.awaitTermination(int(timeout * 1000)) 200 else: 201 return self._jsq.awaitTermination()
File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception..deco(*a, **kw) 171 converted = convert_exception(e.java_exception) 172 if not isinstance(converted, UnknownException): 173 # Hide where the exception came from that shows a non-Pythonic 174 # JVM exception message. --> 175 raise converted from None 176 else: 177 raise
StreamingQueryException: [STREAM_FAILED] Query [id = 24d56320-12a8-43fe-b96f-79f02ce2cf72, runId = 22ebd29e-03d2-4354-9ede-0b727506d247] terminated with exception: org/apache/spark/kafka010/KafkaConfigUpdater [/code] Ich habe versucht, die Spark-Version 3.5.0 in Dockerfile.jupyter mit docker-compose.yml abzugleichen, bin mir aber nicht sicher, ob ich die richtigen Kafka- und MongoDB-Konnektoren für Spark hatte. [code]Dockerfile.jupyter[/code] [code]# Add Kafka and MongoDB connectors for Spark RUN cd /usr/local/spark/jars && \ wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar && \ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.0/kafka-clients-3.5.0.jar && \ wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.5.0/spark-streaming-kafka-0-10_2.12-3.5.0.jar && \ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.12/3.5.0/kafka_2.12-3.5.0.jar && \ wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar && \ wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar [/code] [code]docker-compose.yml[/code] [code]services: broker: image: confluentinc/cp-server:7.9.0 hostname: broker container_name: broker ports: - "9092:9092" - "9101:9101" environment: KAFKA_NODE_ID: 1 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
Ich habe eine Java -Spark -Anwendung, die Daten von Kafka erhält, einige Arbeiten an den Daten ausführt und dann mit dem Befehl toundswrite () Parquetdateien in S3 speichert. Bis zu diesem Zeitpunkt...
Ich habe eine Java -Spark -Anwendung, die Daten von Kafka erhält, einige Arbeiten an den Daten ausführt und dann mit dem Befehl toundswrite () Parquetdateien in S3 speichert. Bis zu diesem Zeitpunkt...
Ich bin neu im Stoff und habe ein paar Udemy -Kurse durchgeführt. Ich bin mir jedoch nicht sicher, wie ich dieses Problem angehen soll. Die Transformationen sind kleine, umgebende Spalten...
Ich bekomme diesen Fehler vom Spark mit Hadoop und PySpark
ApplicationMaster: Waiting for spark context initialization...
25/05/12 23:56:11 INFO ApplicationMaster: Final app status: FAILED,...
In meinem Projekt verwende ich den Spark-Cassandra-Connector, um die aus der Cassandra-Tabelle zu lesen und sie weiter in JavaRDD zu verarbeiten, aber ich habe ein Problem bei der Verarbeitung der...