Wie implementiert man eine Mischung von Gammaverteilungen in Python ohne Bayes?Python

Python-Programme
Guest
 Wie implementiert man eine Mischung von Gammaverteilungen in Python ohne Bayes?

Post by Guest »

Ich versuche, Beispiele zu erstellen, um Bayesian MCMC (z. B. HMC) mit nicht-Bayesianischen Äquivalenten zu vergleichen und gegenüberzustellen. Einer der Fälle, die ich als schwierig empfinde, ist das Erstellen einer Mischung aus Gammaverteilungen.
Ich hatte zunächst einige vorläufige Erfolge mit einer Mischung aus zwei Verteilungen:

Code: Select all

import numpy as np
from scipy.stats import gamma, rv_continuous
import matplotlib.pyplot as plt
from scipy.optimize import minimize

class gamma_mixture(rv_continuous):
def _pdf(self, x, w, a1, scale1, a2, scale2):
return w * gamma.pdf(x, a1, scale=scale1) + (1 - w) * gamma.pdf(x, a2, scale=scale2)

def fit(self, data):
def log_likelihood(params):
w, a1, scale1, a2, scale2 = params
mixture = w * gamma.pdf(data, a1, scale=scale1) + (1 - w) * gamma.pdf(data, a2, scale=scale2)
return -np.sum(np.log(mixture))

initial_params = [0.8, 2.0, 2.0, 10.0, 1.0]
bounds = [(0, 1), (0, None), (0, None), (0, None), (0, None)]
result = minimize(log_likelihood, initial_params, bounds=bounds, method='L-BFGS-B')
if result.success:
self.fitted_params = result.x
else:
raise RuntimeError("Optimization failed")

# Generate sample data
np.random.seed(2018)
data = np.concatenate([
gamma.rvs(a=2.0, scale=2.0, size=100),
gamma.rvs(a=20.0, scale=1.0, size=100)
])

# Define and fit the gamma mixture model to the data
custom_gamma_mixture = gamma_mixture(name='gamma_mixture')
custom_gamma_mixture.fit(data)
w, a1, scale1, a2, scale2 = custom_gamma_mixture.fitted_params

# Evaluate the PDF of the fitted mixture model
x = np.linspace(data.min(), data.max(), 1000)
pdf_vals = custom_gamma_mixture.pdf(x, w, a1, scale1, a2, scale2)

# Plot the fitted PDF against the histogram of the data
fig, axes = plt.subplots(2, sharex=True)
axes[0].hist(data, bins=30, density=True, alpha=0.6, color='g', label='Data Histogram')
axes[0].plot(x, pdf_vals, 'r-', lw=2, label='Fitted Mixture PDF')
axes[0].set_title('Original Sample')

axes[1].hist(custom_gamma_mixture(*custom_gamma_mixture.fitted_params).rvs(size=200), bins=30, density=True, alpha=0.6, color='b', label='Data Histogram')
axes[1].plot(x, pdf_vals, 'r-', lw=2, label='Fitted Mixture PDF')
axes[1].set_title('New Sample')

plt.tight_layout()
plt.show()

# Output fitted parameters
print("Fitted Parameters:")
print(f"w: {w:.4f}")
print(f"a1: {a1:.4f}, scale1: {scale1:.4f}")
print(f"a2: {a2:.4f}, scale2: {scale2:.4f}")
Ich habe dann versucht, auf mehrere Verteilungen zu verallgemeinern, und festgestellt, dass entweder die Konvergenz fehlschlug oder die dargestellte Verteilung einfach nicht richtig aussah. Hier ist ein Beispiel:

Code: Select all

import numpy as np
from scipy.stats import gamma
from scipy.optimize import minimize
from typing import Tuple

class GammaMixture:
def __init__(self, n_components: int):
self.n_components = n_components
self.weights = np.ones(n_components) / n_components
self.alphas = np.ones(n_components)
self.scales = np.ones(n_components)

def _pdf(self, x: np.ndarray) -> np.ndarray:
mixture = np.sum(self.weights[i] * gamma.pdf(x, self.alphas[i], scale=self.scales[i]) for i in range(self.n_components))
return mixture

def _negative_log_likelihood(self, params: np.ndarray, data: np.ndarray) -> float:
self.weights, self.alphas, self.scales = np.split(params, [self.n_components, 2*self.n_components])
self.weights = np.exp(self.weights) / np.sum(np.exp(self.weights))  # Ensure probabilities sum to 1
neg_log_likelihood = -np.sum(np.log(self._pdf(data)))
return neg_log_likelihood

def fit(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
initial_params = np.concatenate([np.zeros(self.n_components), np.ones(2*self.n_components)])
bounds = [(0, None)] * self.n_components + [(0, None)] * (2*self.n_components)
result = minimize(self._negative_log_likelihood, initial_params, args=(data,), bounds=bounds)
if result.success:
self.weights, self.alphas, self.scales = np.split(result.x, [self.n_components, 2*self.n_components])
self.weights = np.exp(self.weights) / np.sum(np.exp(self.weights))  # Ensure probabilities sum to 1
return self.weights, self.alphas, self.scales
else:
raise RuntimeError("Optimization failed")

def sample(self, n_samples: int) -> np.ndarray:
components = np.random.choice(self.n_components, size=n_samples, p=self.weights)
samples = np.array([gamma.rvs(self.alphas[i], scale=self.scales[i]) for i in components])
return samples

# Example usage:
np.random.seed(0)
data = np.concatenate([
gamma.rvs(a=2.0, scale=2.0, size=100),
gamma.rvs(a=20.0, scale=1.0, size=100)
])

n_components = 3
gamma_mixture = GammaMixture(n_components)
weights, alphas, scales = gamma_mixture.fit(data)

print("Fitted Parameters:")
print("Weights:", weights)
print("Alphas:", alphas)
print("Scales:", scales)

# Generate samples from the fitted model
samples = gamma_mixture.sample(n_samples=1000)

import matplotlib.pyplot as plt

# Plot histograms
plt.figure(figsize=(10, 5))

# Histogram of original data
plt.subplot(1, 2, 1)
plt.hist(data, bins=30, density=True, color='blue', alpha=0.6, label='Original Data')
plt.title('Histogram of Original Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()

# Histogram of new samples
plt.subplot(1, 2, 2)
plt.hist(samples, bins=30, density=True, color='orange', alpha=0.6, label='New Samples')
plt.title('Histogram of New Samples')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()

plt.tight_layout()
plt.show()
Ich hatte mich gefragt, ob ich vielleicht die Erwartungsmaximierung verwenden sollte, also habe ich versucht, auch KMeans-Clustering zu verwenden, um dem Prozess einen Warmstart zu ermöglichen:

Code: Select all

import numpy as np
from scipy.stats import gamma
from sklearn.cluster import KMeans

class GammaMixture:
def __init__(self, n_components: int):
"""
Initialize the Gamma Mixture Model.

Args:
n_components (int): Number of gamma distributions (components) in the mixture.
"""
self.n_components = n_components
self.weights = np.ones(n_components) / n_components
self.alphas = np.ones(n_components)
self.scales = np.ones(n_components)
self.fitted = False

def _e_step(self, data: np.ndarray) ->  np.ndarray:
"""
E-step: Calculate the responsibilities.

Args:
data (np.ndarray): Observed data.

Returns:
np.ndarray: Responsibilities of each component for each data point.
"""
responsibilities = np.zeros((data.shape[0], self.n_components))
for i in range(self.n_components):
responsibilities[:, i] = self.weights[i] * gamma.pdf(data, a=self.alphas[i], scale=self.scales[i])

sum_responsibilities = np.sum(responsibilities, axis=1).reshape(-1, 1)
if np.any(sum_responsibilities == 0):
raise ValueError("Some data points have zero responsibilities.")

responsibilities /= sum_responsibilities
return responsibilities

def _m_step(self, data: np.ndarray, responsibilities: np.ndarray):
"""
M-step: Update the parameters of the gamma distributions and the weights.

Args:
data (np.ndarray): Observed data.
responsibilities (np.ndarray): Responsibilities of each component for each data point.
"""
total_resp = np.sum(responsibilities, axis=0)
self.weights = total_resp / data.shape[0]

for i in range(self.n_components):
resp = responsibilities[:, i]
weighted_data_sum = np.sum(resp * data)
weighted_log_data_sum = np.sum(resp * np.log(data))

if total_resp[i] == 0 or weighted_data_sum == 0 or weighted_log_data_sum == 0:
raise ValueError(f"Invalid weighted sums for component {i}: total_resp={total_resp[i]}, weighted_data_sum={weighted_data_sum}, weighted_log_data_sum={weighted_log_data_sum}")

self.alphas[i] = total_resp[i] / (np.sum(resp * np.log(data)) - np.sum(resp) * np.log(weighted_data_sum / total_resp[i]))
self.scales[i] = weighted_data_sum / (total_resp[i] * self.alphas[i])

if np.isnan(self.alphas[i]) or np.isnan(self.scales[i]):
raise ValueError(f"NaN encountered in alphas or scales during M-step for component {i}.")

print(f"Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

def _warm_start(self, data: np.ndarray):
"""
Warm start the parameters using K-means clustering.

Args:
data (np.ndarray): Observed data.
"""
kmeans = KMeans(n_clusters=self.n_components, random_state=0)
labels = kmeans.fit_predict(data.reshape(-1, 1))

for i in range(self.n_components):
cluster_data = data[labels == i]
if len(cluster_data) == 0:
continue
data_mean = np.mean(cluster_data)
data_var = np.var(cluster_data)
self.alphas[i] = data_mean ** 2 / data_var
self.scales[i] = data_var / data_mean
self.weights[i] = len(cluster_data) / len(data)
print(f"Warm start Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

def fit(self, data: np.ndarray, tol: float = 1e-6, max_iter: int = 100):
"""
Fit the Gamma Mixture Model to the data.

Args:
data (np.ndarray): Observed data.
tol (float): Tolerance for convergence.
max_iter (int): Maximum number of iterations.

Raises:
RuntimeError: If the optimization fails to converge.
"""
self._warm_start(data)

log_likelihood_prev = -np.inf
for iteration in range(max_iter):
responsibilities = self._e_step(data)
self._m_step(data, responsibilities)

log_likelihood = np.sum(np.log(np.sum([w * gamma.pdf(data, a, scale=s) for w, a, s in zip(self.weights, self.alphas, self.scales)], axis=0)))
print(f"Iteration {iteration}: log_likelihood={log_likelihood}")

if np.abs(log_likelihood - log_likelihood_prev) <  tol:
break
log_likelihood_prev = log_likelihood

if np.any(np.isnan(self.weights)) or np.any(np.isnan(self.alphas)) or np.any(np.isnan(self.scales)):
raise ValueError("NaN encountered in parameters after fitting.")

self.fitted = True

def sample(self, n_samples: int) -> np.ndarray:
"""
Sample from the fitted Gamma Mixture Model.

Args:
n_samples (int): Number of samples to generate.

Returns:
np.ndarray: Samples generated from the model.

Raises:
RuntimeError: If the model has not been fitted yet.
"""
if not self.fitted:
raise RuntimeError("Model has not been fitted yet. Fit the model first.")

samples = np.zeros(n_samples)
component_samples = np.random.choice(self.n_components, size=n_samples, p=self.weights)

for i in range(self.n_components):
n_component_samples = np.sum(component_samples == i)
if n_component_samples > 0:
samples[component_samples == i] = gamma.rvs(a=self.alphas[i], scale=self.scales[i], size=n_component_samples)

return samples

# Example usage
np.random.seed(0)
data = np.concatenate([
gamma.rvs(a=2, scale=2, size=300),
gamma.rvs(a=5, scale=1, size=300),
gamma.rvs(a=9, scale=0.5, size=400)
])

gamma_mixture = GammaMixture(n_components=3)
gamma_mixture.fit(data)
samples = gamma_mixture.sample(n_samples=1000)

import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(8, 6))
sns.histplot(data, color='blue', kde=True, label='Observed', stat='density')
sns.histplot(samples, color='red', kde=True, label='Sampled', stat='density')
plt.title('Distribution of Observed vs Sampled Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
plt.show()
Aber das führte auch zu Konvergenzproblemen oder einer optisch schlechten Übereinstimmung mit den Daten. Zuletzt habe ich einen Warmstart versucht, bei dem die Momentenmethode verwendet wurde:

Code: Select all

import numpy as np
from scipy.stats import gamma

class GammaMixture:
def __init__(self, n_components: int):
"""
Initialize the Gamma Mixture Model.

Args:
n_components (int): Number of gamma distributions (components) in the mixture.
"""
self.n_components = n_components
self.weights = np.ones(n_components) / n_components
self.alphas = np.ones(n_components)
self.scales = np.ones(n_components)
self.fitted = False

def _e_step(self, data: np.ndarray) ->  np.ndarray:
"""
E-step: Calculate the responsibilities.

Args:
data (np.ndarray): Observed data.

Returns:
np.ndarray: Responsibilities of each component for each data point.
"""
responsibilities = np.zeros((data.shape[0], self.n_components))
for i in range(self.n_components):
responsibilities[:, i] = self.weights[i] * gamma.pdf(data, a=self.alphas[i], scale=self.scales[i])

sum_responsibilities = np.sum(responsibilities, axis=1).reshape(-1, 1)
if np.any(sum_responsibilities == 0):
raise ValueError("Some data points have zero responsibilities.")

responsibilities /= sum_responsibilities
return responsibilities

def _m_step(self, data: np.ndarray, responsibilities: np.ndarray):
"""
M-step: Update the parameters of the gamma distributions and the weights.

Args:
data (np.ndarray): Observed data.
responsibilities (np.ndarray): Responsibilities of each component for each data point.
"""
total_resp = np.sum(responsibilities, axis=0)
self.weights = total_resp / data.shape[0]

for i in range(self.n_components):
resp = responsibilities[:, i]
weighted_data_sum = np.sum(resp * data)
weighted_log_data_sum = np.sum(resp * np.log(data))

if total_resp[i] == 0 or weighted_data_sum == 0 or weighted_log_data_sum == 0:
raise ValueError(f"Invalid weighted sums for component {i}: total_resp={total_resp[i]}, weighted_data_sum={weighted_data_sum}, weighted_log_data_sum={weighted_log_data_sum}")

self.alphas[i] = (total_resp[i] / weighted_log_data_sum)
self.scales[i] = (weighted_data_sum / total_resp[i]) / self.alphas[i]

if np.isnan(self.alphas[i]) or np.isnan(self.scales[i]):
raise ValueError(f"NaN encountered in alphas or scales during M-step for component {i}.")
print(f"Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

def _warm_start(self, data: np.ndarray):
"""
Warm start the parameters using Method of Moments.

Args:
data (np.ndarray): Observed data.
"""
data_mean = np.mean(data)
data_var = np.var(data)

for i in range(self.n_components):
self.alphas[i] = data_mean ** 2 / data_var
self.scales[i] = data_var / data_mean
self.weights[i] = 1 / self.n_components
print(f"Warm start Component {i}: alpha={self.alphas[i]}, scale={self.scales[i]}, weight={self.weights[i]}")

def fit(self, data: np.ndarray, tol: float = 1e-6, max_iter: int = 100):
"""
Fit the Gamma Mixture Model to the data.

Args:
data (np.ndarray): Observed data.
tol (float): Tolerance for convergence.
max_iter (int): Maximum number of iterations.

Raises:
RuntimeError: If the optimization fails to converge.
"""
self._warm_start(data)

log_likelihood_prev = -np.inf
for iteration in range(max_iter):
responsibilities = self._e_step(data)
self._m_step(data, responsibilities)

log_likelihood = np.sum(np.log(np.sum([w * gamma.pdf(data, a, scale=s) for w, a, s in zip(self.weights, self.alphas, self.scales)], axis=0)))
print(f"Iteration {iteration}: log_likelihood={log_likelihood}")

if np.abs(log_likelihood - log_likelihood_prev) < tol:
break
log_likelihood_prev = log_likelihood

if np.any(np.isnan(self.weights)) or np.any(np.isnan(self.alphas)) or np.any(np.isnan(self.scales)):
raise ValueError("NaN encountered in parameters after fitting.")

self.fitted = True

def sample(self, n_samples: int) ->  np.ndarray:
"""
Sample from the fitted Gamma Mixture Model.

Args:
n_samples (int): Number of samples to generate.

Returns:
np.ndarray: Samples generated from the model.

Raises:
RuntimeError: If the model has not been fitted yet.
"""
if not self.fitted:
raise RuntimeError("Model has not been fitted yet. Fit the model first.")

samples = np.zeros(n_samples)
component_samples = np.random.choice(self.n_components, size=n_samples, p=self.weights)

for i in range(self.n_components):
n_component_samples = np.sum(component_samples == i)
if n_component_samples > 0:
samples[component_samples == i] = gamma.rvs(a=self.alphas[i], scale=self.scales[i], size=n_component_samples)

return samples

# Example usage
np.random.seed(0)
data = np.concatenate([
gamma.rvs(a=2, scale=2, size=300),
gamma.rvs(a=5, scale=1, size=300),
gamma.rvs(a=9, scale=0.5, size=400)
])

gamma_mixture = GammaMixture(n_components=3)
gamma_mixture.fit(data)
samples = gamma_mixture.sample(n_samples=1000)

import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(8, 6))
sns.histplot(data, color='blue', kde=True, label='Observed', stat='density')
sns.histplot(samples, color='red', kde=True, label='Sampled', stat='density')
plt.title('Distribution of Observed vs Sampled Data')
plt.xlabel('Value')
plt.ylabel('Density')
plt.legend()
plt.show()
Wie sollte ich eigentlich vorgehen, eine Mischung aus Gammaverteilungen zu implementieren, ohne Bayes zu verwenden?
  • Wikipedia schlägt Heuristiken wie Simulated Annealing vor, um das Verhalten des EM-Algorithmus zu verbessern. Vielleicht könnte man das Gleiche auch für MLE sagen.
  • Etwas, das ich noch nicht erforscht habe, ist die numerische Stabilität der Zielfunktion. Vielleicht ist es schlechter, die PDFs zu berechnen und dann ihre Logarithmen zu bilden, als irgendein anderer Ansatz. Leider lassen sich Logarithmen nicht über eine Summe verteilen, der Logarithmus der Wahrscheinlichkeit einer Mischungsverteilung ist möglicherweise nicht besser. Ich weiß nicht, ob das eine Verbesserung wäre, aber ich könnte mir stattdessen die Berechnung von Taylor-Reihenentwicklungen des logarithmischen Likeloods ansehen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post