I am new to AWS Glue and I am running into performance problems with the following code.
# Imports used by the snippet below
from pyspark.sql.functions import split, col, expr, map_from_entries
from awsglue.dynamicframe import DynamicFrame

# Allow duplicate keys within a record; the last occurrence wins
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
# Define S3 path with wildcard to match all .ind files
s3_ind_path = f"s3a://{source_bucket}/{source_prefix}/*.ind"
# Read all .ind files as whole text
whole_df = spark.sparkContext.wholeTextFiles(s3_ind_path).toDF(["path", "content"])
# Step 1: Split the raw content into key-value string array
whole_df = whole_df.withColumn("kv_pairs", split(col("content"), "\\|"))
# Step 2: Convert array of key:value strings into a map
whole_df = whole_df.withColumn(
    "fields_map",
    map_from_entries(
        expr("transform(kv_pairs, x -> struct(split(x, ':', 2)[0] as key, split(x, ':', 2)[1] as value))")
    )
)
# Step 3: Extract multiple fields from the map
fields_to_extract = [
    "SSN_TIN", "TIDTYPE", "FNAME", "MNAME", "LNAME", "ENTNAME", "BRK_ACCT",
    "DIR_ACCT", "NONBRKAC", "SPONSOR", "REP_ID", "RR2", "REG_TY", "LOB",
    "PROC_DTE", "DT_REC", "SCANDTS", "XTRACID", "NASU_ID", "DC_SRCRF",
    "CHK_AMT", "CHK_NUML", "DEPOSDT", "RCVDDATE", "STK_CNUM", "STK_SHR", "DOC-TY-ID"
]
for field in fields_to_extract:
    whole_df = whole_df.withColumn(field, col("fields_map").getItem(field))
# Optional: Drop intermediate columns if not needed
whole_df = whole_df.drop("kv_pairs", "fields_map", "content")
client_df = whole_df.select("SSN_TIN", "TIDTYPE", "FNAME", "LNAME", "ENTNAME")
client_df.cache()
print(f"Total rows in client_df: {client_df.count()}")
print("converting clients to dynamic frame")
client_dynamic_frame = DynamicFrame.fromDF(client_df, glueContext, "dynamic_frame")
print("Inserting clients")
glueContext.write_dynamic_frame.from_options(
    frame=client_dynamic_frame,
    connection_type="JDBC",
    connection_options={
        "connectionName": connection_name,
        "database": tgt_database,
        "dbtable": "staging.bpm_migration_client1",
        "useConnectionProperties": "true"
    }
)
print("client insert complete")
I am reading 100,000 files from an S3 bucket and writing their content to a Postgres DB. This takes more than an hour. Is there any way to make it faster? Any help is appreciated.

A sample record from one of the .ind files looks like this:

Forms | rep_id: test | ror: 111 | scandts: 12/4/2018 | Sponsor: h | ssn_tin: 123456789 | proc_dte: 2018-12-05 | scandy: 2018-12-05 | docsize: +000048020 | Numpages: +000000001 | SRC_SYS: +000000213 | BPMWRK: Annuities - FL | DC_SRCRF: 234909631_fl_198585_AlliANZ_Annuity_Application_Details_AlliANZ_Annuity_Application_Detail.pdf | Dir_ACCT: 3333333333 | doc_cls: DirectAM | DOC_ORG: Electronic | 12/2018 | DOC_ORG: Pm | fname: ftst | xxxx: 12345 | kkkkk: 123456332 | lname: ltst | lob: Annuity | MailType: Electronic | Mimetype: application/pdf | Repfname: TSTR | Replname: tstl | repr 2 | repstat: regular | spclhand: archiveonly |
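For reference, here is a minimal local sketch of the same split / map_from_entries logic applied to a single record. It is not the Glue job itself: it assumes a plain local SparkSession, and the sample string is an abbreviated, whitespace-free version of the record above. It also shows why spark.sql.mapKeyDedupPolicy is set to LAST_WIN: a key such as DOC_ORG can occur more than once in a record, and only the last value is kept.

# Minimal sketch, assuming a local SparkSession; the sample string below is illustrative only
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, expr, map_from_entries

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Keep the last value when the same key (e.g. doc_org) appears twice in one record
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

sample = "rep_id:test|ssn_tin:123456789|proc_dte:2018-12-05|lob:Annuity|doc_org:Electronic|doc_org:Pm"
df = spark.createDataFrame([(sample,)], ["content"])

df = (
    df.withColumn("kv_pairs", split(col("content"), "\\|"))
      .withColumn(
          "fields_map",
          map_from_entries(
              expr("transform(kv_pairs, x -> struct(split(x, ':', 2)[0] as key, split(x, ':', 2)[1] as value))")
          )
      )
      .select(
          col("fields_map").getItem("ssn_tin").alias("ssn_tin"),
          col("fields_map").getItem("lob").alias("lob"),
          col("fields_map").getItem("doc_org").alias("doc_org")   # LAST_WIN -> "Pm"
      )
)
df.show(truncate=False)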