Tuple has no attribute "rank" error when trying to build a Bayesian neural network (Python)


Post by Anonymous »

I'm trying to build a BNN, but I keep running into the error in the title. I tried to make sure I'm not passing a tuple to .shape.rank by:

- Using the functional API with an explicit Input, so the first DenseFlipout layer is called with a tensor rather than a tuple (see the stripped-down snippet below)
- Making sure feat_cols is not empty and does not produce an invalid tensor
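To illustrate the first point, this is a stripped-down sketch of the wiring I mean (the layer width and input size here are just placeholders, not my real values):

Code: Select all

import tensorflow as tf
import tensorflow_probability as tfp

# explicit symbolic Keras input, so the layer is called with a tensor, not a plain tuple
inp = tf.keras.Input(shape=(10,), dtype='float32')
hid = tfp.layers.DenseFlipout(16, activation='relu')(inp)
print(type(inp), inp.shape)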
Here is a reproducible example. I've included all of my code because I don't know where the error comes from. It uses dummy data but throws the same error, so my real data processing is not the source of the problem. I'm using TensorFlow 2.19.0 and tensorflow-probability 0.24.0.

Code: Select all

# =======================================================================
#  Bayesian Student‑T Flip‑out (Random Data)
#  ---------------------------------------------------------------------
#  • Simulates a 300‑day time‑series:
#        - column  y      (current value)
#        - column  y_next (target = tomorrow’s y)
#        - 8 random features  f1 … f8
#  • Robust‑scales with 60‑day rolling median / IQR (shifted by one day)
#  • Builds two feature sets
#        FULL_FEATURES      – all scaled columns except raw y / y_next
#        LIMITED_FEATURES   – drops every  median_*  /  iqr_*  helper
#  • Trains a Bayesian Student‑T network (128‑64‑32 Flip‑out) twice
#  • Prints overall directional‑accuracy (based on mean prediction)
#    and bucketed confidence‑vs‑direction diagnostics
# =======================================================================
import math, random, numpy as np, pandas as pd, tensorflow as tf
import tensorflow_probability as tfp
tfd, tfpl = tfp.distributions, tfp.layers
tf.keras.backend.set_floatx('float32')

# ───────────────────── 1. simulate random data ──────────────────────
def simulate_data(n=300, seed=42):
    np.random.seed(seed)
    dates = pd.date_range("2020-01-01", periods=n, freq="D")
    y     = np.random.normal(100, 10, size=n)
    feats = {f"f{i}": np.random.normal(0, 1, size=n) for i in range(1, 9)}
    df = pd.DataFrame({"Date": dates, "y": y, **feats})
    df["y_next"] = df["y"].shift(-1)
    df.dropna(inplace=True)
    df["unscaled_y"]        = df["y"]
    df["unscaled_y_next"]   = df["y_next"]
    return df

df_raw = simulate_data()

# ───────────── 2. robust 60‑day median / IQR scaling ───────────────
def robust_scale(df, win=60):
    df = df.copy()
    df.sort_values("Date", inplace=True)
    df.set_index("Date", inplace=True)
    med = df.rolling(win, 1).median().shift(1)
    iqr = df.rolling(win, 1).quantile(0.75).shift(1) - df.rolling(win, 1).quantile(0.25).shift(1)
    for c in df.columns:
        df[f"median_{c}"] = med[c]
        df[f"iqr_{c}"]    = iqr[c]
    helpers = [c for c in df.columns if c.startswith(("median_", "iqr_"))]
    for c in df.columns.difference(helpers):
        df[c] = (df[c] - df[f"median_{c}"]) / df[f"iqr_{c}"]
    df.dropna(inplace=True)
    df.reset_index(inplace=True)
    return df

df_scaled = robust_scale(df_raw)

# ───────────────────── 3. feature lists ─────────────────────────────
BASE_EXCL = ["Date","unscaled_y","unscaled_y_next","y_next"]
FULL_FEATURES      = [c for c in df_scaled.columns if c not in BASE_EXCL]
LIMITED_FEATURES   = [c for c in FULL_FEATURES
                      if not (c.startswith("median_") or c.startswith("iqr_"))]
print("[INFO] FULL_FEATURES count   :", len(FULL_FEATURES))
print("[INFO] LIMITED_FEATURES count:", len(LIMITED_FEATURES))

def make_xy(df, cols):
    return df[cols].to_numpy("float32"), df["y_next"].to_numpy("float32")

# chronological split
n = len(df_scaled); test_sz = 30
train_val = df_scaled.iloc[:-test_sz]; test = df_scaled.iloc[-test_sz:]
split = int(0.8 * len(train_val))
train = train_val.iloc[:split].sample(frac=1.0, random_state=85)  # shuffle
val   = train_val.iloc[split:]

Xtr_full,ytr_full = make_xy(train, FULL_FEATURES)
Xva_full,yva_full = make_xy(val,   FULL_FEATURES)
Xtr_lim ,ytr_lim  = make_xy(train, LIMITED_FEATURES)
Xva_lim ,yva_lim  = make_xy(val,   LIMITED_FEATURES)

# ───────────────────── 4.  Student‑T helper ──────────────────────────
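# StudentTReparam builds a Student-T as loc + scale * Z / sqrt(G/df), with Z ~ Normal(0, 1)
# and G ~ Gamma(df/2, rate=0.5) (i.e. chi-squared with df d.o.f.), so samples remain
# differentiable w.r.t. loc and scale (fully reparameterized).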
class StudentTReparam(tfd.Distribution):
    def __init__(self, loc, scale, df=5.0):
        self.loc, self.scale = loc, scale
        self.df = tf.constant(df, loc.dtype)
        super().__init__(dtype=loc.dtype,
                         reparameterization_type=tfd.FULLY_REPARAMETERIZED,
                         validate_args=False, allow_nan_stats=True)
    def _batch_shape(self):        return self.loc.shape
    def _batch_shape_tensor(self): return tf.shape(self.loc)
    def _event_shape(self):        return tf.TensorShape([])
    def _event_shape_tensor(self): return tf.constant([], tf.int32)
    def _mean(self):               return self.loc   # mean() is called later; equals loc for df > 1
    def _log_prob(self, x):
        z = (x - self.loc) / self.scale
        return (tf.math.lgamma((self.df + 1) / 2) - tf.math.lgamma(self.df / 2)
                - 0.5 * tf.math.log(self.df * math.pi) - tf.math.log(self.scale)
                - 0.5 * (self.df + 1) * tf.math.log1p(z * z / self.df))
    def _sample_n(self, n, seed=None):
        shp = tf.concat([[n], tf.shape(self.loc)], 0)
        z = tf.random.normal(shp, dtype=self.loc.dtype, seed=seed)
        g = tf.random.gamma(shp, self.df / 2, 0.5, seed=seed, dtype=self.loc.dtype)
        return self.loc + self.scale * z / tf.sqrt(g / self.df)
    def _cdf(self, x): return tfd.StudentT(self.df, self.loc, self.scale).cdf(x)

def make_student_t(p):
    return StudentTReparam(p[..., 0], tf.nn.softplus(p[..., 1]) + 1e-2)

# ───────────────────── 5. loss components ───────────────────────────
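# Note: crps() below is the sample-based (energy) form of the score,
#   CRPS(F, y) ~= E|X - y| - 0.5 * E|X - X'|  with X, X' drawn independently from F.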
def crps(d, y, n=100):
    s1 = d.sample(n, seed=0); s2 = d.sample(n, seed=1)
    t1 = tf.reduce_mean(tf.abs(s1 - y), 0)
    t2 = tf.reduce_mean(tf.abs(s1[:, None] - s2[None, :]), [0, 1])
    return tf.reduce_mean(t1 - 0.5 * t2)

def coverage_err(d, y, low=0.05, high=0.95, alpha=5.0, target=0.9):
    c = tf.clip_by_value(d.cdf(y), 1e-6, 1 - 1e-6)
    inside = tf.sigmoid(alpha * (c - low)) * tf.sigmoid(alpha * (high - c))
    return tf.abs(tf.reduce_mean(inside) - target)

def full_loss(y, p):
    d = make_student_t(p)
    return (-tf.reduce_mean(d.log_prob(y))
            + 0.1 * crps(d, y)
            + 0.1 * coverage_err(d, y))

# ───────────── 6. inverse‑transform helper ──────────────────────────
def inverse_transform_pred(scaled, med, iqr):
    return scaled * iqr + med

# ───────────── 7. directional accuracy  (mean) ──────────────────────
def directional_accuracy(df_unscaled, params):
    d = make_student_t(params)
    pred_scaled = d.mean().numpy()
    med = df_unscaled['median_y_next'].to_numpy('float32')
    iqr = df_unscaled['iqr_y_next'].to_numpy('float32')
    pred_raw = inverse_transform_pred(pred_scaled, med, iqr)
    curr_raw = df_unscaled['unscaled_y'].to_numpy('float32')
    next_raw = df_unscaled['unscaled_y_next'].to_numpy('float32')
    pred_dir = np.sign(pred_raw - curr_raw)
    true_dir = np.sign(next_raw - curr_raw)
    return np.mean(pred_dir == true_dir)

# ───────────── 8. bucket confidence diagnostics ─────────────────────
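# p_up = 1 - CDF(current value) is the model's probability of an upward move; the
# confidence of the predicted direction is bucketed into deciles and the hit-rate is
# reported per bucket.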
def bucket_directional_confidences(df_unscaled, params):
    d = make_student_t(params)
    curr_raw = df_unscaled['unscaled_y'].to_numpy('float32')
    next_raw = df_unscaled['unscaled_y_next'].to_numpy('float32')
    med = df_unscaled['median_y_next'].to_numpy('float32')
    iqr = df_unscaled['iqr_y_next'].to_numpy('float32')
    curr_scaled = (curr_raw - med) / iqr

    pred_scaled = d.mean().numpy()
    pred_raw = inverse_transform_pred(pred_scaled, med, iqr)
    pred_dir = np.sign(pred_raw - curr_raw)

    p_up = 1.0 - d.cdf(curr_scaled).numpy()
    conf = np.where(pred_dir >= 0, p_up, 1.0 - p_up)

    true_dir = np.sign(next_raw - curr_raw)
    correct = pred_dir == true_dir

    bucket = np.clip(np.floor(conf * 10).astype(int), 0, 9)
    total  = np.bincount(bucket, minlength=10)
    hit    = np.bincount(bucket, weights=correct, minlength=10)
    pct    = np.divide(hit, total, out=np.zeros_like(hit), where=total > 0)

    return {"bucket_accuracy": pct,
            "total": total,
            "correct": hit,
            "best_bucket": int(np.argmax(pct)),
            "best_bucket_accuracy": pct.max() if total.sum() else 0.0}

# ───────────────────── 9.  model & trainer ───────────────────────────
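# build_bnn wires the Flipout layers through the Keras functional API; Trainer overrides
# train_step so the KL terms the Flipout layers register in self.losses are added to the
# data loss.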
def build_bnn(input_dim, seed=1):
    print(f"[INFO] Building network with input_dim = {input_dim}")
    inp = tf.keras.Input(shape=(input_dim,), dtype='float32')
    x   = tfpl.DenseFlipout(128, activation='relu', seed=seed)(inp)
    x   = tfpl.DenseFlipout( 64, activation='relu', seed=seed + 1)(x)
    x   = tfpl.DenseFlipout( 32, activation='relu', seed=seed + 2)(x)
    out = tfpl.DenseFlipout(  2, activation=None,   seed=seed + 3)(x)
    return tf.keras.Model(inp, out)

class Trainer(tf.keras.Model):
    def __init__(self, base):
        super().__init__(inputs=base.input, outputs=base.output)
    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            p = self(x, training=True)
            tape.watch(self.trainable_variables)
            loss = full_loss(y, p) + sum(self.losses)
        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return {"loss": loss}

# ─────────────────── 10. training helper ────────────────────────────
def run_variant(name, Xtr, ytr, Xva, yva, df_val, seed=85, epochs=3):
    tf.random.set_seed(seed); np.random.seed(seed); random.seed(seed)
    net = Trainer(build_bnn(Xtr.shape[1], seed))
    net.compile(optimizer=tf.keras.optimizers.Adam(1e-3), run_eagerly=True)
    print(f"\n[TRAIN] {name}")
    net.fit(Xtr, ytr, validation_data=(Xva, yva), epochs=epochs, batch_size=32, verbose=1)
    params_val = net(Xva, training=False)
    dir_acc = directional_accuracy(df_val, params_val)
    print("[VAL] directional accuracy:", dir_acc)
    print("bucket diagnostics:", bucket_directional_confidences(df_val, params_val))

# ───────────────────────── 11. RUN  ────────────────────────────────
run_variant("FULL features",   Xtr_full,ytr_full,Xva_full,yva_full,val,epochs=3)
run_variant("LIMITED features",Xtr_lim ,ytr_lim ,Xva_lim ,yva_lim ,val,epochs=3)
