Testen von GCP PubSub mit TestContainer Pubsub -Emulator
Posted: 17 Mar 2025, 00:44
Ich versuche, einen Integrationstest mit PubSub -Emulator basierend auf dem Beispiel dieses Github -Repo zu erstellen, das wie < /p>
aussieht
wobei meine Service -Implementierung Publisher anstelle von PubSubPublisherTemplate aus dem Beispiel verwendet:
und funktioniert bei der Bereitstellung für GCP jedoch nicht in diesem Fall des Integrationstests mit dem PubSub -Emulator.
aussieht
Code: Select all
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
private static final String PROJECT_ID = "test-project";
@Container
private static final PubSubEmulatorContainer pubsubEmulator =
new PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));
@DynamicPropertySource
static void emulatorProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
}
@BeforeAll
static void setup() throws Exception {
ManagedChannel channel =
ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
.usePlaintext()
.build();
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
TopicAdminClient topicAdminClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(channelProvider)
.build());
SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create(
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build());
PubSubAdmin admin =
new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
admin.createTopic("test-topic");
admin.createSubscription("test-subscription", "test-topic");
admin.close();
channel.shutdown();
}
// By default, autoconfiguration will initialize application default credentials.
// For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
@TestConfiguration
static class PubSubEmulatorConfiguration {
@Bean
CredentialsProvider googleCredentials() {
return NoCredentialsProvider.create();
}
}
@Autowired PubSubSender sender;
@Autowired PubSubSubscriberTemplate subscriberTemplate;
@Autowired PubSubPublisherTemplate publisherTemplate;
@Test
void testSend() throws ExecutionException, InterruptedException {
ListenableFuture future = sender.send("hello!");
List msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
@Test
void testWorker() throws ExecutionException, InterruptedException {
ListenableFuture future = publisherTemplate.publish("test-topic", "hi!");
List
messages = Collections.synchronizedList(new LinkedList());
PubSubWorker worker =
new PubSubWorker(
"test-subscription",
subscriberTemplate,
(msg) -> {
messages.add(msg);
});
worker.start();
await().until(() -> messages, not(empty()));
assertEquals(1, messages.size());
assertEquals(future.get(), messages.get(0).getMessageId());
assertEquals("hi!", messages.get(0).getData().toStringUtf8());
worker.stop();
}
@AfterEach
void teardown() {
// Drain any messages that are still in the subscription so that they don't interfere with
// subsequent tests.
await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
}
}
< /code>
funktioniert für das obige Beispiel gut, aber wenn ich meine Implementierung wie folgt testen möchte < /p>
@Autowired
private FunctionCatalog catalog;
@Test
void testSendB() throws ExecutionException, InterruptedException {
Consumer function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
var pubSubMessage = new PubSubMessage();
pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
function.accept(pubSubMessage);
List msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
< /code>
Es werfen Fehler aus: < /p>
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).
Code: Select all
private final Publisher publisher;
public void publishMessage(String message) {
var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
var pubsubApiMessage = getPubsubApiMessage(byteStr);
try {
publish(pubsubApiMessage);
} catch (Exception e) {
log.error("Error during event publishing: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}
private void publish(PubsubMessage pubsubApiMessage) throws Exception {
publisher.publish(pubsubApiMessage).get();
}
private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
return PubsubMessage.newBuilder()
.setData(byteStr)
.build();
}