Mapping of Oracle tables to Snowflake tables:
oracle snowflake
or_tbl_v1 snw_tbl_v1
or_tbl_v2 snw_tbl_v2
or_tbl_v3 snw_tbl_v3
or_tbl_v4 snw_tbl_v4
or_tbl_v5 snw_tbl_v5
A text file stores the latest loaded record (the maximum insert date) for each table; a sketch of how it could be read and rewritten follows the listing below:
table date
snw_tbl_v1 2025-08-17 12:33:44 55:66:00
snw_tbl_v2 2025-08-17 12:33:44 56:66:00
snw_tbl_v3 2025-08-17 12:33:44 57:66:00
snw_tbl_v4 2025-08-17 12:33:44 58:66:00
snw_tbl_v5 2025-08-17 12:33:44 59:66:00
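If the job maintains this file itself, it can be read into a dict before the extract and rewritten with the new maximum dates afterwards. A minimal sketch, assuming the layout shown above (a header line, then one table name and one timestamp per row) and the hypothetical file name last_load_dates.txt:

def read_last_load_dates(path='last_load_dates.txt'):
    # returns {table_name: last_load_timestamp}
    dates = {}
    with open(path) as f:
        next(f)  # skip the "table date" header line
        for line in f:
            if not line.strip():
                continue
            table, last_load = line.split(None, 1)  # split on the first whitespace only
            dates[table] = last_load.strip()
    return dates

def write_last_load_dates(dates, path='last_load_dates.txt'):
    # overwrite the file with the new maximum insert date per table
    with open(path, 'w') as f:
        f.write('table date\n')
        for table, last_load in sorted(dates.items()):
            f.write('{} {}\n'.format(table, last_load))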
When the job runs, it should update the text file above with the maximum value of the Insert_Date column for each table, insert the rows from that date onward into the corresponding Snowflake table, and, once the load has finished, write the new maximum date back to the file.
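My script (further below) only extracts the incremental rows from Oracle and uploads them as CSV files to S3; the insert into Snowflake is not implemented yet. A minimal sketch of that step, assuming the snowflake-connector-python package and a hypothetical external stage named CDC_STAGE that points at the same SourceFiles/CSK_HOURLY prefix in the bucket:

import snowflake.connector  # assumes snowflake-connector-python is installed

def load_into_snowflake(table_name, s3_subfolder):
    # connection values are placeholders; use your real account/user/warehouse
    conn = snowflake.connector.connect(
        account='my_account', user='my_user', password='***',
        warehouse='MY_WH', database='MY_DB', schema='MY_SCHEMA')
    try:
        copy_sql = (
            "COPY INTO {table} "
            "FROM @CDC_STAGE/{folder}/ "
            r"FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '~' NULL_IF = ('\\N'))"
        ).format(table=table_name, folder=s3_subfolder)
        conn.cursor().execute(copy_sql)
    finally:
        conn.close()

COPY INTO skips files it has already loaded into the target table, so re-running the job after the date file is updated should not duplicate previously loaded files. Here is the extraction script: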
import cx_Oracle
import os
import pandas as pd
import shutil
import time
from datetime import date
import logging
import argparse
import boto3
import json
import base64  # used by get_secret() when the secret comes back as SecretBinary


class awscliError(Exception):
    pass
def configure_logger(pwd):
    log_file = "{}/cdc_output/cdc_cas_log.log".format(pwd)
    logger = logging.getLogger('CDCCSKLOGS')
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("[%(asctime)s - %(levelname)7s - %(filename)25s:%(lineno)3s - %(funcName)20s() ] %(message)s")
    log_handler = logging.FileHandler(filename=log_file)
    log_handler.setLevel(logging.DEBUG)
    log_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    logger.addHandler(console_handler)
    print("Check the log file {} for detailed logs".format(log_file))
    return logger
def make_dir(dir_name):
    try:
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
            print('Made a directory named: {}'.format(dir_name))
    except OSError as e:
        print("Error: %s : %s" % (dir_name, e.strerror))
        raise
def del_dir(dir_name):
    try:
        if os.path.exists(dir_name):
            shutil.rmtree(dir_name)
            print('{} deleted'.format(dir_name))
        else:
            print('directory: {} does not exist'.format(dir_name))
    except OSError as e:
        print("Error: %s : %s" % (dir_name, e.strerror))
        raise
def get_connection(logger,username,password,host):
    try:
        connection = cx_Oracle.connect(username,password,host,encoding = 'UTF-8')
        logger.info('Connection Established to Oracle DB ...')
        return connection
    except Exception as e:
        logger.info(e)
        raise(e)
def upload_to_s3(curr_folder_path,aws_s3_path,logger):
    aws_s3_cmd='aws s3 cp {0}/ s3://{1} --recursive'.format(curr_folder_path,aws_s3_path)
    logger.info(aws_s3_cmd)
    aws_s3_cmd_output=os.system(aws_s3_cmd)
    if aws_s3_cmd_output!=0:
        raise awscliError('return code:{}, S3 upload failed for this cmd: {}'.format(aws_s3_cmd_output,aws_s3_cmd))
def fetch_dispaly_fact_table_mapping(conn,logger,display_table_fact_table_mapping):
    df=pd.read_sql("""
        SELECT A.DISPLAY_NAME,B.FACT_TABLE_NAME FROM WEBM9.EVENT_DEF A, WEBM9_NEW.EVENT_DB_DEF B
        WHERE A.EVENT_DEF_ID=B.EVENT_OUTPUT_DEF_ID AND A.DISPLAY_NAME LIKE '%KCGF%' """,conn)
    df=df.set_index('DISPLAY_NAME')
    #print(df,'\n')
    df.to_csv(display_table_fact_table_mapping)
def save_result_csv(csv_file_to_save,result_df):
    #print(result_df)
    result_df.to_csv(csv_file_to_save,sep='~', na_rep='\\N', index=False,header=False) # can use index=True if want index
def get_incremental_date(table_name,incremental_date_csv_path):
    df=pd.read_csv(incremental_date_csv_path)
    incremental_df=df.set_index('DISPLAY_NAME')['INCREMENTAL_DATE'].to_dict()
    #print('incremental_df',incremental_df,'\n')
    incremental_date=incremental_df[table_name]
    return incremental_date
def fetch_incremental_results(pwd,search_table_list,dispaly_fact_table_mapping,conn,logger,incremental_date_csv_path,select_table_template_folder):
    record_count_list=[]
    next_incremental_date_list=[]
    for display_table,fact_table in dispaly_fact_table_mapping.items():
        if display_table not in search_table_list:
            continue
        incremental_date=get_incremental_date(display_table,incremental_date_csv_path) # get incremental date
        with open('{select_table_template}/{display_table_name}.txt'.format(display_table_name=display_table,select_table_template=select_table_template_folder),'r') as f:
            select_table_template=f.read() # read the template
        #print('\nselect_table_template \n',select_table_template,'\n')
        select_table_query=select_table_template.format(TABLE_NAME=fact_table,INCREMENTAL_DATE=incremental_date)
        #print('\nselect_table_query \n',select_table_query,'\n')
        logger.info('\n starting executing query for {}\n'.format(display_table))
        result_df=pd.read_sql(select_table_query,conn)
        no_of_records=result_df.shape[0]
        logger.info('\n fetched select query cdc result for {}\n'.format(display_table))
        logger.info('\nNo of records fetched for {} : {}\n'.format(display_table,no_of_records))
        logger.info('\n saving to csv for {}\n'.format(display_table))
        curr_table_path='{}/{}/publish_{}'.format(pwd,'cdc_output',display_table.lower())
        make_dir(curr_table_path)
        save_result_csv('{}/{}.csv'.format(curr_table_path,display_table),result_df)
        next_incremental_date_query="Select max(CTRL_INSERT_DT) as max_incremental_date from WEBM9.{} where INSERT_DT > TIMESTAMP '{}'".format(fact_table,incremental_date)
        next_incremental_date=pd.read_sql(next_incremental_date_query,conn)
        #print(next_incremental_date)
        next_incremental_date=next_incremental_date['MAX_INCREMENTAL_DATE'][0]
        next_incremental_date=str(next_incremental_date)
        #print('next_incremental_date\n',next_incremental_date)
        if (next_incremental_date !='None'):
            next_incremental_date_list.append([display_table,next_incremental_date])
            record_count_list.append([display_table,no_of_records,next_incremental_date])
        else:
            next_incremental_date_list.append([display_table,incremental_date])
            record_count_list.append([display_table,no_of_records,incremental_date])
    #print('\nnext_incremental_date_list\n',next_incremental_date_list)
    record_count_df=pd.DataFrame(record_count_list,columns=['DISPLAY_NAME','RECORDS_FETCHED','NEXT_INCREMENTAL_DATE'])
    logger.info('saving output record counts ...')
    record_count_df.to_csv('{}/cdc_output/output.csv'.format(pwd),index=False) #saving output
    logger.info('record count saved ')
    return next_incremental_date_list
def save_incremental_dates(incremental_date_csv_path,next_incremental_date_list,logger):
    next_incremental_date_df=pd.DataFrame(next_incremental_date_list,columns=['DISPLAY_NAME','INCREMENTAL_DATE'])
    logger.info('saving incremental dates...')
    next_incremental_date_df.to_csv(incremental_date_csv_path,index=False) #saving incremental dates
    logger.info('incremental dates saved ')
def get_secret(logger,secret_name,region_name,secret_key):
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
        secret_value = json.loads(get_secret_value_response['SecretString'])['secret-value']
        return secret_value
    except KeyError as e:
        logger.error('KeyError:{}'.format(e))
        raise e
    except Exception as e:
        logger.error(e)
        raise e
    else:
        # Note: this branch never runs because the try block already returns above;
        # it is the generic handling kept from the AWS sample.
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
        else:
            secret = base64.b64decode(get_secret_value_response['SecretBinary'])
        return secret
def main(env):
    use_secret_manager_to_retreive_password=True
    incremental_date_csv_path = '/data/incremental_dates_CAM.csv'
    display_table_fact_table_mapping = '/data/Display_table_fact_table_mapping.csv'  # CSV with DISPLAY_NAME -> FACT_TABLE_NAME mapping
    select_table_template_folder = '/data/select_table_template'
    pwd='/data/oracle_hourly'
    #make cdc_output directory
    if os.path.exists("{}/cdc_output".format(pwd)):
        del_dir("{}/cdc_output".format(pwd))
    make_dir("{}/cdc_output".format(pwd))
    logger=configure_logger(pwd)
    if env =='DEV':
        username="username"
        password=''
        host="host"
        s3_bucketname='bucketname'
        secret_name='secret'
        region_name='us-east-1'
        secret_key='secret_key'
    if use_secret_manager_to_retreive_password==True:
        password = get_secret(logger,secret_name,region_name,secret_key)
    #list of 12 tables to fetch incremental records
    search_table_list=[
        "CSTKBatch1",
        "CSTKBatch2",
        "CSTKBatch3",
        "CSTKBatch4",
        "CSTKBatch5",
        "CSTKBatch6",
        "CSTKBatch7",
        "CSTKBatch8",
        "CSTKBatch9",
        "CSTKBatch10",
        "CSTKBatch11",
        "CSTKBatch12"
    ]
    conn=get_connection(logger,username,password,host) #get connection to Oracle DB
    #exit(1)
    fetch_dispaly_fact_table_mapping(conn,logger,display_table_fact_table_mapping) #saves display table name and corresponding fact table name in a csv
    df=pd.read_csv(display_table_fact_table_mapping)
    dispaly_fact_table_mapping=df.set_index('DISPLAY_NAME')['FACT_TABLE_NAME'].to_dict() # converts to a dict with DISPLAY_NAME as key and FACT_TABLE_NAME as its value
    #print('dictionary \n',dispaly_fact_table_mapping,'\n')
    #generate_general_table_info(dispaly_fact_table_mapping) # generates general table info
    try:
        next_incremental_date_list=fetch_incremental_results(pwd,search_table_list,dispaly_fact_table_mapping,conn,logger,incremental_date_csv_path,select_table_template_folder)
    except Exception as e:
        print(e)
        raise(e)
    logger.info('uploading to s3 ..')
    upload_to_s3('{}/cdc_output'.format(pwd),'{}/SourceFiles/CSK_HOURLY'.format(s3_bucketname),logger)
    save_incremental_dates(incremental_date_csv_path,next_incremental_date_list,logger)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', type=str, required=True, help='Environment to be used')
    args = parser.parse_args()
    env = args.env
    main(env)
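With the argparse handling above, the job can be scheduled hourly and started with something like python cdc_hourly_extract.py --env DEV (the script file name here is only a placeholder).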
Thanks