Richtig Kafka mit Python für Leistung und Skalierbarkeit einrichten

Post a reply

Smilies
:) :( :oops: :chelo: :roll: :wink: :muza: :sorry: :angel: :read: *x) :clever:
View more smilies

BBCode is ON
[img] is ON
[flash] is OFF
[url] is ON
Smilies are ON

Topic review
   

Expand view Topic review: Richtig Kafka mit Python für Leistung und Skalierbarkeit einrichten

by Anonymous » 04 Mar 2025, 08:48

Ich bin neu bei Apache Kafka und möchte es mit Python einrichten, um Fahrzeugorte zu verfolgen. Für die Implementierung von WebSocket verwende ich Django -Kanäle. Während dieses Setup es mir ermöglicht, Echtzeitdaten aus dem Frontend zu erhalten, ist es nicht ideal für die Leistung und Skalierbarkeit. Ich möchte jedoch keine Daten für jede eingehende Nachricht kontinuierlich in die Datenbank einfügen. Um dies anzugehen, habe ich die Batch -Verarbeitung in meinem Kafka -Verbraucher implementiert. Trotzdem empfängt der Verbraucher weiterhin Nachrichten, nachdem er erstellt wurde. Unten finden Sie den Beispielcode für meinen Kafka -Produzenten und meinen Verbraucher. < /P>

Code: Select all

consumers.py

KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092'  # Adjust to your Kafka broker

# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")

KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092'  # Adjust to your Kafka broker

# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")
< /code>
producers.py
from kafka import KafkaProducer
import json

KAFKA_TOPIC = "vehicle-location-updates"
KAFKA_BROKER = "localhost:9092"  # Adjust to your Kafka broker

# Custom partitioner to ensure messages for the same vehicle_id go to the same partition
def vehicle_partition(key_bytes, all_partitions, available_partitions):
vehicle_id = key_bytes.decode('utf-8')
# Use the vehicle_id hash to determine the partition (this is just an example)
partition = hash(vehicle_id) % len(available_partitions)
return partition

# Kafka Producer Configuration
producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKER],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda v: str(v).encode('utf-8'),  # Serialize vehicle_id as key
batch_size=1048576,  # 1 MB batch size (increase to accumulate more messages)
linger_ms=100000,  # Wait for 100 ms before sending the batch
partitioner=vehicle_partition  # Custom partitioner for vehicles
)

# Send location messages for multiple vehicles
def send_vehicle_location(vehicle_id, lat, lon):
message = {
'vehicle_id': vehicle_id,
'latitude': lat,
'longitude': lon
}

# Send the message with the vehicle_id as the key (to ensure same vehicle goes to same partition)
producer.send(
KAFKA_TOPIC,
value=message,
key=str(vehicle_id)  # Send vehicle_id as key to ensure partitioning
)

producer.flush()

# Example usage
for i in range(10):
send_vehicle_location(vehicle_id=1, lat=40.7128, lon=-74.0060)
send_vehicle_location(vehicle_id=2, lat=34.0522, lon=-118.2437)

Top