Ich habe eine einfache Bronze → Silver → Gold-Pipeline, aber ich erhalte ständig Folgendes:
pyspark.errors.Exceptions.connect.AnalysisException:
[TABLE_OR_VIEW_NOT_FOUND] Die Tabelle oder Ansicht „bronze_raw“ kann nicht gefunden werden
obwohl bronze_raw mit @sdp.table definiert ist.
Code: Select all
Environment
• Spark: 4.1.0
• PySpark
• Spark Connect (used implicitly by spark-pipelines)
• Local machine (macOS)
• Running with: spark-pipelines run pipeline.yml
Pipeline-Spezifikation:pipeline.yml
Code: Select all
name: bronze_silver_gold_pipeline
storage:
root: file:///tmp/spark-pipelines/bronze_silver_gold_pipeline
libraries:
- glob:
include: "pipeline_definitions.py"
Code: Select all
from pyspark import pipelines as sdp
import pyspark.sql.functions as F
spark = SparkSession.active()
# Bronze Layer
@sdp.table(name="bronze_raw")
def bronze_raw():
return (
spark.read
.option("header", True)
.csv("file:///Users/abhisheknarayanchaudhury/Desktop/Spark Learning/dirty_data_2.csv")
)
# Silver Layer
@sdp.materialized_view(name="silver_cleaned")
def silver_cleaned():
df_bronze = spark.table("bronze_raw")
df = (
df_bronze
.withColumn("rn", F.monotonically_increasing_id())
.filter(F.col("rn") > 1)
.drop("rn")
)
return df
# Gold Layer
@sdp.materialized_view(name="gold_unpivoted")
def gold_unpivoted():
df_silver = spark.table("silver_cleaned")
return df_silver
AnalysisException: [TABLE_OR_VIEW_NOT_FOUND]
Die Tabelle oder Ansicht „bronze_raw“ wurde nicht gefunden.
'UnresolvedRelation [bronze_raw]
Der Fehler tritt während der Pipeline auf Registrierung,
Was ich bereits versucht habe
• Definieren von bronze_raw mit @sdp.table
• Sicherstellen absoluter Dateipfade für CSV
• Entfernen von SparkSession.builder und SparkSession.getActiveSession()
• Verwenden von spark.table("bronze_raw") für Downstream Abhängigkeiten
Trotzdem kann silver_cleaned bronze_raw nicht auflösen.
Frage
- Was ist der richtige Weg, um Upstream-Tabellen in Spark Declarative Pipelines zu referenzieren?
- Ist spark.table("bronze_raw") der richtige Ansatz?
- Gibt es etwas Bestimmtes am Spark Connect- oder SDP-Ausführungskontext, das diesen Fehler verursacht?
- Gibt es zusätzliche Anforderungen für die Deklaration oder Materialisierung von Upstream-Tabellen? Jedes minimale Arbeitsbeispiel oder jede Erklärung des Ausführungsmodells wäre sehr hilfreich.
Mobile version