Das ist mein Code: < /p>
Code: Select all
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
< /code>
Da ich einen Replikatsatz verwende, setze ich die Verbindungs -URIs für alle drei Knoten mit den Ports ein, die durch Komma getrennt sind. Ich habe versucht, den normalen Verbindungs -URI aus dem Kompass zu verwenden, aber ein weiterer Fehler wurde aufgrund des MongoDB+Srv
Ich habe meinen lokalen Flink -Cluster ausgeführt und suche über das Terminal:
Code: Select all
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
dauert ungefähr 30 Sekunden, bis der Auftrag in der Flink -Benutzeroberfläche auf Port 8081 angezeigt wird, und es wechselt ständig den Status zwischen Auslauf und neustart . < /p>
Der Fehler, den der Job erzeugt, lautet:
Code: Select all
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Code: Select all
org.apache.flink
flink-java
1.20.0
org.apache.flink
flink-streaming-java
1.20.0
org.apache.flink
flink-connector-mongodb
1.2.0-1.19
org.apache.flink
flink-connector-mongodb-cdc
3.2.1
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/