Wenn ich den Referenzdatenrahmen auf 5 -Stücke aufteilte, und der große Datenrahmen auf 10 Stücke < /li>
< /ul>
< /li>
< /ul>
Aufteilung. Ich habe mir den Ausgabefordner angesehen, es gibt eine Parquetendatei, die tief in sich vergraben ist (
Code: Select all
/_temporary/0/_temporary/attempt_/part-00000-....snappy.parquet
Mein Code (ohne Aufteilung) ist wie folgt.def extract_to_df(spark, ref_db):
columns_to_drop = ["ColA", "ColB", "ColC"]
# Join conditions
join_cond_1 = (col("Col1") >= col("Col3a")) & (col("Col1") >= col("Col3b"))
join_cond_2 = (col("Col2") >= col("Col3a")) & (col("Col2") >= col("Col3b"))
df = spark.read.parquet(folder)
df_2 = df.filter(df["Col4"]=="abc").withColumn("Col1", udf_col(col("Col1a"))).withColumn("Col2", udf_col(col("Col2a")))
df_tmp = df_2.join(ref_db, on=join_cond_1, how="left").drop(*columns_to_drop).withColumnRenamed("Col5", "Col5a")
df_results = df_tmp.join(ref_db, on=join_cond_2, how="left").drop(*columns_to_drop).withColumnRenamed("Col6", "Col6a")
df_final_results = df_results.dropna(subset=["Col5a", "Col6a"])
df_final_results.write.mode("append").parquet(output_folder)
def main():
ref_db = spark.read.parquet("/ref_db.parquet")
extract_to_df(spark, ref_db)
if __name__ == "__main__":
main()
< /code>
Vielleicht ist dies vielleicht nicht die effizienteste Art, das zu tun, was ich will. Gibt es einen schnelleren Weg, dies zu tun als 2 Verbindungen?