Wie optimiert man ein Python-Multithreading-Skript, das Hashes verarbeitet?Python

Python-Programme
Guest
 Wie optimiert man ein Python-Multithreading-Skript, das Hashes verarbeitet?

Post by Guest »

Ich arbeite an einem Python-Skript, das prüft, ob eine gehashte Dokumentnummer mit einer Dokumentnummer in einer großen Liste von Datensätzen übereinstimmt, die in einer MySQL-Datenbank gespeichert sind. Um die Leistung zu verbessern, verwende ich die Multiprocessing-Bibliothek, um die Verarbeitung über mehrere Teile der Dokumentliste hinweg zu parallelisieren.
Mein Hauptproblem besteht darin, noch kürzere Verarbeitungszeiten zu erreichen. Nach dem Experimentieren habe ich herausgefunden, dass die Aufteilung der Dokumente in 6 Blöcke und die Verwendung von 6 Prozessen die beste Leistung bringt. Allerdings bin ich mir nicht sicher, wie ich dies weiter optimieren kann.
Für den Kontext führe ich dieses Skript auf einem VPS mit 4 vCPUs und 8 GB RAM aus. Meine Datenbankabfrage ruft mindestens 580 Datensätze ab und im schlimmsten Fall kann der Vorgang 40 Sekunden dauern. Schließlich wurden die Dokumente mit bcrypt in 12 Runden gehasht.

Code: Select all

import bcrypt
import mysql.connector
from multiprocessing import Manager, Pool
from dotenv import load_dotenv
import time
import sys
import os

MAX_WORKERS = 6
MAX_CHUNKS = 6

load_dotenv()

DB_HOST = os.getenv("ENTRIES_HOST")
DB_DATABASE = os.getenv("ENTRIES_DATABASE")
DB_USERNAME = os.getenv("ENTRIES_USERNAME")
DB_PASSWORD = os.getenv("ENTRIES_PASSWORD")
DB_PORT = os.getenv("ENTRIES_PORT")

def get_documents():
connection = mysql.connector.connect(
host=DB_HOST,
database=DB_DATABASE,
user=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
cursor = connection.cursor()
query = "SELECT nro_document FROM family_and_friends WHERE deleted_at IS NULL ORDER BY id ASC;"
cursor.execute(query)
documents = [row[0] for row in cursor.fetchall()]
cursor.close()
connection.close()
return documents

def verify_password(stored_hash, password_to_verify):
return bcrypt.checkpw(password_to_verify.encode('utf-8'), stored_hash)

def process_chunk(chunk, stored_hash, found_flag):
for document in chunk:
if found_flag["found"]:
break
if verify_password(stored_hash, document):
found_flag["found"] = True
return document
return None

def chunk_documents(documents, num_chunks):
length = len(documents)
chunk_size = (length + num_chunks - 1) // num_chunks
return [documents[i:i + chunk_size] for i in range(0, length, chunk_size)]

def main(stored_hash):
documents = get_documents()
start_time = time.time()

with Manager() as manager:
found_flag = manager.dict({"found": False})
chunks = chunk_documents(documents, MAX_CHUNKS)

with Pool(processes=MAX_WORKERS) as pool:
results = [
pool.apply_async(process_chunk, args=(chunk, stored_hash, found_flag))
for chunk in chunks
]
for result in results:
output = result.get()
if output is not None:
pool.terminate()
end_time = time.time()
return output

end_time = time.time()
return ''

if __name__ == "__main__":
if len(sys.argv) < 2:
print("")
sys.exit(-1)

stored_hash = bytes(sys.argv[1], 'utf-8')
result = main(stored_hash)
print(result)
sys.exit(1)

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post