Spark Declarative Pipelines (SDP) – TABLE_OR_VIEW_NOT_FOUND für Upstream-Tabelle, obwohl sie definiert istPython

Python-Programme
Anonymous
 Spark Declarative Pipelines (SDP) – TABLE_OR_VIEW_NOT_FOUND für Upstream-Tabelle, obwohl sie definiert ist

Post by Anonymous »

Ich versuche, Spark Declarative Pipelines (Spark 4.0 / pyspark.pipelines) lokal mithilfe der Spark-Pipelines-CLI zu erlernen.
Ich habe eine einfache Bronze → Silver → Gold-Pipeline, aber ich erhalte ständig Folgendes:

pyspark.errors.Exceptions.connect.AnalysisException:
[TABLE_OR_VIEW_NOT_FOUND] Die Tabelle oder Ansicht „bronze_raw“ kann nicht gefunden werden

obwohl bronze_raw mit @sdp.table definiert ist.

Code: Select all

Environment
•   Spark: 4.1.0
•   PySpark
•   Spark Connect (used implicitly by spark-pipelines)
•   Local machine (macOS)
•   Running with: spark-pipelines run pipeline.yml
Folgend sind die beiden Dateien in meinem Arbeitsordner, keine Unterordner.
Pipeline-Spezifikation:pipeline.yml

Code: Select all

name: bronze_silver_gold_pipeline

storage:
root: file:///tmp/spark-pipelines/bronze_silver_gold_pipeline

libraries:
- glob:
include: "pipeline_definitions.py"
Pipeline-Definitionen: (pipeline_defiitions.py)

Code: Select all

from pyspark import pipelines as sdp
import pyspark.sql.functions as F
spark = SparkSession.active()
# Bronze Layer
@sdp.table(name="bronze_raw")
def bronze_raw():
return (
spark.read
.option("header", True)
.csv("file:///Users/abhisheknarayanchaudhury/Desktop/Spark Learning/dirty_data_2.csv")
)

# Silver Layer
@sdp.materialized_view(name="silver_cleaned")
def silver_cleaned():
df_bronze = spark.table("bronze_raw")

df = (
df_bronze
.withColumn("rn", F.monotonically_increasing_id())
.filter(F.col("rn") > 1)
.drop("rn")
)

return df

# Gold Layer
@sdp.materialized_view(name="gold_unpivoted")
def gold_unpivoted():
df_silver = spark.table("silver_cleaned")
return df_silver
Fehlermeldung:

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND]
Die Tabelle oder Ansicht „bronze_raw“ wurde nicht gefunden.
'UnresolvedRelation [bronze_raw]

Der Fehler tritt während der Pipeline auf Registrierung,
Was ich bereits versucht habe
• Definieren von bronze_raw mit @sdp.table
• Sicherstellen absoluter Dateipfade für CSV
• Entfernen von SparkSession.builder und SparkSession.getActiveSession()
• Verwenden von spark.table("bronze_raw") für Downstream Abhängigkeiten
Trotzdem kann silver_cleaned bronze_raw nicht auflösen.
Frage
  • Was ist der richtige Weg, um Upstream-Tabellen in Spark Declarative Pipelines zu referenzieren?
  • Ist spark.table("bronze_raw") der richtige Ansatz?
  • Gibt es etwas Bestimmtes am Spark Connect- oder SDP-Ausführungskontext, das diesen Fehler verursacht?
  • Gibt es zusätzliche Anforderungen für die Deklaration oder Materialisierung von Upstream-Tabellen? Jedes minimale Arbeitsbeispiel oder jede Erklärung des Ausführungsmodells wäre sehr hilfreich.
Erwartetes Verhalten: Spark-Pipelines sollten bronze_raw als Upstream-Abhängigkeit erkennen und silver_cleaned erlauben, darauf zu verweisen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post