Speichereffizientes Sortieren/Entfernen von Duplikaten von PolardatenrahmenPython

Python-Programme
Anonymous
 Speichereffizientes Sortieren/Entfernen von Duplikaten von Polardatenrahmen

Post by Anonymous »

Ich versuche, sehr große CSV-Dateien mithilfe von Polars in Parquet-Dateien zu importieren. Ich streame Daten, verwende Lazy Dataframes und Senken. Kein Problem, bis...
...den Datenrahmen nach einer Spalte sortiert und Duplikate entfernt. Eine Anforderung, die nicht übersprungen werden kann, besteht darin, dass die in Parkett geschriebenen Daten in der Spalte „Datum/Uhrzeit“ eindeutig und nach derselben Spalte sortiert sein müssen. Der Flaschenhals ist das Sortieren und Entfernen von Duplikaten. Nach meinem Verständnis müssen die Daten vollständig im Speicher vorhanden sein, um Duplikate zu entfernen und zu sortieren. Es gibt keine Garantie dafür, dass die Quelldaten sortiert sind oder keine Duplikate enthalten.
Das Schreiben des Datenrahmens unsortiert und ohne Prüfung auf Duplikate in Parquet ist kein Problem und führt zu Parquet-Dateien mit einer Größe von etwa 3–4 GB. Aber das Einlesen und Sortieren und Anwenden von unique() lässt den Speicherverbrauch auf über 128 GB explodieren, was der Speichergrenze meines Hosts entspricht (ich führe den Code unter Ubuntu in WSL2 aus). Ich habe WSL2 bereits die maximale Speichermenge zugewiesen und bestätigt, dass es Zugriff auf die gesamte Speichermenge hat. Irgendwann führt das Sortieren und Entfernen von Duplikaten zum Absturz der WSL-VM. Ich scheine nicht in der Lage zu sein, Duplikate effizient zu sortieren und zu entfernen.
Können Sie mir bitte helfen, einen besseren Ansatz als den, den ich derzeit verwende, vorzuschlagen:

Code: Select all

   def import_csv(self, symbol_id: str, data_source: DataSource, data_type: DataType, source_files: List[str], column_schema: List[ColumnSchema]) -> None:

#ensure 1 or 2 source files are provided
if len(source_files) != 1 and len(source_files) != 2:
raise ValueError(f"Can only process 1 or 2 source files for symbol {symbol_id}")

#obtain new df
new_df = self._csv_to_dataframe(source_files, column_schema)

#filter out duplicates and sort by datetime
new_df = new_df.unique(subset="datetime")
new_df = new_df.sort("datetime")

#merge with existing data if it exists
path_filename = self.base_directory / f"{symbol_id}_{data_source.value}_{data_type.value}.parquet"
if path_filename.exists():
old_df = pl.scan_parquet(path_filename, glob=False)
df = pl.concat([old_df, new_df], how="vertical")
else:
df = new_df

#write to parquet
df.sink_parquet(path_filename, engine="streaming")

#update metadata
# self._update_metadata(symbol_id, data_source, data_type, len(df), df["datetime"].first(), df["datetime"].last())

#logging
# self.logger.info(f"Imported {len(df)} rows for {symbol_id} from {df["datetime"].first()} to {df["datetime"].last()}")

def _csv_to_dataframe(self, source_files: list[str], column_schema: List[ColumnSchema]) -> pl.LazyFrame:

# Generate Polars expressions for column transformations
expressions = self._generate_polars_expressions(column_schema)

dfs = []
for source_file in source_files:
df = pl.scan_csv(source_file, has_header=True, glob=False).select(expressions)
dfs.append(df)

if len(dfs) == 1:
df = dfs[0]
else:
df = pl.concat(dfs, how="vertical")
df = df.group_by("datetime").mean()

return df

def _generate_polars_expressions(self, schema: list[ColumnSchema]) -> list[pl.Expr]:

expressions = []
for col_schema in schema:
# Create a base expression from the source column name
expr = pl.col(col_schema.source_column_name)

# Handle special cases based on the target data type
if col_schema.dtype == pl.Datetime:

# Ensure datetime format is provided
if col_schema.datetime_format is None:
raise ValueError(
f"Datetime format is required for column '{col_schema.source_column_name}'"
)

# For datetime, we first parse the string with the specified format
expr = expr.str.to_datetime(format=col_schema.datetime_format, time_unit=self.time_unit, time_zone=col_schema.from_timezone)

#always convert to default timezone
expr = expr.dt.convert_time_zone(self.data_timezone)
else:
# For other dtypes, a simple cast is sufficient
expr = expr.cast(col_schema.dtype)

# Alias the expression with the target column name
final_expr = expr.alias(col_schema.target_column_name)

# Add the final expression to the list
expressions.append(final_expr)

return expressions

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post