Code: Select all
( id, campaign_id, platform, duration_type ENUM[daily,monthly], breakdown_type ENUM[None,age,gender,country], insights: JSON )
Code: Select all
[{action_type: 'like', value: '1'}] or
[{action_type: 'offsite_conversion.fb_pixel_add_payment_info', value: '1'}],
Ich muss das reduzieren; dafür kann ich eine statische Zuordnung (Map) wie die folgende bereitstellen:
Code: Select all
{
like: { actions: 'like', cost_per_action_type: 'cost_per_like' },
'offsite_conversion.fb_pixel_add_payment_info': { actions: 'website_add_payment_info', cost_per_action_type: 'cost_per_website_add_payment_info' },
'app_custom_event.fb_mobile_add_payment_info': { actions: 'in_app_add_payment_info', cost_per_action_type: 'cost_per_in_app_add_payment_info' }
}
Code: Select all
# Maps each array-valued metric field inside the ``insights`` struct to the
# prefix that is prepended to every entry's ``action_type`` when the array is
# flattened into individual fields. Iteration order is the processing order.
#
# NOTE(review): the flattened key is ``prefix + action_type``, so two fields
# that share a prefix and emit the same action_type would write to the same
# flattened field (the later one replacing the earlier) -- confirm whether the
# unprefixed fields below can overlap in practice.
META_METRIC_FIELDS = {
    "actions": "",
    "unique_actions": "unique_",
    "cost_per_action_type": "cost_per_",
    "cost_per_unique_action_type": "unique_cost_per_",
    # Every remaining field is flattened without a prefix.
    **dict.fromkeys(
        (
            "outbound_clicks",
            "outbound_clicks_ctr",
            "cost_per_outbound_click",
            "cost_per_unique_outbound_click",
            "video_time_watched_actions",
            "video_play_actions",
            "video_p25_watched_actions",
            "video_p50_watched_actions",
            "video_p75_watched_actions",
            "video_p95_watched_actions",
            "video_p100_watched_actions",
            "cost_per_2_sec_continuous_video_view",
            "cost_per_15_sec_video_view",
            "video_avg_time_watched_actions",
            "cost_per_30_sec_video_view",
        ),
        "",
    ),
}
def flatten_meta_array_metrics(df):
    """Flatten array-valued Meta metrics into scalar fields of ``insights``.

    For every field listed in ``META_METRIC_FIELDS`` that is present in the
    ``insights`` struct, the array of ``{action_type, value}`` entries is
    converted into one struct field per distinct ``action_type``. The new
    field name is ``prefix + action_type`` with ``.`` replaced by ``_``; the
    original array field is then dropped from the struct.

    Args:
        df: Spark DataFrame with an ``insights`` column. The column is read
            as a struct (its ``dataType.names`` are inspected).

    Returns:
        The DataFrame with ``insights`` rewritten as described. Fields not
        present in the ``insights`` schema are skipped.

    Note:
        The set of output fields is data dependent: the distinct keys of each
        metric are collected to the driver, which runs one Spark job per
        processed field.
    """
    # Debug preview of the input; each show() triggers a Spark action.
    df.show(1)

    insights_col = "insights"

    for field, prefix in META_METRIC_FIELDS.items():
        if field not in df.schema[insights_col].dataType.names:
            continue  # Skip if not present in schema

        full_field = f"{insights_col}.{field}"
        flat_map_col = f"{field}_flat_map"

        # Temporary map column: prefixed action_type -> value. Entries without
        # an action_type are filtered out; a null array yields a null map
        # because the when() has no otherwise().
        df = df.withColumn(
            flat_map_col,
            F.when(
                F.col(full_field).isNotNull(),
                F.expr(f"""
                    map_from_entries(
                        filter(transform({full_field}, x -> struct(
                            CASE WHEN x.action_type IS NOT NULL THEN '{prefix}' || x.action_type ELSE NULL END as key,
                            x.value as value
                        )), x -> x.key IS NOT NULL)
                    )
                """),
            ),
        )

        # Discover the distinct keys with DataFrame operations (no RDD/Python
        # lambda round trip). Sorting makes the order in which fields are
        # appended to the struct -- and therefore the resulting schema --
        # deterministic across runs.
        key_rows = (
            df.select(F.explode(F.map_keys(F.col(flat_map_col))).alias("key"))
            .distinct()
            .collect()
        )
        keys = sorted(row["key"] for row in key_rows)

        # Build the whole struct rewrite as one expression so the plan gets a
        # single projection instead of one withColumn per discovered key.
        insights = F.col(insights_col)
        for raw_key in keys:
            safe_key = raw_key.replace(".", "_")  # dots would read as nesting
            insights = insights.withField(
                safe_key, F.col(flat_map_col).getItem(raw_key)
            )

        # Drop original complex array field and the temporary map.
        df = df.withColumn(insights_col, insights.dropFields(field))
        df = df.drop(flat_map_col)

    # Debug preview of the result.
    df.show(1)
    return df
Code: Select all
{'like': 1, 'offsite_conversion.fb_pixel_add_payment_info': 1, 'app_custom_event.fb_mobile_add_payment_info': 1}
Code: Select all
{'like': 1, 'website_add_payment_info': 1, 'in_app_add_payment_info': 1}
Mobile version