Schnellere Möglichkeit zum Filtern des Abgleichs von Datensätzen zwischen 2 PYSPARK -DatenframesPython

Python-Programme
Anonymous
 Schnellere Möglichkeit zum Filtern des Abgleichs von Datensätzen zwischen 2 PYSPARK -Datenframes

Post by Anonymous »

Ich versuche, ein PYSPARK -Programm zu schreiben, das Datensätze in einem sehr großen Datenrahmen (700 m bis 1B -Datensätze) filtert, das einigen Bedingungen für einen anderen kleineren Referenzdatenfream (450K -Datensätze) übereinstimmt. Dies geschieht mit einem linken Join zwischen den 2 Datenrahmen und schriftlich die Ergebnisse in eine Parkettdatei. Ich habe jedoch Probleme, das PYSPARK -Programm erfolgreich auszuführen. Der Referenzdatenrahmen auf 5 oder 10 Stücke gegen den gesamten großen Datenrahmen < /li>
Wenn ich den Referenzdatenrahmen auf 5 -Stücke aufteilte, und der große Datenrahmen auf 10 Stücke < /li>
< /ul>
< /li>
< /ul>
Aufteilung. Ich habe mir den Ausgabefordner angesehen, es gibt eine Parquetendatei, die tief in sich vergraben ist (

Code: Select all

/_temporary/0/_temporary/attempt_/part-00000-....snappy.parquet
). Diese Datei ist jedoch 0 Byte. < /P>
Mein Code (ohne Aufteilung) ist wie folgt.def extract_to_df(spark, ref_db):
columns_to_drop = ["ColA", "ColB", "ColC"]

# Join conditions
join_cond_1 = (col("Col1") >= col("Col3a")) & (col("Col1") >= col("Col3b"))
join_cond_2 = (col("Col2") >= col("Col3a")) & (col("Col2") >= col("Col3b"))

df = spark.read.parquet(folder)
df_2 = df.filter(df["Col4"]=="abc").withColumn("Col1", udf_col(col("Col1a"))).withColumn("Col2", udf_col(col("Col2a")))

df_tmp = df_2.join(ref_db, on=join_cond_1, how="left").drop(*columns_to_drop).withColumnRenamed("Col5", "Col5a")
df_results = df_tmp.join(ref_db, on=join_cond_2, how="left").drop(*columns_to_drop).withColumnRenamed("Col6", "Col6a")
df_final_results = df_results.dropna(subset=["Col5a", "Col6a"])

df_final_results.write.mode("append").parquet(output_folder)

def main():
ref_db = spark.read.parquet("/ref_db.parquet")

extract_to_df(spark, ref_db)

if __name__ == "__main__":
main()
< /code>
Vielleicht ist dies vielleicht nicht die effizienteste Art, das zu tun, was ich will. Gibt es einen schnelleren Weg, dies zu tun als 2 Verbindungen?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post