Code: Select all
services:
pulsar_consumer:
build:
context: .
dockerfile: Dockerfile
image: allocation-image
depends_on:
postgres:
condition: service_started
broker:
condition: service_started
environment:
- PULSAR_HOST=broker
- PYTHONDONTWRITEBYTECODE=1
volumes:
- ./src:/src
- ./tests:/tests
entrypoint:
- python
- src/allocation/entrypoints/pulsar_eventconsumer.py
# Start zookeeper
zookeeper:
...
# Start broker
broker:
image: apachepulsar/pulsar:latest
container_name: broker
hostname: broker
restart: on-failure
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- bindAddresses=docker_internal://broker:6650
- advertisedAddress=broker
- advertisedListeners=external:pulsar://127.0.0.1:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
Code: Select all
2025-08-05 12:48:40
2025-08-05 16:48:40.611 INFO [281473074655616] ClientConnection:193 | [ -> pulsar://broker:6650] Create ClientConnection, timeout=10000
2025-08-05 12:48:40
2025-08-05 16:48:40.611 INFO [281473074655616] ConnectionPool:124 | Created connection for pulsar://broker:6650-pulsar://broker:6650-0
2025-08-05 12:48:40
2025-08-05 16:48:40.612 INFO [281473074655616] ClientConnection:410 | [172.20.0.9:57664 -> 172.20.0.8:6650] Connected to broker
2025-08-05 12:48:41
2025-08-05 16:48:41.849 INFO [281473074655616] HandlerBase:115 | [persistent://public/default/change-batch-quantity, ] Getting connection from pool
2025-08-05 12:48:42
2025-08-05 16:48:42.030 INFO [281473074655616] BinaryProtoLookupService:85 | Lookup response for persistent://public/default/change-batch-quantity, lookup-broker-url pulsar://127.0.0.1:6650, from [172.20.0.9:57664 -> 172.20.0.8:6650]
2025-08-05 12:48:42
2025-08-05 16:48:42.030 INFO [281473074655616] ClientConnection:193 | [ -> pulsar://127.0.0.1:6650] Create ClientConnection, timeout=10000
2025-08-05 12:48:42
2025-08-05 16:48:42.030 INFO [281473074655616] ConnectionPool:124 | Created connection for pulsar://127.0.0.1:6650-pulsar://127.0.0.1:6650-0
2025-08-05 12:48:42
2025-08-05 16:48:42.030 WARN [281473074655616] ClientConnection:483 | [ -> pulsar://127.0.0.1:6650] Failed to establish connection: Connection refused
< /code>
Es ist ein einfacher Pulsar -Verbraucher: < /p>
import json
import logging
import pulsar
from allocation import bootstrap, config
logger = logging.getLogger(__name__)
def main():
logger.info("Consumer starting")
client = pulsar.Client(config.get_pulsar_uri(), operation_timeout_seconds=30)
consumer = client.subscribe("change-batch-quantity", "allocation-consumer")
while True:
msg = consumer.receive()
try:
logger.debug(
"Received message '{}' id='{}'".format(msg.data(), msg.message_id())
)
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)
if __name__ == "__main__":
main()