Langsame Datenmigration von Snowflake zu MySQL in Python mit SQlAlchemy [Duplikat]MySql

MySQL DBMS-Forum
Guest
 Langsame Datenmigration von Snowflake zu MySQL in Python mit SQlAlchemy [Duplikat]

Post by Guest »

Ich habe also eine große Datenmenge in Snowflake, von der ich gerne eine Kopie auf einem vorhandenen MySQL-Server behalten würde. Ich habe dieses Skript erstellt. Ich möchte nur eine Kopie der Daten in MySQL behalten, nicht für die Verwendung in der Entwicklung oder Produktion, sondern nur eine Kopie behalten.

Code: Select all

from sqlalchemy import create_engine
from sqlalchemy import text
import pandas as pd
import time

snowflake_engine = create_engine(
'snowflake://{user}:{password}@{account}/{database_name}/{schema_name}?warehouse={warehouse_name}'.format(
user='XXXXXX',
password='XXXXXX',
account='XXXX-XXXXX',
warehouse_name='WAREHOUSE',
database_name='XXXXX',
schema_name='XXXXX'
)
)

mysql_engine = create_engine('mysql+mysqlconnector://XXXXX:XXXXXX@XXXXX.amazonaws.com:3306/XXXXXXX')

schema = 'XXXXXXX'
table_name = ''

# Fetch data in chunks and append to MySQL
chunk_size = 2500
try:
snowflake_connection = snowflake_engine.connect()
mysql_connection = mysql_engine.connect()

# Query to fetch table names
query = f"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='{schema}'"
print(f"Fetching table names from schema: {schema}...")
tables_df = pd.read_sql(text(query), snowflake_connection)
total_tables = len(tables_df)

# Iterate through each table
for index, row in tables_df.iterrows():
table_name = row['table_name']
print(f"Fetching data from table: {table_name}...")

#fetch entire table data in chunks
offset = 0
while True:
#fetch the chunk of data
table_query = f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}"
df = pd.read_sql(text(table_query), snowflake_connection)

if not df.empty:
# Save the dataframe to MySQL database in chunks
df.to_sql(table_name, con=mysql_engine, if_exists='append', index=False)
print(f"Processed chunk for table {table_name}, offset {offset}")

# Move the offset to fetch the next chunk
offset += chunk_size
else:
break  # Exit the loop when no more rows are returned

print(f"Table {index+1} of {total_tables} has been processed")

finally:
snowflake_connection.close()
snowflake_engine.dispose()
mysql_connection.close()
mysql_engine.dispose()
Es funktioniert. Das Problem ist, dass die Datenübertragung sehr langsam ist. Die Verarbeitung einer einzelnen Charge dauert mindestens 5 Minuten. Vor dem Hinzufügen von Batch-Abfragen wurde diese Fehlermeldung angezeigt und das Skript wurde beendet.

Code: Select all

Killed
Jetzt erhalte ich Folgendes, nachdem das Skript den ganzen Tag lang ausgeführt wurde:

Code: Select all

sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 000629 (57014): Warehouse 'WAREHOUSE' was suspended immediate by resource monitor 'RESOURCEMONITOR', statement aborted.
[SQL: SELECT * FROM XXXXXXXXX LIMIT 2500 OFFSET 1047500]
(Background on this error at: https://sqlalche.me/e/20/f405)
Wie ändere ich dieses Skript, um Daten problemlos zu migrieren? Bitte schlagen Sie einige Änderungen vor, die ich vornehmen kann.
Es gibt insgesamt 115 Tabellen und mindestens 40 % davon enthalten über eine Million Zeilen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post