Possible incompatibility between the Spark and Kafka versions in Docker?

Post by Anonymous »

I am working with a Spark Streaming notebook from my data engineering course and have run into an error that I suspect is caused by an incompatibility between the Spark and Kafka versions in Docker.

game_monitoring.ipynb

Code:

import os
import json
import findspark
from datetime import datetime, timedelta
import time
from pymongo import MongoClient

# Initialize Spark
findspark.init()

# Import Spark after environment setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, sum, expr, to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

spark = SparkSession.builder \
    .appName("GameTimeMonitoring") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .config("spark.sql.shuffle.partitions", "2") \
    .master("local[*]") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "game-activity") \
    .option("startingOffsets", "earliest") \
    .load()

daily_play_time = active_sessions \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        col("user_id"),
        window(col("timestamp"), "1 day").alias("day")
    ) \
    .count() \
    .withColumnRenamed("count", "minutes_played") \
    .select(
        col("user_id"),
        col("day.start").alias("day_start"),
        col("day.end").alias("day_end"),
        col("minutes_played")
    )

# Start streaming query to MongoDB
query = daily_play_time \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(write_to_mongodb) \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()

# Wait for termination
try:
    print("Spark Streaming is running. Press Ctrl+C to stop...")
    query.awaitTermination(60)  # Wait for 60 seconds (for demo)
    print("Streaming completed or timeout reached.")
except KeyboardInterrupt:
    print("Stopping the streaming query...")
    query.stop()
    console_query.stop()
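
For context, active_sessions and write_to_mongodb are presumably defined in earlier notebook cells that I have not pasted here; roughly, they parse the Kafka value with the JSON schema and write each micro-batch to MongoDB via pymongo. A simplified sketch of what those cells do (field names, database/collection names, and the connection URI are my assumptions, not the exact course code; it reuses the imports from the first cell):

Code:

# Hypothetical sketch of the omitted cells (names and URI are assumptions).
# Parse the Kafka value into typed columns; "active_sessions" feeds the aggregation above.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("timestamp", TimestampType()),
])

active_sessions = df \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

def write_to_mongodb(batch_df, batch_id):
    # Write one micro-batch into MongoDB via pymongo (credentials from docker-compose.yml).
    client = MongoClient("mongodb://admin:password@mongodb:27017/")
    collection = client["game_monitoring"]["daily_play_time"]
    records = [row.asDict() for row in batch_df.collect()]
    if records:
        collection.insert_many(records)
    client.close()
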
Error message:

Code:

Spark Streaming is running. Press Ctrl+C to stop...
---------------------------------------------------------------------------
StreamingQueryException                   Traceback (most recent call last)
Cell In[32], line 4
2 try:
3     print("Spark Streaming is running. Press Ctrl+C to stop...")
----> 4     query.awaitTermination(60)  # Wait for 60 seconds (for demo)
5     print("Streaming completed or timeout reached.")
6 except KeyboardInterrupt:

File /usr/local/spark/python/pyspark/sql/streaming/query.py:199, in StreamingQuery.awaitTermination(self, timeout)
197     if not isinstance(timeout, (int, float)) or timeout < 0:
198         raise ValueError("timeout must be a positive integer or float.  Got %s" % timeout)
--> 199     return self._jsq.awaitTermination(int(timeout * 1000))
200 else:
201     return self._jsq.awaitTermination()

File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317     self.command_header +\
1318     args_command +\
1319     proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323     answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326     if hasattr(temp_arg, "_detach"):

File /usr/local/spark/python/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception..deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173     # Hide where the exception came from that shows a non-Pythonic
174     # JVM exception message.
--> 175     raise converted from None
176 else:
177     raise

StreamingQueryException: [STREAM_FAILED] Query [id = 24d56320-12a8-43fe-b96f-79f02ce2cf72, runId = 22ebd29e-03d2-4354-9ede-0b727506d247] terminated with exception: org/apache/spark/kafka010/KafkaConfigUpdater
I have tried to keep the Spark version 3.5.0 in Dockerfile.jupyter consistent with docker-compose.yml, but I am not sure whether I picked the right Kafka and MongoDB connectors for Spark.

Dockerfile.jupyter

Code:

# Add Kafka and MongoDB connectors for Spark
RUN cd /usr/local/spark/jars && \
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar && \
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.0/kafka-clients-3.5.0.jar && \
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.5.0/spark-streaming-kafka-0-10_2.12-3.5.0.jar && \
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.12/3.5.0/kafka_2.12-3.5.0.jar && \
    wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar && \
    wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
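
To double-check what actually ends up on the Spark classpath, I run a cell like the one below inside the Jupyter container; it just lists the relevant jars under /usr/local/spark/jars (the path from the Dockerfile above) next to the installed PySpark version. As far as I can tell, the class from the stack trace, org.apache.spark.kafka010.KafkaConfigUpdater, normally ships in the spark-token-provider-kafka-0-10 module, which does not appear in my wget list, but I am not sure whether that is the real cause:

Code:

# Diagnostic sketch: compare the installed PySpark version with the Kafka/MongoDB
# jars that actually made it into the image. Paths are taken from Dockerfile.jupyter.
import glob
import pyspark

print("pyspark version:", pyspark.__version__)

jars = sorted(
    glob.glob("/usr/local/spark/jars/*kafka*.jar")
    + glob.glob("/usr/local/spark/jars/*mongo*.jar")
    + glob.glob("/usr/local/spark/jars/*token-provider*.jar")
)
for jar in jars:
    print(jar)
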

docker-compose.yml

Code:

services:
  broker:
    image: confluentinc/cp-server:7.9.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.9.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.9.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.9.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.9.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.9.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.9.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.9.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  spark-master:
    image: bitnami/spark:3.5.0
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "8080:8080"
      - "7077:7077"

  spark-worker:
    image: bitnami/spark:3.5.0
    depends_on:
      - spark-master
    ports:
      - "8084:8084"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no

  mongodb:
    image: mongo:7.0
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    ports:
      - "27017:27017"
    volumes:
      - ./mongodb_data:/data/db

  jupyter:
    build:
      context: .
      dockerfile: Dockerfile.jupyter
    ports:
      - "8888:8888"
    environment:
      - JUPYTER_ENABLE_LAB=yes
      # Add debugging environment variables
      - PYTHONPATH=/home/jovyan/work
    volumes:
      - ./notebooks:/home/jovyan/work
    command: >
      start-notebook.sh
      --NotebookApp.token=''
      --NotebookApp.password=''
      --NotebookApp.log_level='DEBUG'
    depends_on:
      - broker
      - mongodb

  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8000:8000"
    depends_on:
      - mongodb

I would be very grateful for any suggestions. Thank you in advance for your help!
