Problem beim Transformieren von Zeitattribut beim Übergang zwischen Stream und SQL APIPython

Python-Programme
Guest
 Problem beim Transformieren von Zeitattribut beim Übergang zwischen Stream und SQL API

Post by Guest »

Ich habe eine einfache Pyflink -Anwendung, in der ich einen Primärschlüssel für die temporale Tabellenverbindung benötige. Da meine Quelle Avro Confluent -Format verwendet und Probleme mit Primärschlüssel haben, verwende ich Transformation: SQL API -> Stream -Objekte -> SQL -API. Und hier erhalte ich ein Problem, das die Tabelle erklärt.sensor_readings_ddl = f"""
CREATE TABLE sensor_readings (
kafka_key_id VARCHAR not null,
...
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
)

class TsExtractor(TimestampAssigner):
def extract_timestamp(self, element, record_timestamp):
return element.ts # Extracts timestamp from the `ts` field of the event

sensors_reading_stream = (
tenv.to_data_stream(sensor_readings_tab)
.assign_timestamps_and_watermarks(
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(10)) # Define watermark strategy
.with_timestamp_assigner(TsExtractor()) # Use custom timestamp assigner
)
)

sensors_reading_schema = (Schema.new_builder().
column("kafka_key_id", DataTypes.STRING().not_null()).
...
column("ts", DataTypes.TIMESTAMP(3)).
primary_key("kafka_key_id").
watermark("ts", "ts - INTERVAL '5' SECOND").
build())
sensor_readings_view = tenv.from_data_stream(sensors_reading_stream, sensors_reading_schema)

sensor_readings_view_tab = tenv.create_temporary_view( "sensor_readings_view", sensors_reading_stream)

tumbling_w_sql = """
SELECT
sr.device_id,
das.metric_1,
das.metric_2,
TUMBLE_START(sr.ts, INTERVAL '30' SECONDS) AS window_start,
TUMBLE_END(sr.ts, INTERVAL '30' SECONDS) AS window_end,
SUM(sr.ampere_hour) AS charge_consumed
FROM sensor_readings_view FOR SYSTEM_TIME AS OF sr.ts AS sr
JOIN device_account_stats_view AS das ON sr.device_id = das.device_id
GROUP BY
TUMBLE(sr.ts, INTERVAL '30' SECONDS),
sr.device_id,
das.metric_1,
das.metric_2
"""
< /code>
Der Fehler lautet: < /p>

pyflink.util.Exceptions.tableException:> org.apache.flink.table. API.TableException: Fensteraggregat kann nur über eine Zeitattributspalte definiert werden, aber Zeitstempel (3)> P>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post