Der doppelte Schlüsselwert zu erhalten, der bei Amazon Kinesis mit PostgreSQL [geschlossen] verstößt, während sie in AmaJava

Java-Forum
Anonymous
 Der doppelte Schlüsselwert zu erhalten, der bei Amazon Kinesis mit PostgreSQL [geschlossen] verstößt, während sie in Ama

Post by Anonymous »

Ich habe eine Spring -Boot -Anwendung, die Amazon Kinesis verwendet hat, um Daten zu konsumieren und sie auf PostgreSQL zu speichern. Da ich bereits eine Datenbank (PostgreSQL) in meiner Anwendung verwende, möchte ich vermeiden, eine andere Datenbank (Dynamo DB) nur für CheckPointing und Sperren zu verwenden. Damit die Ressourcenkosten reduziert werden können. -Binder-Kinesis: 4.0.2 '
My Application.yml-Datei

Code: Select all

spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file: 
role-arn: 
role-session-name: RoleSessionName
region:
static: 
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
unten finden Sie die Java -Klasse, die die Bean zur Verarbeitung von Daten von Kinesis < /p>
enthalten@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
kinesis config
@Configuration
public class KinesisConfig {

private final DataSource dataSource;

@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}

@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}

@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}

@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
gemäß meiner vorherigen Frage, die ich unter den folgenden Links gestellt habe. und Sperren beim Konsumieren von Daten von Kinesis mit dem Binder -Ansatz < /li>
Überprüfungen und Sperren in Amazon Kinesis mit postgresql < /li>
< /ol>
Ich hatte die gleiche Lösung und seine Arbeit für mich gemacht. Ich konnte Postgresql zum Checkpointing- und Sperren verwenden. p>
Es gibt zwei Pods gleiche Codebasis, die eine Nachricht von Kinesis "test-test.tst.v1" < /p>
Unten finden Sie die Abfrage für die Sperre erstellen Tabelle < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
Analyse < /strong> < /p>
Fehler: Duplicate -Schlüsselwert verletzt eindeutige Einschränkung "int_lock_pk"
detail: key (lock_key , Region) = (2B062295-539B-38EB-A544-A6DD9214E50B, Standard) existiert bereits. p>
Protokolldatei wird in jeder Minute mit diesem Fehler generiert.
Ich hatte Dynamo DB durch Postgress -DB für POD -Verriegelung und Prüfpointing ersetzt. Standardkonfiguration (Dynamo DB, Checkpointing, Metadatastore) mit der obigen Kinesisconfig -Klasse über < /p>
< /li>
Ich hatte die Sperren- und Überprüfungspunktabelle mithilfe der folgenden Hinweise hinzugefügt Skript < /p>
Tabelle erstellen int_message (
message_id char (36) NICHT NULL,
Region VARCHAR (100) NICHT NULL,
Created_Date Timestamp, nicht Null, < BR /> Message_Bytes bytea,
Einschränkung int_message_pk Primärschlüssel (Message_id, Region)
); br /> Tabelle erstellen int_group_to_message (
gruppe_key char (36) nicht null,
message_id char (36) NICHT NULL,
Region VARCHAR (100),
Einschränkung int_group_to_message_pk Primärschlüssel (Group_Key, Message_ID, Region)
); ) Nicht null,
Group_Condition varchar (255),
Complete Bigint,
last_released_sequence Bigint,
erstellt_date Timestamp Null,
updated_date Timestamp Default Null,
Einschränkung int_message_group_pk primärer Key (Group_key, Region)
); ) Nicht NULL,
client_id char (36),
created_date timestamp nicht null,
Einschränkung int_lock_pk primärer Key (lock_key, Region)
); < /p>
Sequenz erstellen int_message_seq starten mit 1 inkrement von 1 kein cycle; ) Nicht null,
erstellt_date bigint nicht null,
message_priority bigint,
message_sequence bigint nicht null default nextVal ('int_message_seq'),
message_bytes bytea,
Region Varchar (Varchary (Varchary 100) Nicht null,
Einschränkung int_channel_message_pk primärer Key (Region, Group_key, erstellt_date, message_sequence)
); ); ,
Einschränkung int_metadata_store_pk Primärschlüssel (metadata_key, Region)
); Gewünschtes Verhalten < /strong>
Es sollte nicht doppelten Schlüsselwert erzeugen Könnte mir bitte jemand helfen, dieses Problem zu lösen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post