Erhalten Sie Spark.rpc.message.maxSize -Fehler mit einer großen DateiPython

Python-Programme
Anonymous
 Erhalten Sie Spark.rpc.message.maxSize -Fehler mit einer großen Datei

Post by Anonymous »

Ich nehme eine große ZIP -Datei in Azure -Datenbank (345 GB Zip -Datei mit einer einzigen 1,5 -TB -CSV mit ~ 3 Milliarden Zeilen) ein. Es ist beabsichtigt, den CSV in eine Delta -Tabelle für eine schnellere Einnahme in einer Datenpipeline umzuwandeln. Beide werden im Azure -Blob -Speicher gespeichert. pd.read_csv (ChunkSize = ChunkSize) < /code> < /li>
[*] Gehen Sie jeden Chunk im Iterator durch. storage
[*]clear memory



With CHUNKSIZE=5_000_000 I get this error at iteration 83 (415 million rows processed)

org.apache.spark.SparkException: Aufgrund des Bühnenversagens abgebrochen Erwägen Sie, Spark.rpc.Message.Maxsize zu erhöhen oder mit Broadcast -Variablen für große Werte zu verwenden. />
org.apache.spark.sparkKexception: Job wegen des Bühnenversagens: Serialisierte Aufgabe 43293: 250 war 282494392 Bytes, die Max erlaubt: Spark.Message.maxSize (2684354556). Erwägen Sie, Spark.rpc.message.maxSize zu erhöhen oder zu Sendungsvariablen für große Werte zu verwenden. Konfiguration: < /p>
  • 2 Arbeiter standard_ds3_v2 < /code> 14 GB Speicher, 4 Kerne < /li>
    Treiber standard_ds13_v2 < /code> 56 GB -Speicher, 8 Cores < /li> < /> < /ul>
    < />
    vorgeschlagen

    Code: Select all

    SparkSession.builder.config("spark.rpc.message.maxSize", "536870912")
    Für den Fall, dass sich das Gerät in Bytes irgendwie befindet
  • Beide geben mir immer noch den gleichen Fehler, obwohl ich beim Drucken von spark.conf.get ("spark.rpc.message.maxSize") die neue Einstellung zeigt
  • Einstellung, die die Instanzinstanz mit der Sparkinstanz mit der Sparkinstanz einstellen, die die Sparkinstanz verwendet, um die Spark -Instanz zu verwenden. spark.conf.set ("spark.rpc.message.maxsize", "512") , der mir einen Fehler gab, in dem er sagte, der Parameter kann nicht geändert werden, nachdem Spark instanziiert wurde
Ganzer Codeblock:

Code: Select all

def convert_zip_to_delta(snapshot_date: str, start_chunk: int = 0):

# File paths
zip_file = f"{snapshot_date}.zip"
delta_file = f"{snapshot_date}_delta"
delta_table_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{delta_file}/"

spark = (
SparkSession.builder.config("spark.sql.shuffle.partitions", "100")
.config("spark.hadoop.fs.azure.retries", "10")
.config("spark.rpc.message.maxSize", "536870912")  # 512 MiB in bytes
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)

spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", BLOB_CREDENTIAL)

print("**** spark.rpc.message.maxSize = ", spark.conf.get("spark.rpc.message.maxSize"))

if start_chunk == 0:
# Delete file if exists
print("**** Deleting existing delta table")
if fs.exists(f"{CONTAINER_NAME}/{delta_file}"):
fs.rm(f"{CONTAINER_NAME}/{delta_file}", recursive=True)

chunksize = 3_000_000

with fs.open(f"{CONTAINER_NAME}/{zip_file}", "rb") as file:
with zipfile.ZipFile(file, "r") as zip_ref:
file_name = zip_ref.namelist()[0]

with zip_ref.open(file_name) as csv_file:
csv_io = TextIOWrapper(csv_file, "utf-8")
headers = pd.read_csv(csv_io, sep="\t", nrows=0).columns.tolist()
chunk_iter = pd.read_csv(
csv_io,
sep="\t",
header=None,
names=headers,
usecols=["col1", "col2", "col3"],
dtype=str,  # Read all as strings to avoid errors
chunksize=chunksize,
skiprows=start_chunk*chunksize
)

for chunk in tqdm(chunk_iter, desc="Processing chunks"):
# Convert pd DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(chunk)

(spark_df.repartition(8).write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(delta_table_path)
)

# Clear memory after each iteration
spark_df.unpersist(blocking=True)
del chunk
del spark_df
gc.collect()

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post