PYSPARK -Programm hängen beim Hinzufügen von Broadcast -Stück festPython

Python-Programme
Anonymous
 PYSPARK -Programm hängen beim Hinzufügen von Broadcast -Stück fest

Post by Anonymous »

Ich versuche, ein PYSPARK-Programm zu schreiben, das Datensätze in einem sehr großen Datenrahmen (1-2B-Datensätze) filtert, der einige Bedingungen für einen anderen kleineren Referenzdatenrahmen entspricht. Dies geschieht mit einem linken Join zwischen den 2 Datenrahmen und schriftlich die Ergebnisse in eine Parkettdatei. Wenn der Referenzdatenrahmen leer ist, wird das Programm erfolgreich ausgeführt. Aber wenn der Referenzdatenrahmen 414K-Datensätze enthält, hängt das Spark-Programm am Nachrichtenspeicher.

Code: Select all

def extract_to_df(spark, ref_db):
columns_to_drop = ["ColA", "ColB", "ColC"]

# Join conditions
join_cond_1 = (col("Col1") >= col("Col3a")) & (col("Col1") >= col("Col3b"))
join_cond_2 = (col("Col2") >= col("Col3a")) & (col("Col2") >= col("Col3b"))

df = spark.read.parquet(folder)
df_2 = df.filter(df["Col4"]=="abc").withColumn("Col1", udf_col(col("Col1a"))).withColumn("Col2", udf_col(col("Col2a")))

df_tmp = df_2.join(ref_db, on=join_cond_1, how="left").drop(*columns_to_drop).withColumnRenamed("Col5", "Col5a")
df_results = df_tmp.join(ref_db, on=join_cond_2, how="left").drop(*columns_to_drop).withColumnRenamed("Col6", "Col6a")
df_final_results = df_results.dropna(subset=["Col5a", "Col6a"])

df_final_results.write.mode("overwrite").parquet(output_folder)

def main():
ref_db = spark.read.parquet("/ref_db.parquet")
ref_db.persist()

extract_to_df(spark, ref_db)

if __name__ == "__main__":
main()
Was ist mit dem Code los?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post