Der Code lautet wie folgt:
Messaging -Service (mit Experimentationsmethoden, die Nachrichten zu Start hinzufügen und Nachrichten aus der Warteschlange vergrößern) < /p>
Code: Select all
package com.example.messaging;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import com.example.domain.Event;
import org.eclipse.microprofile.reactive.messaging.*;
import org.jboss.logging.Logger;
import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class MessagingService {
private static final Logger LOG = Logger.getLogger(MessagingService.class);
@Inject
@Channel("events-out")
Emitter emitter;
@SuppressWarnings("unchecked")
public void send(Event event, String actionType) {
LOG.debug("Sending event with event number [%s]".formatted(event.getNumber()));
var message = Message.of(event);
var metadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
message.addMetadata(OutgoingCloudEventMetadata.from(metadata)
.withExtension("action_type", actionType) // FIXME Does not work. Parameter is missing in the message.
.build());
emitter.send(message);
}
void onStart(@Observes StartupEvent ev) {
test();
}
public void test() {
var event1 = new Event();
event1.setNumber(1);
event1.setBool(true);
event1.setText("Hello World");
event1.setDatetime(ZonedDateTime.now().minusDays(1));
var event2 = new Event();
event2.setNumber(2);
event2.setDatetime(ZonedDateTime.now());
send(event1, "test");
send(event2, "test");
}
@Incoming("events-in")
public CompletionStage printMessage(Message message) {
System.out.println(message.getPayload());
message.getMetadata(CloudEventMetadata.class)
.ifPresent(metadata -> {
System.out.println("CloudEvent ID: " + metadata.getId());
System.out.println("CloudEvent Type: " + metadata.getType());
System.out.println("CloudEvent Source: " + metadata.getSource());
System.out.println("CloudEvent Timestamp: " + metadata.getTimeStamp());
System.out.println("CloudEvent content type: " + metadata.getDataContentType());
System.out.println("CloudEvent spec version: " + metadata.getSpecVersion());
System.out.println("CloudEvent extensions: " + metadata.getExtensions());
});
return message.ack();
}
}
Code: Select all
package com.example.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.xml.bind.annotation.XmlElement;
import jakarta.xml.bind.annotation.XmlRootElement;
import jakarta.xml.bind.annotation.XmlType;
import java.time.ZonedDateTime;
import java.util.Objects;
@XmlType(name = "Event", propOrder = {"text", "number", "bool", "datetime"})
@XmlRootElement(name = "Event")
public class Event {
private String text;
private Integer number;
private Boolean bool;
private ZonedDateTime datetime;
@JsonProperty("Text")
public String getText() {
return text;
}
@XmlElement(name = "Text")
public void setText(String text) {
this.text = text;
}
@JsonProperty("Number")
public Integer getNumber() {
return number;
}
@XmlElement(name = "Number")
public void setNumber(Integer number) {
this.number = number;
}
@JsonProperty("Bool")
public Boolean getBool() {
return bool;
}
@XmlElement(name = "Bool")
public void setBool(Boolean bool) {
this.bool = bool;
}
@JsonProperty("DateTime")
public ZonedDateTime getDatetime() {
return datetime;
}
@XmlElement(name = "DateTime")
public void setDatetime(ZonedDateTime datetime) {
this.datetime = datetime;
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Event event = (Event) o;
return Objects.equals(text, event.text) && Objects.equals(number, event.number) && Objects.equals(bool, event.bool) && Objects.equals(datetime, event.datetime);
}
@Override
public int hashCode() {
return Objects.hash(text, number, bool, datetime);
}
}
Code: Select all
plugins {
java
id("io.quarkus")
}
repositories {
mavenCentral()
mavenLocal()
}
val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project
dependencies {
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation(enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cxf-bom:${quarkusPlatformVersion}"))
implementation("io.quarkus:quarkus-resteasy")
implementation("io.quarkiverse.cxf:quarkus-cxf:1.0.1")
implementation("io.quarkus:quarkus-arc")
implementation("io.quarkus:quarkus-messaging-amqp")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.rest-assured:rest-assured")
}
group = "com.example"
version = "1.0-SNAPSHOT"
java {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}
tasks.withType {
systemProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager")
}
tasks.withType {
options.encoding = "UTF-8"
options.compilerArgs.add("-parameters")
}
Code: Select all
quarkus.cxf.path = /soap
quarkus.cxf.logging.enabled-for = both
quarkus.cxf.logging.pretty = true
mp.messaging.outgoing.events-out.address=events
mp.messaging.outgoing.events-out.cloud-events-type=com.example.service.event
mp.messaging.outgoing.events-out.cloud-events-source=/source
mp.messaging.outgoing.events-out.connector=smallrye-amqp
mp.messaging.incoming.events-in.address=events
Code: Select all
package com.example.server;
import com.example.domain.Event;
import jakarta.jws.WebMethod;
import jakarta.jws.WebService;
@WebService
public interface EventService {
@WebMethod
String add(Event event);
@WebMethod
String update(Event event);
@WebMethod
String delete(Event event);
}
< /code>
package com.example.server;
import com.example.domain.Event;
import io.quarkiverse.cxf.annotation.CXFEndpoint;
import jakarta.inject.Inject;
import jakarta.jws.WebService;
import com.example.messaging.MessagingService;
import org.jboss.logging.Logger;
@CXFEndpoint("/event")
@WebService(serviceName = "EventService")
public class EventServiceImpl implements EventService {
private static final Logger LOG = Logger.getLogger(EventServiceImpl.class);
@Inject
MessagingService messagingService;
@Override
public String add(Event event) {
return processEvent(event, "add");
}
@Override
public String update(Event event) {
return processEvent(event, "update");
}
@Override
public String delete(Event event) {
return processEvent(event, "delete");
}
private String processEvent(Event event, String actionType) {
LOG.info("Event received. Event number [%s]; Action type [%s]".formatted(event.getNumber(), actionType));
try {
messagingService.send(event, actionType);
} catch (Exception e) {
LOG.error(e);
return "Unexpected error occurred while processing event";
}
return "Event processed";
}
}