Flink ConfluentregistryAvroserializationSchema, die RegistryConfigs nicht respektierenJava

Java-Forum
Anonymous
 Flink ConfluentregistryAvroserializationSchema, die RegistryConfigs nicht respektieren

Post by Anonymous »

hi, wenn ich in Apache Flink das Kafkarecordserialization -Schema mit Einstellungen für die Serialisierung der Schemaregistrierung verwende, werden die Einstellungen für Registrierungskonfigurien nicht in Rechnung gestellt. class = "Lang-Java PrettyPrint-Override">

Code: Select all

package flink;

import example.avro.Car;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS;
import static io.confluent.kafka.serializers.KafkaAvroSerializerConfig.AVRO_REMOVE_JAVA_PROPS_CONFIG;

public class Jobflink {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.AT_LEAST_ONCE);

Map schema_settings = Map.of(
AVRO_REMOVE_JAVA_PROPS_CONFIG, "true",
AUTO_REGISTER_SCHEMAS, "false"
);

KafkaSink sink = KafkaSink.builder()
.setBootstrapServers("kafka-local:29092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(
ConfluentRegistryAvroSerializationSchema.forSpecific(
Car.class,
"toto-value",
"http://confluent-schema-registry-local:8081",
schema_settings))
.setTopic("toto")
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

env.execute();
}
}
< /code>
Ich habe auch versucht, die Einstellungen für das Confluent-Schema-Registrierungseinstellungen mit setKafkaproDucerConfig einzustellen, aber es funktioniert auch nicht < /p>
Properties properties = new Properties();
properties.setProperty(AVRO_REMOVE_JAVA_PROPS_CONFIG, "true");
properties.setProperty(AUTO_REGISTER_SCHEMAS, "false");

KafkaSink sink = KafkaSink.builder()
.setBootstrapServers("kafka-local:29092")
.setKafkaProducerConfig(properties)
.build();

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post