Zwischenergebnisse für die Lazyframe-Verarbeitung großer Polaren speichern?Python

Python-Programme
Guest
 Zwischenergebnisse für die Lazyframe-Verarbeitung großer Polaren speichern?

Post by Guest »

Das Problem hängt möglicherweise mit https://github.com/pola-rs/polars/issues/9842 und der stapelweisen Verarbeitung von Python Polars LazyFrame zusammen
Mein Setup ist< /p>

Code: Select all

input = pathlib.Path("input.csv") # 300k lines
output = pathlib.Path("output.csv")
def mapper(row_id):
# expensive computation and can fail sometimes
pass
any_value_column_is_null = ... # polars expression
schema_as_dict = ... # polars schema
id_col_name = "id"
def process_unprocessed_rows_in_batch(df: pl.DataFrame) -> pl.DataFrame:
additional_data = (
df.filter(any_value_column_is_null)
.with_columns(
pl.col(id_col_name)
.map_elements(
mapper,
pl.Struct(schema_as_dict),
)
.alias(generated_data_col_name)
)
.with_columns(pl.col(generated_data_col_name).struct.unnest())
.drop(generated_data_col_name)
)
return df.update(additional_data, on=id_col_name, how="left")

df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
process_unprocessed_rows_in_batch, streamable=True
)
df.sink_csv(output, maintain_order=False)
Plan ist

Code: Select all

STREAMING:
OPAQUE_PYTHON
Csv SCAN [snippet-dataset.csv]
PROJECT */4 COLUMNS
In diesem Setup möchte ich eine Best-Effort-Verarbeitung haben – wenn der Mapper fehlschlägt, möchte ich bereits verarbeitete Ergebnisse beibehalten
Ich bin davon ausgegangen, dass Streaming erfolgt Wird in Stapeln ausgeführt, sodass bereits verarbeitete Stapel in der Ausgabe beibehalten werden und im Falle eines Fehlers nur der aktuelle Stapel verloren geht
Aber das scheint nicht der Fall zu sein – wenn die Verarbeitung fehlschlägt, ist die Zwischenausgabe leer
I habe versucht anzupassen sink_csv(batch_size) und pl.Config.set_streaming_chunk_size – es hat keine Auswirkung

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post