Die Regel lautet wie folgt.
Finden Sie Überlappungen Verarbeiten Sie Daten zwischen aufeinanderfolgenden Zeilen nacheinander.
Ich habe die Zeilen für die Reihenfolge bereits nummeriert.
Schritte:
- Wenn Prozessstartdatum des vorherigen Zeile ist = Prozessstartdatum der nächsten Zeile
Ich habe die Codierung bis zu diesem Schritt erfolgreich durchgeführt. Der nächste Schritt der Regel ist jedoch ziemlich komplex.
Wenn sich die beiden Zeilen überlappen, nehmen Sie die kleinste (Startdatum) und größte (Enddatum) der beiden überlappenden Zeilen und vergleichen Sie sie damit Wenn in der dritten Zeile die Schritte 1 und 2 für diesen Vergleich wahr sind, wird Zeile 3 zusammen mit den Zeilen 1 und 2 Teil der überlappenden Gruppe. Wir nehmen die kleinsten und größten Daten der drei Zeilen und vergleichen sie mit der vierten Zeile.
Dieser Vorgang wird für die Partition der Kunden-ID und der Standort-ID fortgesetzt, wie im folgenden Code gezeigt.
Wenn sich die Zeilen I und 2 nicht überschneiden, wird der Vergleich zu den Zeilen verschoben 2 und 3 und führen Sie die gleichen Schritte aus.
Es kann mehrere separate überlappende Gruppen innerhalb einer Partition geben, daher ist es möglich, dass die Zeilen 1 und 2 eine Gruppe und die Zeilen 3 und 4 eine andere Gruppe sind und die Zeilen 5,6 sind nicht Teil von der Gruppe.
Für die Zeilen 5 und 6 können sie entweder Teil der 2. Gruppe sein oder keiner Gruppe angehören, da es sich um einen sequentiellen Vergleich handelt.
Das gewünschte Ergebnis sollte etwa so aussehen
Code: Select all
+--------+----------+----------+------------------+----------------+-------------+
|recordno|customerid|locationid|process_start_date|process_end_date|overlap_group|
+--------+----------+----------+------------------+----------------+-------------+
| 1| 2277953| A| 2015-03-13| 2016-04-15| 1|
| 2| 2277953| A| 2016-04-04| 2019-12-31| 1|
| 3| 2277953| A| 2019-06-06| 2019-06-20| 1|
| 4| 2277953| A| 2019-06-30| 2019-12-31| 1|
| 5| 2277953| A| 2020-01-01| 2020-12-31| 2|
| 6| 2277953| A| 2020-06-30| 2020-12-31| 2|
+--------+----------+----------+------------------+----------------+-------------+
Nach meiner Logik sollte Zeile 4 Teil von Gruppe 1 sein, aber es erscheint so ungruppiert wie 2.
Mein Code ist wie folgt
Code: Select all
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, when, sum as spark_sum
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("OptimizedOverlapGrouping").getOrCreate()
# Sample data
data = [
(1, 2277953, 'A', '2015-03-13', '2016-04-15'),
(2, 2277953, 'A', '2016-04-04', '2019-12-31'),
(3, 2277953, 'A', '2019-06-06', '2019-06-20'),
(4, 2277953, 'A', '2019-06-30', '2019-12-31'),
(5, 2277953, 'A', '2020-01-01', '2020-12-31'),
(6, 2277953, 'A', '2020-06-30', '2020-12-31')
]
# Create DataFrame
df = spark.createDataFrame(data, ['recordno', 'customerid', 'locationid', 'process_start_date', 'process_end_date'])
df = df.withColumn("process_start_date", col("process_start_date").cast("date"))
df = df.withColumn("process_end_date", col("process_end_date").cast("date"))
# Step 1: Define window for partitioning by group and ordering by start date
window_spec = Window.partitionBy("customerid", "locationid").orderBy("process_start_date")
# Step 2: Compare each row with the previous row to detect non-overlapping groups
df_with_lag = df.withColumn(
"prev_EndDate", lag("process_end_date").over(window_spec)
)
# Step 3: Identify the start of a new overlap group
df_with_group_flag = df_with_lag.withColumn(
"is_new_group",
when(
(col("prev_EndDate").isNull()) | (col("process_start_date") > col("prev_EndDate")),
1
).otherwise(0)
)
# Step 4: Generate sequential group numbers for overlapping records
df_with_overlap_group = df_with_group_flag.withColumn(
"overlap_group",
spark_sum("is_new_group").over(window_spec)
).drop("prev_EndDate", "is_new_group")
df_with_overlap_group.show()