My Application.yml-Datei
Code: Select all
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file:
role-arn:
role-session-name: RoleSessionName
region:
static:
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
unten finden Sie die Java -Klasse, die die Bean zur Verarbeitung von Daten von Kinesis < /p>
enthalten@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
@Configuration
public class KinesisConfig {
private final DataSource dataSource;
@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}
@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}
@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
gemäß meiner vorherigen Frage, die ich unter den folgenden Links gestellt habe. und Sperren beim Konsumieren von Daten von Kinesis mit dem Binder -Ansatz < /li>
Überprüfungen und Sperren in Amazon Kinesis mit postgresql < /li>
< /ol>
Ich hatte die gleiche Lösung und seine Arbeit für mich gemacht. Ich konnte Postgresql zum Checkpointing- und Sperren verwenden. p>
Es gibt zwei Pods gleiche Codebasis, die eine Nachricht von Kinesis "test-test.tst.v1" < /p>
Unten finden Sie die Abfrage für die Sperre erstellen Tabelle < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
Analyse < /strong> < /p>
Fehler: Duplicate -Schlüsselwert verletzt eindeutige Einschränkung "int_lock_pk"
detail: key (lock_key , Region) = (2B062295-539B-38EB-A544-A6DD9214E50B, Standard) existiert bereits. p>
Protokolldatei wird in jeder Minute mit diesem Fehler generiert.
Ich hatte Dynamo DB durch Postgress -DB für POD -Verriegelung und Prüfpointing ersetzt. Standardkonfiguration (Dynamo DB, Checkpointing, Metadatastore) mit der obigen Kinesisconfig -Klasse über < /p>
< /li>
Ich hatte die Sperren- und Überprüfungspunktabelle mithilfe der folgenden Hinweise hinzugefügt Skript < /p>
Tabelle erstellen int_message (
message_id char (36) NICHT NULL,
Region VARCHAR (100) NICHT NULL,
Created_Date Timestamp, nicht Null, < BR /> Message_Bytes bytea,
Einschränkung int_message_pk Primärschlüssel (Message_id, Region)
); br /> Tabelle erstellen int_group_to_message (
gruppe_key char (36) nicht null,
message_id char (36) NICHT NULL,
Region VARCHAR (100),
Einschränkung int_group_to_message_pk Primärschlüssel (Group_Key, Message_ID, Region)
); ) Nicht null,
Group_Condition varchar (255),
Complete Bigint,
last_released_sequence Bigint,
erstellt_date Timestamp Null,
updated_date Timestamp Default Null,
Einschränkung int_message_group_pk primärer Key (Group_key, Region)
); ) Nicht NULL,
client_id char (36),
created_date timestamp nicht null,
Einschränkung int_lock_pk primärer Key (lock_key, Region)
); < /p>
Sequenz erstellen int_message_seq starten mit 1 inkrement von 1 kein cycle; ) Nicht null,
erstellt_date bigint nicht null,
message_priority bigint,
message_sequence bigint nicht null default nextVal ('int_message_seq'),
message_bytes bytea,
Region Varchar (Varchary (Varchary 100) Nicht null,
Einschränkung int_channel_message_pk primärer Key (Region, Group_key, erstellt_date, message_sequence)
); ); ,
Einschränkung int_metadata_store_pk Primärschlüssel (metadata_key, Region)
); Gewünschtes Verhalten < /strong>
Es sollte nicht doppelten Schlüsselwert erzeugen Könnte mir bitte jemand helfen, dieses Problem zu lösen.