Daher erhalte ich eine Warnung wie diese auf GCP Dataflow.
Code: Select all
"Using fallback deterministic coder for type ''
in 'Run Pipeline/Select latest per Key/CombinePerKey(LatestCombineFn)/GroupByKey'. "
(Python) Der deterministische Fallback-Codierer für komplexe Typen wie NamedTuple, Enum und Datenklassen verwendet jetzt Cloudpickle anstelle von Dill. Wenn Ihre Pipeline betroffen ist, wird möglicherweise eine Warnung wie „Verwendet einen deterministischen Fallback-Codierer für Typ X…“ angezeigt. Sie können zum vorherigen Verhalten zurückkehren, indem Sie die Pipeline-Option --update_compatibility_version=2.67.0 (35725) verwenden. Melden Sie alle Probleme im Zusammenhang mit dem Beizen an #34903
Ihr Vorschlag ist, die Option --update_compatibility_version=2.67.0 an den Dataflow-Job zu übergeben.
Aber durch das Hinzufügen dieser Option zum Dataflow-Job wird die Warnung nicht ausgeblendet!!!
Ich bin mir nicht sicher, warum es passiert, aber ich würde es gerne wissen Warum.
Am wichtigsten ist, ich möchte wissen, wie ich das richtig angehen kann.
Ein Teil des Codes, der dafür verantwortlich ist, sieht so aus.
Code: Select all
>> beam.WithKeys(lambda rec: rec.key).with_output_types((Tuple[MessageKey, Message]))
| "Window Input" >> beam.WindowInto(window.FixedWindows(60))
| "Select latest per Key" >> beam.combiners.Latest.PerKey() # > beam.Values()
Code: Select all
@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V])
class PerKey(ptransform.PTransform):
...
Code: Select all
MessageKeyCode: Select all
class MessageKey(BaseModel):
...
Code: Select all
class MessageKeyCoder(Coder):
def encode(self, value: MessageKey) -> bytes:
return json.dumps(value.model_dump(), sort_keys=True).encode("utf-8")
def decode(self, encoded: bytes) -> MessageKey:
data = json.loads(encoded.decode("utf-8"))
return MessageKey(**data)
def is_deterministic(self) -> bool:
return True
def estimate_size(self, value: MessageKey) -> int:
return len(self.encode(value))
Code: Select all
beam.coders.registry.register_coder(MessageKey, MessageKeyCoder)
- In derselben Datei wie MessageKeyCoder hinzugefügt, direkt darunter.
- Direkt nach allen Importen in der Datei hinzugefügt, in der die Pipeline definiert ist.
- Im Kontextmanager von mit Pipeline(...) als p hinzugefügt.
 Mobile version
 Mobile version