Pyspark erstellt Paring-LogikPython

Python-Programme
Guest
 Pyspark erstellt Paring-Logik

Post by Guest »

Ich arbeite in Azure Synapse und gewöhne mich an die Arbeit mit Pyspark. Ich möchte in meinem DF eine Paring-Logik zwischen Zeilen erstellen, aber ich bekomme sie nicht zum Laufen. Ich habe eine ID-Spalte und eine Sequenznummer. Zum Beispiel:



ID
seqNum




100
3609


100
3610

< tr>
100
3616


100< /td>
3617


100
3622

< tr>
100
3623


100< /td>
3634


100
3642

< tr>
100
3643



Das sollte der Code ausgeben:



ID
seqNum
pairID




100
3609
1


100
36101


100
3616
2


100
3617
2

100
3622
3


100
3623
3


1003634
Null


100
3642
4


100
36434



Zeile mit 3634 sollte nicht gepaart werden, da der Unterschied zwischen den Sequenznummern eins betragen sollte.
Das habe ich Logik in Python, die zu funktionieren scheint, aber dann kann ich die Verarbeitungsfähigkeiten von Spark nicht nutzen. Kann mir jemand helfen, die Logik in Pyspark zu erstellen?

Code: Select all

# window specification
windowSpec = Window.orderBy("seqNum")

# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))

# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))

#make PairID
df = df.withColumn("PairID", lit(None).cast("int"))

# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect()  # Collect rows for iterative processing
paired_indices = set()  # Track already paired rows
result = []

for i, row in enumerate(rows):
if i in paired_indices:
continue  # Skip already paired rows

current = row["seqNum"]
prev_diff = row["diff_prev"]
next_diff = row["diff_next"]

# Pair with the row above if diff_prev == 1 and it is not already paired
if prev_diff == 1 and (i - 1) not in paired_indices:
result.append((current, pair_id, rows[i - 1]["seqNum"]))
result.append((rows[i - 1]["seqNum"], pair_id, current))
paired_indices.update([i, i - 1])
pair_id += 1

# Pair with the row below if diff_next == 1 and it is not already paired
elif next_diff == 1 and (i + 1) not in paired_indices:
result.append((current, pair_id, rows[i + 1]["seqNum"]))
result.append((rows[i + 1]["seqNum"], pair_id, current))
paired_indices.update([i, i + 1])
pair_id += 1

else:
result.append((current, None, None))

# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post