Page 1 of 1

Asof-Join mit mehreren Ungleichheitsbedingungen

Posted: 28 Dec 2024, 18:35
by Guest
Ich habe zwei Datenrahmen: a (~600 Mio. Zeilen) und b (~2 Mio. Zeilen). Was ist der beste Ansatz, um b mit a zu verbinden, wenn 1 Gleichheitsbedingung und 2 Ungleichheitsbedingungen für die jeweiligen Spalten verwendet werden?
  • a_1 = b_1
  • a_2 >= b_2
  • a_3 >= b_3
Ich habe bisher die folgenden Wege erkundet:
  • Polars:

    join_asof(): erlaubt nur 1 Ungleichheitsbedingung
  • join_where() mit filter(): Selbst bei einem kleinen Toleranzfenster gehen der Standard-Polars-Installation während des Joins die Zeilen aus (4,3B Zeilenlimit) und der Polars-u64-idx-Installation gehen die Zeilen aus Speicher (512GB)
[*]DuckDB: ASOF LEFT JOIN: erlaubt auch nur 1 Ungleichheitsbedingung
< li>Numba: Da das oben Gesagte nicht funktioniert hat, habe ich versucht, meine eigene Funktion „join_asof()“ zu erstellen – siehe Code unten. Es funktioniert gut, aber mit zunehmender Länge von a wird es unerschwinglich langsam. Ich habe verschiedene Konfigurationen von for/while-Schleifen und Filtern ausprobiert, alle mit ähnlichen Ergebnissen.

Jetzt gehen mir etwas die Ideen aus ... Was wäre ein effizienterer Weg, dies umzusetzen?
Vielen Dank

Code: Select all

import numba as nb
import numpy as np
import polars as pl
import time

@nb.njit(nb.int32[:](nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:]), parallel=True)
def join_multi_ineq(a_1, a_2, a_3, b_1, b_2, b_3, b_4):
output = np.zeros(len(a_1), dtype=np.int32)

for i in nb.prange(len(a_1)):

for j in range(len(b_1) - 1, -1, -1):

if a_1[i] == b_1[j]:

if a_2[i] >= b_2[j]:

if a_3[i] >= b_3[j]:
output[i] = b_4[j]
break

return output

length_a = 5_000_000
length_b = 2_000_000

start_time = time.time()
output = join_multi_ineq(a_1=np.random.randint(1, 1_000, length_a, dtype=np.int32),
a_2=np.random.randint(1, 1_000, length_a, dtype=np.int32),
a_3=np.random.randint(1, 1_000, length_a, dtype=np.int32),
b_1=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_2=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_3=np.random.randint(1, 1_000, length_b, dtype=np.int32),
b_4=np.random.randint(1, 1_000, length_b, dtype=np.int32))
print(f"Duration: {(time.time() - start_time):.2f} seconds")