Ich möchte Unit-Tests als Teil des Build-Prozesses ausführen, um Timer in einer Apache-Beam-Pipeline zu testen. Um die Tests lokal durchzuführen, verwende ich einen TestStream, der eine unbegrenzte Datenquelle simuliert. Verwende ich Timer in der Zeitdomäne REAL_TIME, werden sie nicht ausgelöst; stelle ich stattdessen auf WATERMARK um, funktioniert alles wie erwartet.
Warum passiert das? Ist der DirectRunner nicht dafür geeignet, REAL_TIME-Timer mit einem TestStream zu testen? Oder übersehe ich hier etwas? Für jede Hilfe bin ich sehr dankbar!
Ich habe Code geschrieben, um genau das zu testen. Es handelt sich um eine einfache Pipeline, die auf der Konsole ausgibt, wenn der Timer nach 1 s Inaktivität ausgelöst wird. Der Code sollte ohne Anpassungen lauffähig sein. Mit REAL_TIME funktioniert er nicht; stellt man TIMEDOMAIN jedoch auf WATERMARK um, wird der Timer wie erwartet ausgelöst.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline as dppPipeline
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.testing.test_stream import TestStream as tStream
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, BagStateSpec, TimerSpec, on_timer
from apache_beam.utils.timestamp import Timestamp, Duration
from apache_beam.coders import StrUtf8Coder, FloatCoder
from typing import Tuple
import pytest
# Time domain for InactivityTimerDoFn's timer: WATERMARK or REAL_TIME.
# It is used when the DoFn class body builds its TimerSpec, and again inside
# process() / the timer callback to decide how deadlines are computed
# (element timestamp vs. Timestamp.now()).
TIMEDOMAIN = beam.TimeDomain.REAL_TIME
@beam.typehints.with_input_types(Tuple[str, str])
class InactivityTimerDoFn(beam.DoFn):
    """Buffer values per key and flush them after a period of inactivity.

    Each ``(key, value)`` element appends ``value`` to a per-key bag and
    (re)sets a timer ``inactivity_seconds`` after the element. In the
    WATERMARK domain the deadline is relative to the element's event
    timestamp; in the REAL_TIME domain it is relative to ``Timestamp.now()``.
    Because every element re-sets the timer, it fires only once no new
    element for that key has arrived within the period. The callback then
    prints and clears the buffered values.

    The time domain comes from the module-level ``TIMEDOMAIN`` constant,
    because the ``TimerSpec`` has to be declared in the class body.

    NOTE(review): for REAL_TIME the deadline is computed from
    ``Timestamp.now()``, i.e. the host wall clock. When the pipeline is driven
    by a ``TestStream``, processing time is presumably simulated by the runner
    and advanced only by ``advance_processing_time``. If that simulated clock
    lags the wall clock (e.g. by pipeline start-up time), deadlines computed
    here can lie beyond any simulated time the stream reaches, and the timer
    never fires. This is a likely cause of REAL_TIME timers not firing under
    the DirectRunner; confirm against the runner's clock implementation.
    """

    STATE_BUFFER = BagStateSpec("state_buffer", StrUtf8Coder())
    INACTIVITY_TIMER = TimerSpec('inactivity_timer', TIMEDOMAIN)
    # Deadline of the currently armed timer, stored only for debug output.
    EXPIRATION_STATE = ReadModifyWriteStateSpec('expiration_time', FloatCoder())

    def __init__(self, inactivity_seconds=1.0):
        """Create the DoFn.

        Args:
            inactivity_seconds: Seconds a key must stay idle before its
                buffered values are flushed. Defaults to 1.0, the previously
                hard-coded delay.
        """
        super().__init__()
        self._inactivity_seconds = inactivity_seconds

    def process(self,
                element,
                timer=beam.DoFn.TimerParam(INACTIVITY_TIMER),  # Fires after the inactivity period
                state_buffer=beam.DoFn.StateParam(STATE_BUFFER),  # Values buffered until the timer fires
                expiration_state=beam.DoFn.StateParam(EXPIRATION_STATE),  # Debug copy of the deadline
                element_timestamp=beam.DoFn.TimestampParam):  # Event timestamp of the element
        """Buffer the element's value and push the key's deadline forward."""
        key, value = element
        state_buffer.add(value)
        delay = Duration(seconds=self._inactivity_seconds)
        if TIMEDOMAIN == beam.TimeDomain.WATERMARK:
            timestamp = element_timestamp + delay
        else:
            # Wall-clock based; see the NOTE(review) in the class docstring.
            timestamp = Timestamp.now() + delay
        # The state is declared with FloatCoder, so store a float. Otherwise
        # the value read back would be a Timestamp or a float depending on
        # whether the runner actually encodes state.
        expiration_state.write(float(timestamp))
        timer.set(timestamp)  # Re-setting replaces any previously armed deadline.
        print(f'Timer set for: {timestamp}')

    @on_timer(INACTIVITY_TIMER)
    def on_inactivity_expired(self,
                              state_buffer=beam.DoFn.StateParam(STATE_BUFFER),
                              timer=beam.DoFn.TimerParam(INACTIVITY_TIMER),
                              expiration_state=beam.DoFn.StateParam(EXPIRATION_STATE)):
        """Print and clear the key's buffered values once the timer fires."""
        expiration_timestamp = expiration_state.read()  # Debug only
        if TIMEDOMAIN == beam.TimeDomain.WATERMARK:
            print(f"Timer fired! Expected watermark: {expiration_timestamp}")
        else:
            print(f"Timer fired! Expected expiration time: {expiration_timestamp}, actual time: {Timestamp.now()}")
        buffer = list(state_buffer.read())
        state_buffer.clear()
        if buffer:
            print(f'Elements in buffer when timer triggered: {buffer}')
        # Defensive: the timer has just fired, so this is presumably a no-op.
        timer.clear()
        expiration_state.clear()
# ===========================
# Test Case Using TestStream
# ===========================
def test_inactivity_timer():
    """Drive InactivityTimerDoFn with a scripted TestStream on the DirectRunner.

    The watermark and processing time are advanced together, so the same
    scenario can exercise either timer domain. The expected flushes are
    annotated next to the relevant steps. The test prints its results and
    does not assert on them.
    """
    options = DirectOptions([], runner='DirectRunner', streaming=True)

    stream = tStream().advance_watermark_to(0)
    # Advance processing time to "now"; only millisecond precision is accepted.
    stream = stream.advance_processing_time(advance_by=round(float(Timestamp.now()), 3))

    # Each step is (TestStream method, argument), applied in order.
    add = tStream.add_elements
    tick = tStream.advance_processing_time
    wm = tStream.advance_watermark_to
    script = [
        (add, [('key1', 'A')]),
        (tick, Duration(micros=500_000)),
        (wm, 0.5),
        (add, [('key1', 'B')]),
        (tick, Duration(micros=100_000)),
        (wm, 0.6),
        (add, [('key1', 'C')]),
        # More than 1s passes: timer should fire and print ['A', 'B', 'C'].
        (tick, Duration(seconds=1, micros=100_000)),
        (wm, 1.7),
        (add, [('key1', 'D'), ('key1', 'E')]),
        (add, [('key1', 'F')]),
        (tick, Duration(micros=300_000)),
        (wm, 2.0),
        (add, [('key1', 'G')]),
        (tick, Duration(micros=900_000)),
        (wm, 2.9),
        (add, [('key1', 'H')]),
        # More than 1s passes: timer should fire and print ['D', 'E', 'F', 'G', 'H'].
        (tick, Duration(seconds=1, micros=300_000)),
        (wm, 4.2),
        (add, [('key1', 'I')]),
        (add, [('key1', 'J')]),
    ]
    for step, arg in script:
        stream = step(stream, arg)
    # Ending the stream should flush the remainder: ['I', 'J'].
    stream = stream.advance_watermark_to_infinity()

    with dppPipeline(options=options) as p:
        outputs = (
            p
            | 'TestStream' >> stream
            | "Force Coder" >> beam.Map(lambda x: x).with_output_types(Tuple[str, str])
            | 'InactivityTimerDoFn' >> beam.ParDo(InactivityTimerDoFn())
        )
        print('output', outputs)
if __name__ == '__main__':
    # Run this file's tests with output capture disabled (-s) so the DoFn's
    # print() output is visible. Propagate pytest's exit status; otherwise a
    # failing run would still exit 0 and a build step could not detect it.
    raise SystemExit(pytest.main(['-s', __file__]))
Die erwartete Ausgabe, also die Elemente im Puffer, die beim Auslösen des Timers auf der Konsole ausgegeben werden, sollte lauten:

Elements in buffer when timer triggered: ['A', 'B', 'C']
Elements in buffer when timer triggered: ['D', 'E', 'F', 'G', 'H']
Elements in buffer when timer triggered: ['I', 'J']
TestStream zum Testen von REAL_TIME-Timern in Apache Beam für Python mit dem DirectRunner verwenden ⇐ Python
-
- Similar Topics
- Replies
- Views
- Last post