Laden Sie einen HTTP -Link im Körper der Mail mit einem bestimmten Betreffnamen, Label (Ordner_Name) und dem Absender au
Posted: 20 Aug 2025, 14:45
`
i, um einen Link im Körper der E -Mail auf einen bestimmten Pfad herunterzuladen, der von einem https: //storage.googleapis.com beginnt. HH: MM: SS -Format. Das Betreff besteht auch aus mehreren Zeichenfolgen im folgenden Format
Superbi_download_NL_UPSTREAM_PENDENCE_LH_WED Aug 2025 17:03:54 gmt+0530
Bigfoot-Reporting < /p>
Ich muss den HTTP-Link im Körper der E-Mail mit diesen Zeichenfolgen mit dem Zeitstempelformular herunterladen. Der Beschriftungsordner, der von einem bestimmten Pfad mit ähnlicher Dateinamen ähnelt, der der Betreffzeile ähnelt.
Innerhalb der Hauptfunktion
i, um einen Link im Körper der E -Mail auf einen bestimmten Pfad herunterzuladen, der von einem https: //storage.googleapis.com beginnt. HH: MM: SS -Format. Das Betreff besteht auch aus mehreren Zeichenfolgen im folgenden Format
Superbi_download_NL_UPSTREAM_PENDENCE_LH_WED Aug 2025 17:03:54 gmt+0530
Bigfoot-Reporting < /p>
Ich muss den HTTP-Link im Körper der E-Mail mit diesen Zeichenfolgen mit dem Zeitstempelformular herunterladen. Der Beschriftungsordner, der von einem bestimmten Pfad mit ähnlicher Dateinamen ähnelt, der der Betreffzeile ähnelt.
Code: Select all
def wait_and_download_report_email_by_time(
service, start_ms, subject_time_hms, dest_dir,
subject_contains=REPORT_SUBJECT_CONTAINS,
from_addr=REPORT_FROM_ADDR, label_name=REPORT_LABEL_NAME,
max_wait_sec=5400, poll_sec=20, verbose=True):
`Wait for label'd email AFTER start_ms whose Subject contains HH:MM:SS; download first https link.`
deadline = time.time() + max_wait_sec
label_id = _gmail_get_label_id_by_name(service, label_name)
base_q = f'newer_than:7d from:{from_addr} subject:"{subject_contains}"'
if not label_id:
base_q = f"label:{label_name} " + base_q
if verbose:
print(f"Waitingfor'{subject_contains}'with time {subject_time_hms}in label '{label_name}'")
while time.time() < deadline:
try:
kwargs = dict(userId="me", q=base_q, maxResults=20)
if label_id: kwargs["labelIds"] = [label_id]
res = service.users().messages().list(**kwargs).execute()
for item in res.get("messages", []):
msg = service.users().messages().get(userId="me", id=item["id"]).execute()
internal_ms = int(msg.get("internalDate", "0"))
if internal_ms < int(start_ms): continue
headers = msg.get("payload", {}).get("headers", [])
subject = next((h["value"] for h in headers if h["name"]=="Subject"), "")
if subject_time_hms not in subject: continue
text = _gmail_get_full_text(msg)
url = _first_https_link(text)
if not url:
if verbose: print("Matching subject found but no https link; polling…")
continue
ts = time.strftime("%Y%m%d_%H%M%S", time.localtime(internal_ms/1000))
default_name = f"{(subject_contains or 'report').replace('','_')}_{ts}.csv"
local_path = download_signed_url(url, dest_dir, filename=default_name)
if verbose: print("Downloaded:", local_path)
return local_path
if verbose: print("…not yet; checking again")
except HttpError as e:
if verbose: print("⚠️ Gmail API error:", e)
time.sleep(poll_sec)
if verbose:
print("Report email not found (or no link) within wait window.")
return None
Code: Select all
def main():
driver = setup_driver()
wait = WebDriverWait(driver, 20)
local_file = wait_and_download_report_email_by_time(
service=service_gmail,
start_ms=report_wait_start_ms,
subject_time_hms=subject_time_hms,
dest_dir=DOWNLOAD_PATH,
subject_contains=REPORT_SUBJECT_CONTAINS,
from_addr=REPORT_FROM_ADDR,
label_name=REPORT_LABEL_NAME,
max_wait_sec=5400, # up to 90 minutes
poll_sec=20,
verbose=True)
< /code>
** Gmail -ID mit Namen erhalten ** < /p>
def _gmail_get_label_id_by_name(service, name):
try:
labels = service.users().labels().list(userId="me").execute().get("labels", [])
for lb in labels:
if lb.get("name","").lower() == name.lower():
return lb.get("id")
except HttpError:
pass
return None