Mein Hauptproblem besteht darin, noch kürzere Verarbeitungszeiten zu erreichen. Nach dem Experimentieren habe ich herausgefunden, dass die Aufteilung der Dokumente in 6 Blöcke und die Verwendung von 6 Prozessen die beste Leistung bringt. Allerdings bin ich mir nicht sicher, wie ich dies weiter optimieren kann.
Für den Kontext führe ich dieses Skript auf einem VPS mit 4 vCPUs und 8 GB RAM aus. Meine Datenbankabfrage ruft mindestens 580 Datensätze ab und im schlimmsten Fall kann der Vorgang 40 Sekunden dauern. Schließlich wurden die Dokumente mit bcrypt in 12 Runden gehasht.
Code: Select all
import bcrypt
import mysql.connector
from multiprocessing import Manager, Pool
from dotenv import load_dotenv
import time
import sys
import os
MAX_WORKERS = 6
MAX_CHUNKS = 6
load_dotenv()
DB_HOST = os.getenv("ENTRIES_HOST")
DB_DATABASE = os.getenv("ENTRIES_DATABASE")
DB_USERNAME = os.getenv("ENTRIES_USERNAME")
DB_PASSWORD = os.getenv("ENTRIES_PASSWORD")
DB_PORT = os.getenv("ENTRIES_PORT")
def get_documents():
connection = mysql.connector.connect(
host=DB_HOST,
database=DB_DATABASE,
user=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
cursor = connection.cursor()
query = "SELECT nro_document FROM family_and_friends WHERE deleted_at IS NULL ORDER BY id ASC;"
cursor.execute(query)
documents = [row[0] for row in cursor.fetchall()]
cursor.close()
connection.close()
return documents
def verify_password(stored_hash, password_to_verify):
return bcrypt.checkpw(password_to_verify.encode('utf-8'), stored_hash)
def process_chunk(chunk, stored_hash, found_flag):
for document in chunk:
if found_flag["found"]:
break
if verify_password(stored_hash, document):
found_flag["found"] = True
return document
return None
def chunk_documents(documents, num_chunks):
length = len(documents)
chunk_size = (length + num_chunks - 1) // num_chunks
return [documents[i:i + chunk_size] for i in range(0, length, chunk_size)]
def main(stored_hash):
documents = get_documents()
start_time = time.time()
with Manager() as manager:
found_flag = manager.dict({"found": False})
chunks = chunk_documents(documents, MAX_CHUNKS)
with Pool(processes=MAX_WORKERS) as pool:
results = [
pool.apply_async(process_chunk, args=(chunk, stored_hash, found_flag))
for chunk in chunks
]
for result in results:
output = result.get()
if output is not None:
pool.terminate()
end_time = time.time()
return output
end_time = time.time()
return ''
if __name__ == "__main__":
if len(sys.argv) < 2:
print("")
sys.exit(-1)
stored_hash = bytes(sys.argv[1], 'utf-8')
result = main(stored_hash)
print(result)
sys.exit(1)