Code: Select all
Kafka Setup< /code>: < /p>
--> 1 Zookeeper instance
--> 1 Java Producer
--> 1 Java Verbraucher < /code> < /p>
So habe ich Themen- und Broker -Partitionen erstellt < /code>: < /p>
Code: Select all
bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test< /code> < /p>
Der Rest der Konfiguration lautet wie folgt .. < /p>
Producer Code< /code>: < /p>
KeyedMessage publishData = new KeyedMessage(this.topic, data);
producer.send(publishData);
Code: Select all
Producer Config< /code>: < /p>
batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092
< /code>
Ich kann sehen, wie mein Produzent gut arbeitet. Das [url=viewtopic.php?t=20324]Problem[/url] ist, dass der Verbraucher die Nachrichten verbraucht. Auch wenn der Verbraucher zurückbleibt, sehe ich nicht, dass meine Nachrichten verzehrt (und schließlich verarbeiten und in ein DB eingefügt). /> Consumer Code< /code>: < /p>
public class FlowConsumer {
private final String topic;
private final ExecutorService threadPool;
private final ConsumerConnector consumer;
private static AppProperties appProperties;
private final ExecutorService processDataThreadPool;
public FlowConsumer() throws Exception {
/**
* Load properties configuration for flowLog4j.properties.
*/
appProperties = AppProperties.loadConfiguration();
/** Assign the flow-topic.. */
this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
logger.fatal("Topic : "+topic);
/** Initialize the thread pool to consume kafka byte[] streams.. */
this.threadPool = Executors.newFixedThreadPool(20);
/** Initialize the thread pool for processing kafka byte[] messages.. */
this.processDataThreadPool = Executors.newFixedThreadPool(100);
/** Fetch the Consumer Config, by reading the Flow.properties file.. */
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));
logger.fatal("Consumer : "+consumer);
//new Thread(new Consumer()).start();
threadPool.submit(new Consumer());
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (threadPool != null) threadPool.shutdown();
if (processDataThreadPool!= null) processDataThreadPool.shutdown();
}
private class Consumer implements Runnable {
public Consumer() {
logger.fatal("Started Consumer Thread!");
}
@Override
public void run() {
Map topicCountMap = new HashMap();
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
for (MessageAndMetadata messageAndMetadata : kafkaStream) {
processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
}
}
}
}
public static void main(String[] args) throws Exception {
FlowConsumer consumer = new FlowConsumer();
/*try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
consumer.shutdown();*/
}
}
< /code>
Consumer Config< /code>: < /p>
group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000
< /code>
Question 1
Für 3 Broker können/sollte ich mehr als 3 Partitionen erstellen? Ich habe gelesen, dass mehr Partitionen bedeutet, dass ich meinem Verbraucher mehr Parallelität hinzufügen kann? Aber wie, indem ein mehr Verbraucherfaden für einen einzelnen Verbraucher verwendet wird? Oder durch 3 Verbraucherinstanzen, jeweils 1 Thread? < /P>
Code: Select all
Question 2< /code>: < /p>
Is my Java consumer config code correct/wrong
Kann mir bitte jemand sagen, was ich hier falsch mache? < /P.>