Ich verwende den org.springframework application.yml-Datei < /p>
Code: Select all
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file:
role-arn:
role-session-name: RoleSessionName
region:
static:
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
Unten ist die Java-Klasse, die die Bean zum Verarbeiten von Daten von Kinesis < /p>
enthält.@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
@Configuration
public class KinesisConfig {
private final DataSource dataSource;
@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}
@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}
@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
gemäß meiner vorherigen Frage, die ich unter den folgenden Links gestellt habe. und Sperren beim Konsumieren von Daten von Kinesis mit dem Binder -Ansatz < /li>
prüfen und in der Amazon Kinesis mit postgreSQL < /li>
< /ul>
verspunden werden Ich hatte die gleiche Lösung gemacht und es funktioniert für mich. Ich konnte PostgreSQL zum Checkpointing- und Sperren verwenden. Fehler. < /p>
Es gibt zwei Pods, die dieselbe Codebasis ausführen, die Nachrichten von Kinesis "test-Test.tst.v1" < /p>
unten verbrauchen. Unten ist die Erstellen Sie Abfrage für die Sperrtabelle < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
Analyse < /h3>
Fehler: Duplicate -Schlüsselwert verletzt eindeutige Einschränkung "int_lock_pk"
Details : Key (lock_key, Region) = (2B062295-539B-38EB-A544-A6DD9214E50B, Standard) existiert bereits. $ 2, $ 3, $ 4) < /p>
< /blockquote>
Die Protokolldatei wird jede Minute mit diesem Fehler generiert. < /h4>
Ich hatte DynamoDB durch eine Postgresql -DB für POD -Verriegelung und -Chropting ersetzt. Standardkonfiguration (Dynamo DB, Checkpointing, Metadatastore) mit der oben genannten Kinesisconfig -Klasse überschreiben. unten Skript < /p>
CREATE TABLE INT_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CREATED_DATE TIMESTAMP NOT NULL,
MESSAGE_BYTES BYTEA,
constraint INT_MESSAGE_PK primary key (MESSAGE_ID, REGION)
);
CREATE INDEX INT_MESSAGE_IX1 ON INT_MESSAGE (CREATED_DATE);
CREATE TABLE INT_GROUP_TO_MESSAGE (
GROUP_KEY CHAR(36) NOT NULL,
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100),
constraint INT_GROUP_TO_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION)
);
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
GROUP_CONDITION VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
UPDATED_DATE TIMESTAMP DEFAULT NULL,
constraint INT_MESSAGE_GROUP_PK primary key (GROUP_KEY, REGION)
);
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE;
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'),
MESSAGE_BYTES BYTEA,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE INDEX INT_CHANNEL_MSG_DELETE_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID);
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
< /code>
< /li>
< /ol>
Gewünschtes Verhalten < /h4>
Es sollte nicht doppelten Schlüsselwert erstellen, der eindeutige Einschränkung verletzt Beim Schließen der POD durch das Abhängigkeitsspring-Cloud-Stream-Binder-Kinesis-Jar.
Wie kann ich dieses Problem lösen?