Problem mit der hohen Speicherauslastung der Polars read_ndjson DynamoDB-DateiPython

Python-Programme
Anonymous
 Problem mit der hohen Speicherauslastung der Polars read_ndjson DynamoDB-Datei

Post by Anonymous »

Ich versuche, eine DynamoDB-JSON-Datei mit Polars zu reduzieren.
Meine JSON-Datei ist 1 GB groß, 100 Monate nach der Komprimierung in gz, aber im Speicher, wenn ich sie lese, belegt sie 16 GB.
Da die Daten aus DynamoDB stammen, können meine Spalten mehrere Typen haben. Vor einem Jahr war es zum Beispiel ein String, heute ist es ein Boolescher Wert.
Also habe ich 4 Experimente mit nur einem pl.read_ndjson und write_parquet gemacht:
  • Lesen Sie eine ~150 MB JSON-Datei mit 1 verschachtelten Ebene, alle Zeilen haben dasselbe Schema -> ~390 MB verwendeter Speicher
  • Lesen Sie dieselbe Datei, komprimiert mit gz -> ~750 MB belegter Speicher
  • Lesen Sie eine ~150 MB JSON-Datei mit mehreren verschachtelten Ebenen und unterschiedlichen Schemata -> 7,7 GB belegter Speicher
  • Lesen Sie dieselbe Datei, komprimiert mit gz -> ~7,7 GB belegter Speicher
Warum braucht es so viel Speicher, wenn das Schema in der verschachtelten Datei nicht konstant ist?
Hier mein Skript für einfache Daten:

Code: Select all

import json
import random
from uuid import uuid4
from datetime import datetime

DYNAMO_TYPES = ["S", "N", "BOOL"]

def random_dynamodb_value(i, depth=0):
if depth > 0:
return {"S": str(uuid4())}

value_type = DYNAMO_TYPES[i%3]

if value_type == "S":
return {"S": f"val-{uuid4().hex[:8]}"}
elif value_type == "N":
return {"N": str(random.randint(0, 1000))}
elif value_type == "BOOL":
return {"BOOL": random.choice([True, False])}

def generate_extra_easy_fields(n=50):
extra = {}
for i in range(n):
field_name = f"field_{i}"
extra[field_name] = random_dynamodb_value(i)
return extra

def generate_easy_entry():
base = {
"id": {"S": str(uuid4())},
"timestamp": {"S": datetime.now().isoformat()},
}

base.update(generate_extra_easy_fields(50))
return base

def generate_simple_jsonl_file(filename: str, n: int = 20):
with open(filename, "w") as f:
for i in range(n):
entry = json.dumps(generate_easy_entry())
f.write(entry + "\n")
Hier mein Skript für komplexe Daten:

Code: Select all

import json
import random
from uuid import uuid4
from datetime import datetime

DYNAMO_TYPES = ["S", "N", "BOOL", "M", "L"]

def random_dynamodb_value(i, depth=0):
if depth >  2:
return {"S": str(uuid4())}

value_type = DYNAMO_TYPES[i%5]

if value_type == "S":
return {"S": f"val-{uuid4().hex[:8]}"}
elif value_type == "N":
return {"N": str(random.randint(0, 1000))}
elif value_type == "BOOL":
return {"BOOL": random.choice([True, False])}
elif value_type == "M":
return {
"M": {
f"key_{j}": random_dynamodb_value(random.randint(0, 4), depth + 1)
for j in range(random.randint(1, 3))
}
}
elif value_type == "L":
field = random.randint(0, 4)
return {
"L": [random_dynamodb_value(field, depth + 1) for _ in range(random.randint(2, 4))]
}

def generate_multitype_field():
return random.choice([
{"S": "text"},
{"BOOL": random.choice([True, False])},
{"N": str(random.randint(0, 100))},
])

def generate_extra_fields(n=50):
extra = {}
for i in range(n):
field_name = f"field_{i}"
extra[field_name] = random_dynamodb_value(i)
return extra

def generate_entry():
base = {
"id": {"S": str(uuid4())},
"timestamp": {"S": datetime.now().isoformat()},
"meta": {"M": {"level1": {"M": {"level2": {"M": {"level3": {"L": [random_dynamodb_value(3) for _ in range(2)]}}}}}}},
"listOfStructs": {
"L": [
{
"M": {
"subId": {"N": str(i)},
"flag": {"BOOL": random.choice([True, False])}
}
} for i in range(random.randint(2, 4))
]
},
"structOfStructOfStruct": {"M": {"a": {"M": {"b": {"M": {"c": random_dynamodb_value(2)}}}}}},
"structOfStructOfList": {"M": {"outer": {"M": {"innerList": {"L": [random_dynamodb_value(2) for _ in range(3)]}}}}},
"multiTypeField": generate_multitype_field()
}

base.update(generate_extra_fields(50))
return base

def generate_complexe_jsonl_file(filename: str, n: int = 20):
with open(filename, "w") as f:
for i in range(n):
entry = json.dumps(generate_entry())
f.write(entry + "\n")
Der Code zum Ausführen von Experimenten (wählen Sie Ihr Szenario):

Code: Select all

import os

import polars as pl
import psutil

from generate_complexe_data import generate_complexe_jsonl_file
from generate_simple_data import generate_simple_jsonl_file

def print_memory():
process = psutil.Process(os.getpid())
rss = process.memory_info().rss
print(f"Mémoire utilisée : {rss / 1024 / 1024:.2f} Mo")

def main() ->  None:
"""Process DynamoDB export from manifest to partitioned parquet."""
# generate_complexe_jsonl_file("dynamodb_complexe.json", n=25000)
# generate_simple_jsonl_file("dynamodb_simple.json", n=100000)

# print("read simple")
# print_memory()
# df_simple = pl.read_ndjson("dynamodb_simple.json")
# print_memory()
# print("write simple")
# df_simple.write_parquet("output_simple.parquet")
# print_memory()
#
# print("read simple gz")
# print_memory()
# df_simple_gz = pl.read_ndjson("dynamodb_simple.json.gz")
# print_memory()
# print("write simple gz")
# df_simple_gz.write_parquet("output_simple_gz.parquet")
# print_memory()
#
# print("read complexe")
# print_memory()
# df_complexe = pl.read_ndjson("dynamodb_complexe.json")
# print_memory()
# print("write complexe")
# df_complexe.write_parquet("output_complexe.parquet")
# print_memory()

print("read complexe gz")
print_memory()
df_complexe_gz = pl.read_ndjson("dynamodb_complexe.json.gz")
print_memory()
print("write complexe gz")
df_complexe_gz.write_parquet("output_complexe_gz.parquet")
print_memory()

if __name__== "__main__":
main()

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post