Spark-Redis-Schreibvorgänge verlieren Zeilen, wenn große DataFrames in Redis geschrieben werdenPython

Python-Programme
Anonymous
 Spark-Redis-Schreibvorgänge verlieren Zeilen, wenn große DataFrames in Redis geschrieben werden

Post by Anonymous »

Ich erlebe Datenverlust, wenn ich einen großen DataFrame mit dem Spark-Redis-Connector in Redis schreibe.
Details:
  • Ich habe einen DataFrame mit Millionen von Zeilen.
  • Das Schreiben in Redis funktioniert bei kleinen DataFrames korrekt, aber wenn der DataFrame groß ist, scheinen einige Zeilen danach zu fehlen write.
Beobachtungen:
  • Das Zurücklesen von Redis über den Spark-Redis-Connector gibt weniger Zeilen zurück als der ursprüngliche DataFrame.
  • Das direkte Lesen per Schlüssel oder die Verwendung von scan_iter liefert auch weniger Einträge.
  • Es gibt keine doppelten Zeilen im DataFrame.
  • Dieses Problem tritt nur bei großen Datensätzen auf; Kleine Datensätze werden korrekt geschrieben.
Frage:
  • Warum löscht Spark-Redis beim Schreiben großer DataFrames Zeilen?
  • Gibt es empfohlene Einstellungen, Konfigurationen oder Ansätze, um mit Spark-Redis zuverlässig große Datensätze in Redis zu schreiben?
Beispielcode

Code: Select all

# Prepare Redis key column
df_to_redis = df.withColumn("key", F.concat(F.lit("{"), F.col("uid"), F.lit("}"))).select("key", "lang")

# Write to Redis
df_to_redis.write.format("org.apache.spark.sql.redis") \
.option("table", "info") \
.option("key.column", "key")
.option("host", "REDIS_HOST") \
.option("port", 6379) \
.option("dbNum", 0) \
.mode("append") \
.save()

Code: Select all

# Reading back from Redis using Spark-Redis
df_redis = spark.read.format("org.apache.spark.sql.redis") \
.option("table", "info") \
.option("host", "REDIS_HOST") \
.option("port", 6379) \
.option("dbNum", 0) \
.load()

Code: Select all

# Reading all keys directly from Redis using redis-py keys()
r = redis.Redis(host="REDIS_HOST", port=6379, db=0)
all_keys = r.keys("info:*")
print(f"Number of keys read via keys(): {len(all_keys)}")

Code: Select all

# Reading all keys from Redis using scan_iter()
r = redis.Redis(host="REDIS_HOST", port=6379, db=0)
keys = list(r.scan_iter("info:*"))
print(f"Number of keys read via scan_iter: {len(keys)}")

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post