Quarkus-Messaging-AMQP CloudEvent mit ErweiterungenJava

Java-Forum
Anonymous
 Quarkus-Messaging-AMQP CloudEvent mit Erweiterungen

Post by Anonymous »

Ich erstelle einen Quarkus -Dienst, der die SOAP -Anforderung akzeptiert und die Daten in einer AMQP -Warteschlange nach CloudEnent -Spezifikation einfügt. Ich möchte einen zusätzlichen Metadatenparameter (Erweiterung) hinzufügen, aber am Ende hat die Nachricht sie nicht und ich verstehe nicht, warum und wie es funktioniert (Quarkus ist neu für mich). < /P>
Der Code lautet wie folgt:
Messaging -Service (mit Experimentationsmethoden, die Nachrichten zu Start hinzufügen und Nachrichten aus der Warteschlange vergrößern) < /p>

Code: Select all

package com.example.messaging;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import com.example.domain.Event;
import org.eclipse.microprofile.reactive.messaging.*;
import org.jboss.logging.Logger;

import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class MessagingService {

private static final Logger LOG = Logger.getLogger(MessagingService.class);

@Inject
@Channel("events-out")
Emitter emitter;

@SuppressWarnings("unchecked")
public void send(Event event, String actionType) {
LOG.debug("Sending event with event number [%s]".formatted(event.getNumber()));
var message = Message.of(event);
var metadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
message.addMetadata(OutgoingCloudEventMetadata.from(metadata)
.withExtension("action_type", actionType) // FIXME Does not work.  Parameter is missing in the message.
.build());
emitter.send(message);
}

void onStart(@Observes StartupEvent ev) {
test();
}

public void test() {
var event1 = new Event();
event1.setNumber(1);
event1.setBool(true);
event1.setText("Hello World");
event1.setDatetime(ZonedDateTime.now().minusDays(1));

var event2 = new Event();
event2.setNumber(2);
event2.setDatetime(ZonedDateTime.now());

send(event1, "test");
send(event2, "test");
}

@Incoming("events-in")
public CompletionStage printMessage(Message message) {
System.out.println(message.getPayload());
message.getMetadata(CloudEventMetadata.class)
.ifPresent(metadata -> {
System.out.println("CloudEvent ID: " + metadata.getId());
System.out.println("CloudEvent Type: " + metadata.getType());
System.out.println("CloudEvent Source: " + metadata.getSource());
System.out.println("CloudEvent Timestamp: " + metadata.getTimeStamp());
System.out.println("CloudEvent content type: " + metadata.getDataContentType());
System.out.println("CloudEvent spec version: " + metadata.getSpecVersion());
System.out.println("CloudEvent extensions: " + metadata.getExtensions());
});

return message.ack();
}
}
Dummy -Domänenklasse Nur um zu überprüfen, ob es mit den erforderlichen Datentypen funktioniert

Code: Select all

package com.example.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.xml.bind.annotation.XmlElement;
import jakarta.xml.bind.annotation.XmlRootElement;
import jakarta.xml.bind.annotation.XmlType;

import java.time.ZonedDateTime;
import java.util.Objects;

@XmlType(name = "Event", propOrder = {"text", "number", "bool", "datetime"})
@XmlRootElement(name = "Event")
public class Event {

private String text;
private Integer number;
private Boolean bool;
private ZonedDateTime datetime;

@JsonProperty("Text")
public String getText() {
return text;
}

@XmlElement(name = "Text")
public void setText(String text) {
this.text = text;
}

@JsonProperty("Number")
public Integer getNumber() {
return number;
}

@XmlElement(name = "Number")
public void setNumber(Integer number) {
this.number = number;
}

@JsonProperty("Bool")
public Boolean getBool() {
return bool;
}

@XmlElement(name = "Bool")
public void setBool(Boolean bool) {
this.bool = bool;
}

@JsonProperty("DateTime")
public ZonedDateTime getDatetime() {
return datetime;
}

@XmlElement(name = "DateTime")
public void setDatetime(ZonedDateTime datetime) {
this.datetime = datetime;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Event event = (Event) o;
return Objects.equals(text, event.text) && Objects.equals(number, event.number) && Objects.equals(bool, event.bool) &&  Objects.equals(datetime, event.datetime);
}

@Override
public int hashCode() {
return Objects.hash(text, number, bool, datetime);
}
}
Build.gradle.KTS

Code: Select all

plugins {
java
id("io.quarkus")
}

repositories {
mavenCentral()
mavenLocal()
}

val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project

dependencies {
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation(enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cxf-bom:${quarkusPlatformVersion}"))

implementation("io.quarkus:quarkus-resteasy")
implementation("io.quarkiverse.cxf:quarkus-cxf:1.0.1")
implementation("io.quarkus:quarkus-arc")
implementation("io.quarkus:quarkus-messaging-amqp")

testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.rest-assured:rest-assured")
}

group = "com.example"
version = "1.0-SNAPSHOT"

java {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}

tasks.withType {
systemProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager")
}
tasks.withType {
options.encoding = "UTF-8"
options.compilerArgs.add("-parameters")
}
Application.Properties

Code: Select all

quarkus.cxf.path = /soap
quarkus.cxf.logging.enabled-for = both
quarkus.cxf.logging.pretty = true

mp.messaging.outgoing.events-out.address=events
mp.messaging.outgoing.events-out.cloud-events-type=com.example.service.event
mp.messaging.outgoing.events-out.cloud-events-source=/source
mp.messaging.outgoing.events-out.connector=smallrye-amqp

mp.messaging.incoming.events-in.address=events
SOAP -Endpunkt (wahrscheinlich nicht relevant)

Code: Select all

package com.example.server;

import com.example.domain.Event;
import jakarta.jws.WebMethod;
import jakarta.jws.WebService;

@WebService
public interface EventService {

@WebMethod
String add(Event event);

@WebMethod
String update(Event event);

@WebMethod
String delete(Event event);
}
< /code>
package com.example.server;

import com.example.domain.Event;
import io.quarkiverse.cxf.annotation.CXFEndpoint;
import jakarta.inject.Inject;
import jakarta.jws.WebService;
import com.example.messaging.MessagingService;
import org.jboss.logging.Logger;

@CXFEndpoint("/event")
@WebService(serviceName = "EventService")
public class EventServiceImpl implements EventService {

private static final Logger LOG = Logger.getLogger(EventServiceImpl.class);

@Inject
MessagingService messagingService;

@Override
public String add(Event event) {
return processEvent(event, "add");
}

@Override
public String update(Event event) {
return processEvent(event, "update");
}

@Override
public String delete(Event event) {
return processEvent(event, "delete");
}

private String processEvent(Event event, String actionType) {
LOG.info("Event received. Event number [%s]; Action type [%s]".formatted(event.getNumber(), actionType));
try {
messagingService.send(event, actionType);
} catch (Exception e) {
LOG.error(e);
return "Unexpected error occurred while processing event";
}
return "Event processed";
}

}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post