Apache Beam DoFn Init: Warum werden Init-Werte bundleübergreifend zurückgesetzt?Python

Python-Programme
Anonymous
 Apache Beam DoFn Init: Warum werden Init-Werte bundleübergreifend zurückgesetzt?

Post by Anonymous »

Ich versuche, dieses Beispiel für einen gemeinsamen Cache zu verstehen: https://beam.apache.org/documentation/p ... red-class/
Ich habe diesen Zähler zum hinzugefügt init:
> self.cache_load_counter = Metrics.counter(self.__class__, 'cache_loads')

Und entsprechend innerhalb der teueren_remote_call_to_load_customers erhöhen:
> self.cache_load_counter.inc()

Ich schreibe einen Test, um zu überprüfen, ob der Cache wie erwartet geladen wird, d. h. nach max_stale Sekunden sollte der Cache neu geladen werden. Dies wird ordnungsgemäß im Protokoll widergespiegelt, aber es scheint, dass der Zähler zwischen den Bundles nicht bestehen bleibt. Protokoll zur Referenz:
> pardo
> bundle started
> Loading rule cache.
> Cache loaded with tag: 1734739200.0
> bundle started
> Loading rule cache.
> Cache loaded with tag: 1734742800.0
> Cache load counter: 1

Der Stream wird über einen TestStream gesendet, wobei ein Satz von Elementen bei 0 gesendet wird, das Wasserzeichen vorgeschoben wird und ein weiterer Satz gesendet wird. Der Lauf der Zeit wird durch einen Schein-Patch nachgeahmt, um eine Änderung des Tags zu erzwingen.
Es scheint, dass der Zähler zwischen den Bundles zurückgesetzt wird. Ich gehe davon aus, dass es einen Cache-Ladezähler von 2 geben sollte, da es in der Funktion „Regelcache wird geladen“ eine Zählererhöhung gibt. Warum gibt dieser Zähler 1 zurück?
Ein weiteres damit zusammenhängendes Problem ist, dass es beim Ausführen eines Batches möglich ist, mithilfe dieser Parameter zu überprüfen, ob jeder Worker den Cache lädt (also num_workers == Cache_loads).
> options.\_all_options\['direct_num_workers'\] = num_workers
> options.\_all_options\['direct_running_mode'\] = 'multi_processing'

Dies funktioniert nicht für einen Stream. Gibt es eine bevorzugte Möglichkeit, mehrere Worker für einen Stream nachzuahmen, der sich vom Batch unterscheidet? Ich habe Folgendes ausprobiert und es scheint, dass immer noch nur ein Worker verwendet wird.
> options.view_as(beam.options.pipeline_options.WorkerOptions).num_workers = num_workers

Vielen Dank und bitte lassen Sie mich wissen, ob ich irgendwo Klarheit schaffen kann.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post