Bei Starrocks habe ich also eine Pipe, die nach neuen Dateien in meinem S3-Bucket sucht. Jetzt möchte ich diese Datei in einer temporären Tabelle haben, die mit der neuesten Datei überschrieben wird, die in meinem S3-Bucket landet. Aber eine Starrocks-Pipe unterstützt INSERT OVERWRITE nicht, also wie könnte man so etwas erreichen? Meine materialisierte Ansicht sieht ungefähr so aus.
Code: Select all
CREATE MATERIALIZED VIEW mv_segmentation_manager REFRESH ASYNC AS
SELECT *
FROM segmentation_manager
WHERE segment_name NOT IN (SELECT DISTINCT segment_name FROM temp_segments)
UNION ALL
SELECT *
FROM temp_segments;
Meine Pipe hat die folgende Konfiguration
Code: Select all
CREATE PIPE segment_manager
PROPERTIES
(
"AUTO_INGEST" = "TRUE",
"POLL_INTERVAL" = "5"
)
AS
INSERT OVERWRITE temp_segments
SELECT * FROM FILES
(
"path" = "s3://my-bucket/*.snappy.parquet",
"format" = "parquet",
"aws.s3.region" = "region",
"aws.s3.use_instance_profile" = "true"
);
Was also im Grunde genommen passieren muss, ist, dass meine materialisierte Ansicht nach segment_name in der neuesten S3-Datei sucht und alle Zeilen in der Tabelle segmentation_manager ausschließt und die neue S3-Datei vereint. Dies ist nur eine upsert ohne zu überprüfen. Aber derzeit füge ich diese S3-Datei in eine temp_table ein, die, wenn sie nicht gekürzt wird, abhängig von der Anzahl der S3-Dateien, die im Bucket landen, sehr groß wird. Daher sollte diese leer sein, wenn eine S3-Datei im Bucket landet. Das Abschneiden nach einem Zeitplan wäre nicht sinnvoll, da dies zu Überschneidungen mit einer S3-Datei führen könnte, die im Bucket landet, und möglicherweise zu Datenverlust führen könnte. Irgendwelche Ideen, wie dies in Starrocks erreicht werden könnte? Ich freue mich darauf, eure Ideen zu hören!