Py Large JSON -Datei ParserPython

Python-Programme
Anonymous
 Py Large JSON -Datei Parser

Post by Anonymous »

2 JSON -Dateien haben ~ 1 Million Datensätze, jede Zeile hat 1 Rekord, JSON -Datensatz ist verschachtelte große Daten. (4 -GB -Datei jede Dateigröße) < /p>
Datei lesen Sie mehr Zeit und vergleichen (Deepdiff _ mehr Zeit. Datensätze (Dateigröße 4 GB).
Mit Multiprocessing auch mit Pool (mit CPU_COUNT) versucht, ist jedoch schlimmer als dieser Code. Br /> Ich muss also jeden Datensatz vergleichen, jedes Feld.

Code: Select all

import json
import time
import logging
import argparse
import ijson

# Algorithm Steps:
# 1. Setup logging and time tracking.
# 2. Preformat both JSON files to ensure they are valid JSON arrays.
# 3. Read and parse JSON records in batches using ijson.
# 4. Compare each record field-by-field, including nested fields.
# 5. Store differences and log execution times for analysis.
# 6. Save differences to the output file.
# 7. Log performance metrics including total execution time and batch processing times.
# 8.  Display summary statistics.

# Setup logging
def setup_logger(log_file):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
file_handler = logging.FileHandler(log_file, mode='w')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
return logger

execution_times = {}

def time_function(func):
"""Decorator to measure execution time of a function and log total time at the end."""
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
elapsed_time = time.time() - start_time
execution_times[func.__name__] = elapsed_time
return result
return wrapper

@time_function
def preformat_json_file(input_file, output_file):
"""Converts a line-by-line JSON file into a valid JSON array format for ijson parsing."""
with open(input_file, 'r', encoding='utf-8') as infile, open(output_file, 'w', encoding='utf-8') as outfile:
outfile.write('[')
first_line = True
for line in infile:
if not first_line:
outfile.write(',')
outfile.write(line.strip())
first_line = False
outfile.write(']')

@time_function
def parse_json_file(file_path, batch_size):
"""Parses JSON records from a preformatted JSON file in batches using ijson."""
with open(file_path, 'r', encoding='utf-8') as file:
parser = ijson.items(file, "item")
records = []
for record in parser:
records.append(record)
if len(records) >= batch_size:
yield records
records = []
if records:
yield records

@time_function
def compare_json_records(record1, record2):
"""Custom function to compare two JSON records field by field and return differences."""
differences = {}

def compare_fields(path, val1, val2):
if isinstance(val1, dict) and isinstance(val2, dict):
for key in set(val1.keys()).union(val2.keys()):
compare_fields(f"{path}.{key}", val1.get(key), val2.get(key))
elif isinstance(val1, list) and isinstance(val2, list) and len(val1) == len(val2):
for i, (item1, item2) in enumerate(zip(val1, val2)):
compare_fields(f"{path}[{i}]", item1, item2)
else:
if val1 != val2:
differences[path] = {"old": val1, "new": val2}

compare_fields("root", record1, record2)
return differences

@time_function
def compare_large_json_files(file1, file2, batch_size, output_file, log_file):
"""Optimized function to compare two large JSON files efficiently."""
logger = setup_logger(log_file)

logger.info(f"Preformatting JSON files: {file1}, {file2}")
formatted_file1 = file1 + ".formatted.json"
formatted_file2 = file2 + ".formatted.json"
preformat_json_file(file1, formatted_file1)
preformat_json_file(file2, formatted_file2)

logger.info(f"Starting comparison between {formatted_file1} and {formatted_file2}")
start_time = time.time()

batch_times = []
differences = []

file1_batches = parse_json_file(formatted_file1, batch_size)
file2_batches = parse_json_file(formatted_file2, batch_size)

batch_index = 0

for batch1, batch2 in zip(file1_batches, file2_batches):
batch_start = time.time()

batch_differences = []
for idx, (record1, record2) in enumerate(zip(batch1, batch2)):
diff = compare_json_records(record1, record2)
if diff:
batch_differences.append({"record_index": batch_index * batch_size + idx, "differences":  diff})

if batch_differences:
differences.extend(batch_differences)

batch_time = time.time() - batch_start
batch_times.append(batch_time)
logger.info(f"Batch {batch_index + 1} processed in {batch_time:.2f} seconds")
batch_index += 1

total_time = time.time() - start_time
avg_time = sum(batch_times) / len(batch_times) if batch_times else 0
best_time = min(batch_times, default=0)
worst_time = max(batch_times, default=0)

logger.info(f"Comparison completed in {total_time:.2f} seconds")
logger.info(f"Total batches processed: {batch_index}")
logger.info(f"Average batch time: {avg_time:.2f} seconds")
logger.info(f"Best batch time: {best_time:.2f} seconds")
logger.info(f"Worst batch time: {worst_time:.2f} seconds")

with open(output_file, 'w', encoding='utf-8') as out:
json.dump(differences, out, indent=4)

logger.info(f"Differences saved in {output_file}")

@time_function
def main():
"""Main function to handle input arguments and execute comparison."""
parser = argparse.ArgumentParser(description="Compare two large JSON files in batches.")
parser.add_argument("file1", help="Path to the first JSON file")
parser.add_argument("file2", help="Path to the second JSON file")
parser.add_argument("--batch_size", type=int, default=1000, help="Number of records per batch")
parser.add_argument("--output_file", required=True, help="Path to save the output differences JSON")
parser.add_argument("--log_file", required=True, help="Path to save the log file")

args = parser.parse_args()

compare_large_json_files(args.file1, args.file2, args.batch_size, args.output_file, args.log_file)

logger = logging.getLogger()
logger.info("Execution times for each function:")
for func, elapsed_time in execution_times.items():
logger.info(f"{func} executed in {elapsed_time:.4f} seconds")

print("Execution times for each function:")
for func, elapsed_time in execution_times.items():
print(f"{func} executed in {elapsed_time:.4f} seconds")

if __name__ == "__main__":
main()

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post