Zwischenergebnisse für die Lazyframe-Verarbeitung großer Polaren speichern?
Posted: 27 Dec 2024, 11:15
Das Problem hängt möglicherweise mit https://github.com/pola-rs/polars/issues/9842 und der stapelweisen Verarbeitung von Python Polars LazyFrame zusammen
Mein Setup ist< /p>
Plan ist
In diesem Setup möchte ich eine Best-Effort-Verarbeitung haben – wenn der Mapper fehlschlägt, möchte ich bereits verarbeitete Ergebnisse beibehalten
Ich bin davon ausgegangen, dass Streaming erfolgt Wird in Stapeln ausgeführt, sodass bereits verarbeitete Stapel in der Ausgabe beibehalten werden und im Falle eines Fehlers nur der aktuelle Stapel verloren geht
Aber das scheint nicht der Fall zu sein – wenn die Verarbeitung fehlschlägt, ist die Zwischenausgabe leer
I habe versucht anzupassen sink_csv(batch_size) und pl.Config.set_streaming_chunk_size – es hat keine Auswirkung
Mein Setup ist< /p>
Code: Select all
input = pathlib.Path("input.csv") # 300k lines
output = pathlib.Path("output.csv")
def mapper(row_id):
# expensive computation and can fail sometimes
pass
any_value_column_is_null = ... # polars expression
schema_as_dict = ... # polars schema
id_col_name = "id"
def process_unprocessed_rows_in_batch(df: pl.DataFrame) -> pl.DataFrame:
additional_data = (
df.filter(any_value_column_is_null)
.with_columns(
pl.col(id_col_name)
.map_elements(
mapper,
pl.Struct(schema_as_dict),
)
.alias(generated_data_col_name)
)
.with_columns(pl.col(generated_data_col_name).struct.unnest())
.drop(generated_data_col_name)
)
return df.update(additional_data, on=id_col_name, how="left")
df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
process_unprocessed_rows_in_batch, streamable=True
)
df.sink_csv(output, maintain_order=False)
Code: Select all
STREAMING:
OPAQUE_PYTHON
Csv SCAN [snippet-dataset.csv]
PROJECT */4 COLUMNS
Ich bin davon ausgegangen, dass Streaming erfolgt Wird in Stapeln ausgeführt, sodass bereits verarbeitete Stapel in der Ausgabe beibehalten werden und im Falle eines Fehlers nur der aktuelle Stapel verloren geht
Aber das scheint nicht der Fall zu sein – wenn die Verarbeitung fehlschlägt, ist die Zwischenausgabe leer
I habe versucht anzupassen sink_csv(batch_size) und pl.Config.set_streaming_chunk_size – es hat keine Auswirkung