GELÖST Siehe unten
Ich habe einen Pyspark-Prozess, der einen Zeitreihen-Datenrahmen für einen Standort verwendet und Funktionen zur Anomalieerkennung berechnet/hinzufügt. Fensterfunktionen werden verwendet, um führende und nacheilende Lesevorgänge zu vergleichen, und es werden Aggregate wie Mittelwert, Median und Standardabweichung verwendet.
Dies führt jeweils eine Site aus, aber ich würde dies gerne über einen einzelnen großen Datenrahmen ausführen, der Daten für viele Sites enthält, wobei die Funktionen separat auf jede Gruppe des Datenrahmens angewendet werden und ein einzelner geänderter Datenrahmen zurückgegeben wird.
Code: Select all
# Illustrative functions
window = Window.partitionBy("Site").orderBy("DateTime").rowsBetween(-2, 0)
val_mean = df.agg(mean("Value")).collect()[0][0]
df = df.withColumn("NewCol", col("Value") - val_mean)
df = df.withColumn("rolling_mean", avg(col("Value")).over(window))
Welche Optionen gibt es hier?
(Laufen auf Azure Databricks, falls das wichtig ist)
Beispiel für gewünschtes Verhalten
Site
DateTime
Value
NewCol
rolling_mean
A
01.04.2026
1
-0,8
null
A
01.05.2026
2
-0.2
null
A
06/01/2026
3
1.2
01.08.2026
2
0,2
0,2
B
01.03.2026
4
-6.4
null
B
04/01/2026
5
-5.4
null
B
01.05.2026
34
23,6
3,93
B
01.06.2026
5
-5,4
4,26
B
01.07.2026
4
-6.4
3.93
GELÖST
Die Leistung von Pandas war viel schlechter als die der reinen Pyspark-Implementierung und führte zu zusätzlicher Komplexität des Ausführungsplans (was zu Fehlern auf einigen Clustern führte, z. B. Serverless).
Die Verwendung von „concurrent.futures“ ermöglichte die Anwendung bestehender Funktionen mit minimalen Anpassungen und die Ausführungsleistung war um ein Vielfaches schneller als die Pandas-Implementierung.
Code: Select all
# Divide task across workers
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = {executor.submit(anomaly_func, raw_df.filter(raw_df.Id == key)) for key in IdList}
# Union the results dataframe
df = None
for r in results:
df = r.result()
if df is None:
df = df
else:
df = df.union(df)
Mobile version