Hier ist der Code für den KafkaListener und KafkaConsumerTest.
Listener.java
Code: Select all
@Service
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}
public CountDownLatch getLatch() {
return latch;
}
}
Code: Select all
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
public KafkaTemplate template;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private Listener listener;
@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}
@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}