Richtig Kafka mit Python für Leistung und Skalierbarkeit einrichtenPython

Python-Programme
Anonymous
 Richtig Kafka mit Python für Leistung und Skalierbarkeit einrichten

Post by Anonymous »

Ich bin neu bei Apache Kafka und möchte es mit Python einrichten, um Fahrzeugorte zu verfolgen. Für die Implementierung von WebSocket verwende ich Django -Kanäle. Während dieses Setup es mir ermöglicht, Echtzeitdaten aus dem Frontend zu erhalten, ist es nicht ideal für die Leistung und Skalierbarkeit. Ich möchte jedoch keine Daten für jede eingehende Nachricht kontinuierlich in die Datenbank einfügen. Um dies anzugehen, habe ich die Batch -Verarbeitung in meinem Kafka -Verbraucher implementiert. Trotzdem empfängt der Verbraucher weiterhin Nachrichten, nachdem er erstellt wurde. Unten finden Sie den Beispielcode für meinen Kafka -Produzenten und meinen Verbraucher. < /P>

Code: Select all

consumers.py

KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092'  # Adjust to your Kafka broker

# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")

KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092'  # Adjust to your Kafka broker

# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")
< /code>
producers.py
from kafka import KafkaProducer
import json

KAFKA_TOPIC = "vehicle-location-updates"
KAFKA_BROKER = "localhost:9092"  # Adjust to your Kafka broker

# Custom partitioner to ensure messages for the same vehicle_id go to the same partition
def vehicle_partition(key_bytes, all_partitions, available_partitions):
vehicle_id = key_bytes.decode('utf-8')
# Use the vehicle_id hash to determine the partition (this is just an example)
partition = hash(vehicle_id) % len(available_partitions)
return partition

# Kafka Producer Configuration
producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKER],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda v: str(v).encode('utf-8'),  # Serialize vehicle_id as key
batch_size=1048576,  # 1 MB batch size (increase to accumulate more messages)
linger_ms=100000,  # Wait for 100 ms before sending the batch
partitioner=vehicle_partition  # Custom partitioner for vehicles
)

# Send location messages for multiple vehicles
def send_vehicle_location(vehicle_id, lat, lon):
message = {
'vehicle_id': vehicle_id,
'latitude': lat,
'longitude': lon
}

# Send the message with the vehicle_id as the key (to ensure same vehicle goes to same partition)
producer.send(
KAFKA_TOPIC,
value=message,
key=str(vehicle_id)  # Send vehicle_id as key to ensure partitioning
)

producer.flush()

# Example usage
for i in range(10):
send_vehicle_location(vehicle_id=1, lat=40.7128, lon=-74.0060)
send_vehicle_location(vehicle_id=2, lat=34.0522, lon=-118.2437)

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post