Apache Flink Job, der versucht, über CDC -Quellanschluss von Mongo zu lesen, führt zu MongotimeoutExceptionJava

Java-Forum
Anonymous
 Apache Flink Job, der versucht, über CDC -Quellanschluss von Mongo zu lesen, führt zu MongotimeoutException

Post by Anonymous »

Ich versuche, Mongo -CDC -Connector als Quelle für meine Datenastream -Quelle in meinem Flink -Job zu verwenden. Ich verwende denselben Beispielcode wie [per die Dokumente] [1]. < /P>
Das ist mein Code: < /p>

Code: Select all

MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();

env.execute("mongo-cdc");
< /code>
Da ich einen Replikatsatz verwende, setze ich die Verbindungs ​​-URIs für alle drei Knoten mit den Ports ein, die durch Komma getrennt sind. Ich habe versucht, den normalen Verbindungs ​​-URI aus dem Kompass zu verwenden, aber ein weiterer Fehler wurde aufgrund des MongoDB+Srv 
am Anfang erzeugt, und ich musste die URI ändern.
Ich habe meinen lokalen Flink -Cluster ausgeführt und suche über das Terminal:

Code: Select all

./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar

dauert ungefähr 30 Sekunden, bis der Auftrag in der Flink -Benutzeroberfläche auf Port 8081 angezeigt wird, und es wechselt ständig den Status zwischen Auslauf und neustart . < /p>
Der Fehler, den der Job erzeugt, lautet:

Code: Select all

Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Ich bin mir sicher, dass das Problem nicht in Mongo ist, da ich über den Kompass ohne Problem verbinde, und ich habe auch das andere Beispiel für [Mongo Sink/Quelle ohne CDC] ausprobiert. [2] Und das alles funktioniert, auch wenn die Standard -MongoDB+SRV uri.

Code: Select all

org.apache.flink
flink-java
1.20.0



org.apache.flink
flink-streaming-java
1.20.0



org.apache.flink
flink-connector-mongodb
1.2.0-1.19



org.apache.flink
flink-connector-mongodb-cdc
3.2.1

```

What could I be missing?

[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post