So legen Sie den Port in KafkaEmbedded fest, wenn Sie den Spring-Kafka-Consumer testenJava

Java-Forum
Anonymous
 So legen Sie den Port in KafkaEmbedded fest, wenn Sie den Spring-Kafka-Consumer testen

Post by Anonymous »

Ich verwende spring-boot-starter-parent Version 1.5.0.RELEASE, spring-kafka Version 1.0.0.RELEASE und spring-kafka-test Version 1.0.0.RELEASE in einer Anwendung, die Nachrichten von einem Kakfa 0.9-Cluster verarbeitet. Ich habe einen Komponententest für meinen Verbraucher, der KafkaEmbedded verwendet hat, aber dieser schlägt fehl, da der Broker-Port zufällig ausgewählt wird. Gibt es eine Möglichkeit, diese Brokereigenschaft festzulegen, ohne die Versionen zu ändern? Oder welche Versionen sollte ich verwenden, um nichts kaputt zu machen?
Hier ist der Code für den KafkaListener und KafkaConsumerTest.
Listener.java

Code: Select all

@Service
public class Listener {

private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}

public CountDownLatch getLatch() {
return latch;
}
}

KafkaConsumerTest.java (BEARBEITEN)

Code: Select all

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

public KafkaTemplate template;

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private Listener listener;

@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}

@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post