Mein Setup ist< /p>
Code: Select all
input = pathlib.Path("input.csv") # 300k lines
output = pathlib.Path("output.csv")
def mapper(row_id):
# expensive computation and can fail sometimes
pass
any_value_column_is_null = ... # polars expression
schema_as_dict = ... # polars schema
id_col_name = "id"
def process_unprocessed_rows_in_batch(df: pl.DataFrame) -> pl.DataFrame:
additional_data = (
df.filter(any_value_column_is_null)
.with_columns(
pl.col(id_col_name)
.map_elements(
mapper,
pl.Struct(schema_as_dict),
)
.alias(generated_data_col_name)
)
.with_columns(pl.col(generated_data_col_name).struct.unnest())
.drop(generated_data_col_name)
)
return df.update(additional_data, on=id_col_name, how="left")
df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
process_unprocessed_rows_in_batch, streamable=True
)
df.sink_csv(output, maintain_order=False)
Code: Select all
STREAMING:
OPAQUE_PYTHON
Csv SCAN [snippet-dataset.csv]
PROJECT */4 COLUMNS
Ich bin davon ausgegangen, dass Streaming erfolgt Wird in Stapeln ausgeführt, sodass bereits verarbeitete Stapel in der Ausgabe beibehalten werden und im Falle eines Fehlers nur der aktuelle Stapel verloren geht
Aber das scheint nicht der Fall zu sein – wenn die Verarbeitung fehlschlägt, ist die Zwischenausgabe leer
I habe versucht anzupassen sink_csv(batch_size) und pl.Config.set_streaming_chunk_size – es hat keine Auswirkung