by Guest » 24 Dec 2024, 10:33
Ich baue eine DAG, die Pandas verwendet, aber sie spiegelt sich nicht im Airflow wider. WebUi zeigt an, dass Pandas nicht gefunden wurde. Ich habe es auf meinem lokalen Rechner installiert und wsl aktiviert Ubuntu.
Wenn ich die Python-Umgebung auf beiden Seiten ausführe und importiere, funktioniert es und zeigt auch die installierte Version an.
Windows
Ubuntu
Code: Select all
import requests as re
from io import StringIO as si
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0,os.path.abspath(os.path.dirname(__file__)))
default_args = {
'owner' : '****',
'retires' : 5,
'retry_delay' : timedelta(minutes=3)
}
def fetch_data():
url = re.get("https://catalog.ourworldindata.org/explorers/who/latest/monkeypox/monkeypox.csv")
if url.status_code == 200:
csv_data=si(url.text)
df = pd.read_csv(csv_data, low_memory=False)
new_df = df[['location', 'date', 'iso_code', 'total_cases', 'total_deaths', 'new_cases', 'new_deaths']]
print(new_df.shape)
with DAG(
default_args=default_args,
dag_id='owid_data_pipeline',
description='DAG to fetch and load owid mpox daily data into postgres',
start_date=datetime(2024, 12, 18),
schedule_interval='@daily'
) as dag :
task1 = PythonOperator(
task_id='fetch_data',
python_callable=fetch_data
)
task1
Fehler
Bitte schlagen Sie Jungs vor, da ich mir jetzt schon einen ganzen Tag darüber den Kopf zerbreche.
Vielen Dank. GGs.
Ich baue eine DAG, die Pandas verwendet, aber sie spiegelt sich nicht im Airflow wider. WebUi zeigt an, dass Pandas nicht gefunden wurde. Ich habe es auf meinem lokalen Rechner installiert und wsl aktiviert Ubuntu.
Wenn ich die Python-Umgebung auf beiden Seiten ausführe und importiere, funktioniert es und zeigt auch die installierte Version an.
Windows
[img]https://i.sstatic.net/1K0Eabl3.png[/img]
Ubuntu
[img]https://i.sstatic.net/QS4q9e4n.png[/img]
[code]import requests as re
from io import StringIO as si
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0,os.path.abspath(os.path.dirname(__file__)))
default_args = {
'owner' : '****',
'retires' : 5,
'retry_delay' : timedelta(minutes=3)
}
def fetch_data():
url = re.get("https://catalog.ourworldindata.org/explorers/who/latest/monkeypox/monkeypox.csv")
if url.status_code == 200:
csv_data=si(url.text)
df = pd.read_csv(csv_data, low_memory=False)
new_df = df[['location', 'date', 'iso_code', 'total_cases', 'total_deaths', 'new_cases', 'new_deaths']]
print(new_df.shape)
with DAG(
default_args=default_args,
dag_id='owid_data_pipeline',
description='DAG to fetch and load owid mpox daily data into postgres',
start_date=datetime(2024, 12, 18),
schedule_interval='@daily'
) as dag :
task1 = PythonOperator(
task_id='fetch_data',
python_callable=fetch_data
)
task1
[/code]
Fehler
[img]https://i.sstatic.net/WxnF1kVw.png[/img]
Bitte schlagen Sie Jungs vor, da ich mir jetzt schon einen ganzen Tag darüber den Kopf zerbreche.
Vielen Dank. GGs.