Apache Beam 2.68.0 gibt die Warnung „Fallback-deterministischer Coder wird für Typ verwendet“ ausPython

Python-Programme
Anonymous
 Apache Beam 2.68.0 gibt die Warnung „Fallback-deterministischer Coder wird für Typ verwendet“ aus

Post by Anonymous »

Im neuesten Apache Beam 2.68.0 haben sie das Verhalten von Codern für nicht-primitive Objekte geändert. (Siehe das Änderungsprotokoll hier).
Daher erhalte ich eine Warnung wie diese auf GCP Dataflow.

Code: Select all

"Using fallback deterministic coder for type ''
in 'Run Pipeline/Select latest per Key/CombinePerKey(LatestCombineFn)/GroupByKey'. "
Diese Warnung wird auch im Abschnitt „Breaking Changes“ explizit erwähnt.

(Python) Der deterministische Fallback-Codierer für komplexe Typen wie NamedTuple, Enum und Datenklassen verwendet jetzt Cloudpickle anstelle von Dill. Wenn Ihre Pipeline betroffen ist, wird möglicherweise eine Warnung wie „Verwendet einen deterministischen Fallback-Codierer für Typ X…“ angezeigt. Sie können zum vorherigen Verhalten zurückkehren, indem Sie die Pipeline-Option --update_compatibility_version=2.67.0 (35725) verwenden. Melden Sie alle Probleme im Zusammenhang mit dem Beizen an #34903

Ihr Vorschlag ist, die Option --update_compatibility_version=2.67.0 an den Dataflow-Job zu übergeben.
Aber durch das Hinzufügen dieser Option zum Dataflow-Job wird die Warnung nicht ausgeblendet!!!
Ich bin mir nicht sicher, warum es passiert, aber ich würde es gerne wissen Warum.
Am wichtigsten ist, ich möchte wissen, wie ich das richtig angehen kann.
Ein Teil des Codes, der dafür verantwortlich ist, sieht so aus.

Code: Select all

>> beam.WithKeys(lambda rec: rec.key).with_output_types((Tuple[MessageKey, Message]))
| "Window Input" >> beam.WindowInto(window.FixedWindows(60))
| "Select latest per Key" >> beam.combiners.Latest.PerKey() # > beam.Values()
Ich habe auch Typhinweise hinzugefügt (mithilfe von .with_output_types((Tuple[MessageKey, Message])) in der obigen Zeile, aber es wird immer noch die gleiche Warnung ausgegeben. Aber ich glaube nicht, dass es nötig ist, weil PerKey von beam bereits die Eingabe- und Ausgabetypen definiert.

Code: Select all

@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V])
class PerKey(ptransform.PTransform):
...

Code: Select all

MessageKey
ist nur eine Ableitung von BaseModel pydantic`.

Code: Select all

class MessageKey(BaseModel):
...
Dann habe ich einen benutzerdefinierten Codierer erstellt:

Code: Select all

class MessageKeyCoder(Coder):
def encode(self, value: MessageKey) -> bytes:
return json.dumps(value.model_dump(), sort_keys=True).encode("utf-8")

def decode(self, encoded: bytes) -> MessageKey:
data = json.loads(encoded.decode("utf-8"))
return MessageKey(**data)

def is_deterministic(self) -> bool:
return True

def estimate_size(self, value: MessageKey) -> int:
return len(self.encode(value))

Ich habe dies registriert mit:

Code: Select all

beam.coders.registry.register_coder(MessageKey, MessageKeyCoder)
Ich habe versucht, dies an verschiedenen Stellen im Code einzufügen, aber nichts hat die Warnung behoben.
  • In derselben Datei wie MessageKeyCoder hinzugefügt, direkt darunter.
  • Direkt nach allen Importen in der Datei hinzugefügt, in der die Pipeline definiert ist.
  • Im Kontextmanager von mit Pipeline(...) als p hinzugefügt.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post