ID   seqNum
100  3609
100  3610
100  3616
100  3617
100  3622
100  3623
100  3634
100  3642
100  3643
This is what the code should output:
ID   seqNum  pairID
100  3609    1
100  3610    1
100  3616    2
100  3617    2
100  3622    3
100  3623    3
100  3634    Null
100  3642    4
100  3643    4
The row with 3634 should not be paired, because two rows may only form a pair when their sequence numbers differ by exactly one.
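Put another way, the rule is: sort the sequence numbers, split them into runs in which each value is exactly one more than the previous, and pair rows two at a time from the start of each run, so an odd-length run leaves its last row unpaired. A minimal plain-Python sketch of that restatement for a single ID is below; the function name `pair_ids` is hypothetical, and it assumes unique integer sequence numbers.

```python
from itertools import groupby


def pair_ids(seq_nums):
    """Return (seqNum, pairID) tuples for one ID, in ascending seqNum order.

    Hypothetical helper; assumes unique integer sequence numbers.
    Values that differ by exactly 1 form a run; inside a run rows are
    paired two at a time from the start, and the last row of an
    odd-length run gets None.
    """
    ordered = sorted(seq_nums)
    # value - index is constant along a run of consecutive integers,
    # so groupby on that key splits the sorted list into runs.
    runs = [
        [value for _, value in grp]
        for _, grp in groupby(enumerate(ordered), key=lambda t: t[1] - t[0])
    ]
    result = []
    next_id = 1
    for run in runs:
        full_pairs = len(run) // 2
        for pos, value in enumerate(run):
            # pos // 2 is the index of the pair inside this run
            pair = next_id + pos // 2 if pos // 2 < full_pairs else None
            result.append((value, pair))
        next_id += full_pairs
    return result


print(pair_ids([3609, 3610, 3616, 3617, 3622, 3623, 3634, 3642, 3643]))
# → [(3609, 1), (3610, 1), (3616, 2), (3617, 2), (3622, 3), (3623, 3), (3634, None), (3642, 4), (3643, 4)]
```

Framed as runs of consecutive numbers, the pairing no longer needs a row-by-row loop, which is what makes it expressible with window functions.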
I have logic in Python that seems to work, but because it loops over collected rows, I can't take advantage of Spark's processing capabilities. Can someone help me implement this logic in PySpark?
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, lit
# Window specification: order by seqNum within each ID
# (assumes a pair never spans two different IDs)
windowSpec = Window.partitionBy("ID").orderBy("seqNum")
# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))
# Gap to the neighbouring rows (1 means the numbers are consecutive)
df = df.withColumn("diff_prev", col("seqNum") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))
# Make PairID
df = df.withColumn("PairID", lit(None).cast("int"))
# Assign PairID based on proximity logic
pair_id = 1
# Collect rows for iterative processing; sort explicitly so the row order
# matches the window order (collect() alone does not guarantee it)
rows = df.orderBy("ID", "seqNum").collect()
paired_indices = set()  # Track already paired rows
result = []
for i, row in enumerate(rows):
    if i in paired_indices:
        continue  # Skip already paired rows
    current = row["seqNum"]
    prev_diff = row["diff_prev"]  # None for the first row of an ID
    next_diff = row["diff_next"]  # None for the last row of an ID
    # Pair with the row above if diff_prev == 1 and it is not already paired
    if prev_diff == 1 and (i - 1) not in paired_indices:
        result.append((row["ID"], current, pair_id, rows[i - 1]["seqNum"]))
        result.append((row["ID"], rows[i - 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i - 1])
        pair_id += 1
    # Pair with the row below if diff_next == 1 and it is not already paired
    elif next_diff == 1 and (i + 1) not in paired_indices:
        result.append((row["ID"], current, pair_id, rows[i + 1]["seqNum"]))
        result.append((row["ID"], rows[i + 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i + 1])
        pair_id += 1
    else:
        result.append((row["ID"], current, None, None))
# To DataFrame (spark is the active SparkSession)
result_df = spark.createDataFrame(result, ["ID", "seqNum", "PairID", "Closest"])