Ist dies die richtige Kafka Consumer -Konfiguration - unter diesem Kafka -Setup?Java

Java-Forum
Anonymous
 Ist dies die richtige Kafka Consumer -Konfiguration - unter diesem Kafka -Setup?

Post by Anonymous »

Mein Kafka -Produzent erstellt Nachrichten mit etwa .. 350 MB pro 30 Sekunden ..

Code: Select all

Kafka Setup< /code>: < /p>

--> 1 Zookeeper instance
--> 3 Kafka Brokers

--> 1 Java Producer

--> 1 Java Verbraucher < /code> < /p>

So habe ich Themen- und Broker -Partitionen erstellt < /code>: < /p>

Code: Select all

bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test< /code> < /p>

Der Rest der Konfiguration lautet wie folgt .. < /p>

Producer Code< /code>: < /p>

KeyedMessage publishData = new KeyedMessage(this.topic, data);
producer.send(publishData);
Hier ist Data ein 5000 Längen -Byte [] .

Code: Select all

Producer Config< /code>: < /p>

batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092
< /code>

Ich kann sehen, wie mein Produzent gut arbeitet. Das [url=viewtopic.php?t=20324]Problem[/url] ist, dass der Verbraucher die Nachrichten verbraucht. Auch wenn der Verbraucher zurückbleibt, sehe ich nicht, dass meine Nachrichten verzehrt (und schließlich verarbeiten und in ein DB eingefügt). /> Consumer Code< /code>: < /p>

 public class FlowConsumer {
private final String topic;
private final ExecutorService threadPool;
private final ConsumerConnector consumer;
private static AppProperties appProperties;
private final ExecutorService processDataThreadPool;

public FlowConsumer() throws Exception {
/**
* Load properties configuration for flowLog4j.properties.
*/
appProperties = AppProperties.loadConfiguration();

/** Assign the flow-topic.. */
this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
logger.fatal("Topic : "+topic);

/** Initialize the thread pool to consume kafka byte[] streams.. */
this.threadPool = Executors.newFixedThreadPool(20);

/** Initialize the thread pool for processing kafka byte[] messages.. */
this.processDataThreadPool = Executors.newFixedThreadPool(100);

/** Fetch the Consumer Config, by reading the Flow.properties file..  */
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));

logger.fatal("Consumer : "+consumer);

//new Thread(new Consumer()).start();
threadPool.submit(new Consumer());
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (threadPool != null) threadPool.shutdown();
if (processDataThreadPool!= null) processDataThreadPool.shutdown();
}

private class Consumer implements Runnable {

public Consumer() {
logger.fatal("Started Consumer Thread!");
}

@Override
public void run() {
Map topicCountMap = new HashMap();
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
for (MessageAndMetadata messageAndMetadata : kafkaStream) {
processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
}
}
}
}

public static void main(String[] args) throws Exception {
FlowConsumer consumer = new FlowConsumer();

/*try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
consumer.shutdown();*/
}
}
< /code>

Consumer Config< /code>: < /p>

group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000
< /code>

Question 1
:

Für 3 Broker können/sollte ich mehr als 3 Partitionen erstellen? Ich habe gelesen, dass mehr Partitionen bedeutet, dass ich meinem Verbraucher mehr Parallelität hinzufügen kann? Aber wie, indem ein mehr Verbraucherfaden für einen einzelnen Verbraucher verwendet wird? Oder durch 3 Verbraucherinstanzen, jeweils 1 Thread? < /P>

Code: Select all

Question 2< /code>: < /p>

Is my Java consumer config code correct/wrong
? < /p>

Kann mir bitte jemand sagen, was ich hier falsch mache? < /P.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post