Der beste Weg, Polars Multithreading mit Scikit-Learn-Kompatibilität zu nutzenPython

Python-Programme
Anonymous
 Der beste Weg, Polars Multithreading mit Scikit-Learn-Kompatibilität zu nutzen

Post by Anonymous »

Ich habe an einem Projekt gearbeitet, um Tausende von Ergebnisvariablen anhand eines Standardsatzes von Prädiktoren und Kovariaten mithilfe von Polaren schnell zu testen. Es funktioniert sehr gut, mit bis zu 16-fachen Geschwindigkeitssteigerungen im Vergleich zu einem vergleichbaren Paket in R. Meine Frage ist, was die beste Vorgehensweise für die Kombination einer benutzerdefinierten Modellierungsfunktion mit Polars ist, um Multithreading in Polars und wissenschaftlichen Python-Paketen (Numpy, Scipy, Sklearn usw.) zu nutzen. Dies ist meine aktuelle Implementierung

Code: Select all

# This collects all the common operations that need to be done
# for all predictors/dependents
lf = lf.collect().lazy()
result_lazyframes = []
for predictor in config.predictor_columns:
for dependent in config.dependent_columns:
logger.trace(f"Analyzing predictor '{predictor}' with dependent '{dependent}'.")
# Placeholder for actual analysis logic
result_lazyframe = perform_analysis(lf, predictor, dependent, config)
if result_lazyframe is not None:
result_lazyframes.append(result_lazyframe)
# Store or log results as needed
if not result_lazyframes:
logger.error("No valid analyses were performed. Please check your configuration and data.")
return pl.DataFrame()
# Collect in batches with progress
batch_size = min(100, max(10, num_groups // 10))
all_results = []
for i in range(0, len(result_lazyframes), batch_size):
batch = result_lazyframes[i : i + batch_size]
results = pl.collect_all(batch)
all_results.extend(results)
completed = min(i + batch_size, len(result_lazyframes))

def perform_analysis(
lf: pl.LazyFrame, predictor: str, dependent: str, config: MASConfig
) -> pl.LazyFrame:
"""Perform the actual analysis for a given predictor and dependent variable"""
# Select only the relevant columns and drop missing values in the predictor and dependent
columns = [predictor, dependent, *config.covariate_columns]
analysis_lf = lf.select(columns)
model_func = partial(_run_association, predictor=predictor, dependent=dependent, config=config)
expected_schema = _get_schema(config)
result_lf = (
analysis_lf
.select(pl.struct(columns).alias("association_struct"))
.select(
pl.col("association_struct")
.map_batches(model_func, returns_scalar=True, return_dtype=expected_schema)
.alias("result")
)
)
return result_lf
Innerhalb von model_func gibt es einen Threadpool_limits-Schutz aus der Threadpoolctl-Bibliothek, um die Anzahl der Threads zu begrenzen, die die wissenschaftlichen Python-Pakete verwenden können. Ist das ein gutes System oder gibt es Verbesserungsmöglichkeiten?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post