So führen Sie Pyspark UDF separat über Datenrahmengruppen ausPython

Python-Programme
Anonymous
 So führen Sie Pyspark UDF separat über Datenrahmengruppen aus

Post by Anonymous »

Gruppieren eines Pyspark-Datenrahmens, Anwenden der Zeitreihenanalyse-UDF auf jede Gruppe

GELÖST Siehe unten

Ich habe einen Pyspark-Prozess, der einen Zeitreihen-Datenrahmen für einen Standort verwendet und Funktionen zur Anomalieerkennung berechnet/hinzufügt. Fensterfunktionen werden verwendet, um führende und nacheilende Lesevorgänge zu vergleichen, und es werden Aggregate wie Mittelwert, Median und Standardabweichung verwendet.
Dies führt jeweils eine Site aus, aber ich würde dies gerne über einen einzelnen großen Datenrahmen ausführen, der Daten für viele Sites enthält, wobei die Funktionen separat auf jede Gruppe des Datenrahmens angewendet werden und ein einzelner geänderter Datenrahmen zurückgegeben wird.

Code: Select all

# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]

df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))
Ich habe versucht, For-Schleifen und Pandas für Skalierbarkeit und Parallelverarbeitung zu vermeiden, habe aber mit PySpark nicht den richtigen Weg gefunden, dies zu tun.
Welche Optionen gibt es hier?
(Laufen auf Azure Databricks, falls das wichtig ist)
Beispiel für gewünschtes Verhalten



Site
DateTime
Value
NewCol
rolling_mean




A
01.04.2026
1
-0,8
null


A
01.05.2026
2
-0.2
null


A
06/01/2026
3
1.2

01.08.2026
2
0,2
0,2


B
01.03.2026
4
-6.4
null


B
04/01/2026
5
-5.4
null


B
01.05.2026
34
23,6
3,93


B
01.06.2026
5
-5,4
4,26


B
01.07.2026
4
-6.4
3.93



GELÖST
Die Leistung von Pandas war viel schlechter als die der reinen Pyspark-Implementierung und führte zu zusätzlicher Komplexität des Ausführungsplans (was zu Fehlern auf einigen Clustern führte, z. B. Serverless).
Die Verwendung von „concurrent.futures“ ermöglichte die Anwendung bestehender Funktionen mit minimalen Anpassungen und die Ausführungsleistung war um ein Vielfaches schneller als die Pandas-Implementierung.

Code: Select all

# Divide task across workers
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
   results = {executor.submit(anomaly_func, raw_df.filter(raw_df.Id == key)) for key in IdList}

# Union the results dataframe
df = None
for r in results:
   df = r.result()
   if df is None:
       df = df
   else:
       df = df.union(df)

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post