Ich bin neu bei Apache Kafka und möchte es mit Python einrichten, um Fahrzeugorte zu verfolgen. Für die Implementierung von WebSocket verwende ich Django -Kanäle. Während dieses Setup es mir ermöglicht, Echtzeitdaten aus dem Frontend zu erhalten, ist es nicht ideal für die Leistung und Skalierbarkeit. Ich möchte jedoch keine Daten für jede eingehende Nachricht kontinuierlich in die Datenbank einfügen. Um dies anzugehen, habe ich die Batch -Verarbeitung in meinem Kafka -Verbraucher implementiert. Trotzdem empfängt der Verbraucher weiterhin Nachrichten, nachdem er erstellt wurde. Unten finden Sie den Beispielcode für meinen Kafka -Produzenten und meinen Verbraucher. < /P>
consumers.py
KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092' # Adjust to your Kafka broker
# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")
KAFKA_TOPIC = 'vehicle-location-updates'
KAFKA_BROKER = 'localhost:9092' # Adjust to your Kafka broker
# Create Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_BROKER],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def consume_vehicle_locations():
for message in consumer:
# Get the data from the Kafka message
data = message.value
vehicle_id = data.get('vehicle_id')
lat = data.get('latitude')
lon = data.get('longitude')
print(f"Location updated for {vehicle_id} at {lat, lon}")
< /code>
producers.py
from kafka import KafkaProducer
import json
KAFKA_TOPIC = "vehicle-location-updates"
KAFKA_BROKER = "localhost:9092" # Adjust to your Kafka broker
# Custom partitioner to ensure messages for the same vehicle_id go to the same partition
def vehicle_partition(key_bytes, all_partitions, available_partitions):
vehicle_id = key_bytes.decode('utf-8')
# Use the vehicle_id hash to determine the partition (this is just an example)
partition = hash(vehicle_id) % len(available_partitions)
return partition
# Kafka Producer Configuration
producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKER],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda v: str(v).encode('utf-8'), # Serialize vehicle_id as key
batch_size=1048576, # 1 MB batch size (increase to accumulate more messages)
linger_ms=100000, # Wait for 100 ms before sending the batch
partitioner=vehicle_partition # Custom partitioner for vehicles
)
# Send location messages for multiple vehicles
def send_vehicle_location(vehicle_id, lat, lon):
message = {
'vehicle_id': vehicle_id,
'latitude': lat,
'longitude': lon
}
# Send the message with the vehicle_id as the key (to ensure same vehicle goes to same partition)
producer.send(
KAFKA_TOPIC,
value=message,
key=str(vehicle_id) # Send vehicle_id as key to ensure partitioning
)
producer.flush()
# Example usage
for i in range(10):
send_vehicle_location(vehicle_id=1, lat=40.7128, lon=-74.0060)
send_vehicle_location(vehicle_id=2, lat=34.0522, lon=-118.2437)
Ich bin neu bei Apache Kafka und möchte es mit Python einrichten, um Fahrzeugorte zu verfolgen. Für die Implementierung von WebSocket verwende ich Django -Kanäle. Während dieses Setup es mir ermöglicht, Echtzeitdaten aus dem Frontend zu erhalten, ist es nicht ideal für die Leistung und Skalierbarkeit. [url=viewtopic.php?t=14917]Ich möchte[/url] jedoch keine Daten für jede eingehende Nachricht kontinuierlich in die Datenbank einfügen. Um dies anzugehen, habe ich die Batch -Verarbeitung in meinem Kafka -Verbraucher implementiert. Trotzdem empfängt der Verbraucher weiterhin Nachrichten, nachdem er erstellt wurde. Unten finden Sie den Beispielcode für meinen Kafka -Produzenten und meinen Verbraucher. < /P> [code]consumers.py
KAFKA_TOPIC = 'vehicle-location-updates' KAFKA_BROKER = 'localhost:9092' # Adjust to your Kafka broker
def consume_vehicle_locations(): for message in consumer: # Get the data from the Kafka message data = message.value vehicle_id = data.get('vehicle_id') lat = data.get('latitude') lon = data.get('longitude') print(f"Location updated for {vehicle_id} at {lat, lon}")
KAFKA_TOPIC = 'vehicle-location-updates' KAFKA_BROKER = 'localhost:9092' # Adjust to your Kafka broker
def consume_vehicle_locations(): for message in consumer: # Get the data from the Kafka message data = message.value vehicle_id = data.get('vehicle_id') lat = data.get('latitude') lon = data.get('longitude') print(f"Location updated for {vehicle_id} at {lat, lon}") < /code> producers.py from kafka import KafkaProducer import json
KAFKA_TOPIC = "vehicle-location-updates" KAFKA_BROKER = "localhost:9092" # Adjust to your Kafka broker
# Custom partitioner to ensure messages for the same vehicle_id go to the same partition def vehicle_partition(key_bytes, all_partitions, available_partitions): vehicle_id = key_bytes.decode('utf-8') # Use the vehicle_id hash to determine the partition (this is just an example) partition = hash(vehicle_id) % len(available_partitions) return partition
# Kafka Producer Configuration producer = KafkaProducer( bootstrap_servers=[KAFKA_BROKER], value_serializer=lambda v: json.dumps(v).encode('utf-8'), key_serializer=lambda v: str(v).encode('utf-8'), # Serialize vehicle_id as key batch_size=1048576, # 1 MB batch size (increase to accumulate more messages) linger_ms=100000, # Wait for 100 ms before sending the batch partitioner=vehicle_partition # Custom partitioner for vehicles )
# Send location messages for multiple vehicles def send_vehicle_location(vehicle_id, lat, lon): message = { 'vehicle_id': vehicle_id, 'latitude': lat, 'longitude': lon }
# Send the message with the vehicle_id as the key (to ensure same vehicle goes to same partition) producer.send( KAFKA_TOPIC, value=message, key=str(vehicle_id) # Send vehicle_id as key to ensure partitioning )
producer.flush()
# Example usage for i in range(10): send_vehicle_location(vehicle_id=1, lat=40.7128, lon=-74.0060) send_vehicle_location(vehicle_id=2, lat=34.0522, lon=-118.2437) [/code]
nach Git -Klon und wenn ich:
npm i
Es zeigt den Fehler wie diesen:
npm warn EBADENGINE required: { node: '>=20.0.0 =10.0.0' },
npm warn EBADENGINE current: { node: 'v22.11.0', npm: '11.2.0' }
npm...
Ich arbeite derzeit an einem Projekt mit Java Spring Boot und Apache Kafka, bei dem mehrere Microservices über Kafka kommunizieren. Unser Ziel ist es, 100.000 Transaktionen pro Sekunde (TPS) (oder...
Ich verwende Spring-Kafka 2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1) und Spring-Boot(2.1.1). Ich kann keine Transaktion ausführen, da mein Listener keine Partition zuweisen kann. Ich...
Hier geht es um den Kafka-Hörer. Wir haben keinen Zugriff auf den Herausgeber (es ist ein Drittanbieter).
Genau 9 Minuten nach dem Start der App erhalten wir Folgendes in unseren...