Hier ist mein Code:
Code: Select all
from IPython import get_ipython
import numpy as np
import pandas as pd
import math
import time
from matplotlib import rc
from pandas.plotting import register_matplotlib_converters
import matplotlib.pyplot as plt
from pylab import rcParams
from scipy import signal
import os
import datetime as dt
import seaborn as sns
# PLOTTING SETUP
# The two magics below go through get_ipython(), so this section is meant to
# run inside an IPython/Jupyter session: inline figures, retina resolution.
get_ipython().run_line_magic('matplotlib', 'inline')
get_ipython().run_line_magic('config', "InlineBackend.figure_format='retina'")
# Register pandas' date/time converters with matplotlib for datetime axes.
register_matplotlib_converters()
# Global seaborn theme and default matplotlib figure size (width, height).
sns.set(style='whitegrid', palette='muted', font_scale=1.5)
rcParams['figure.figsize'] = 22, 10
# COMMODITY
# Current local date formatted as YYYY-MM-DD, plus the ticker symbol.
dateTimeObj = dt.datetime.now()
today = dateTimeObj.strftime("%Y-%m-%d")
symbol = "BTC-USD"
date = today
def getData(path="bitcoin_data.csv"):
    """Load the price history CSV and normalise it for analysis.

    The file is expected to contain the columns 'Date', 'Price', 'Open',
    'High', 'Low', 'Vol.' and 'Change %'.

    Processing steps:
      1. Drop the 'Change %' column.
      2. Parse 'Date' into datetimes.
      3. Rename 'Price' to 'Close'.
      4. Replace missing values with zero.
      5. Convert 'Vol.' to a number, honouring the K / M / B magnitude
         suffixes (thousand / million / billion). Values without a suffix
         are taken as-is; unparsable values (e.g. '-') become 0.
      6. Convert 'Close', 'Open', 'High', 'Low' to float, removing
         thousands separators.

    Parameters:
        path: Location of the CSV file. Defaults to "bitcoin_data.csv",
            the file name that used to be hard-coded.

    Returns:
        The cleaned DataFrame.
    """
    df = pd.read_csv(path)

    df.drop(columns=['Change %'], inplace=True)
    df['Date'] = pd.to_datetime(df['Date'])
    df.rename(columns={'Price': 'Close'}, inplace=True)
    df.fillna(0, inplace=True)

    # Volume arrives as text such as "1.5K", "0.5M" or "2.5B". Only stripping
    # 'K' would turn the M/B rows into NaN (then 0), and scaling every row by
    # 1000 would inflate values that carry no suffix at all, so the scale is
    # derived per row from the trailing character.
    volume = df['Vol.'].astype(str).str.strip().str.replace(',', '', regex=False)
    multiplier = (
        volume.str[-1:].str.upper().map({'K': 1e3, 'M': 1e6, 'B': 1e9}).astype(float)
    )
    has_suffix = multiplier.notna()
    magnitude = pd.to_numeric(
        volume.where(~has_suffix, volume.str[:-1]), errors='coerce'
    )
    df['Vol.'] = (magnitude * multiplier.fillna(1.0)).fillna(0.0)

    # Price columns may contain thousands separators ("42,000.5").
    for col in ['Close', 'Open', 'High', 'Low']:
        df[col] = df[col].replace({',': ''}, regex=True).astype(float)

    return df
# def minmaxTwoMeasures(df, measureMin, measureMax, column, order=3):
# # Use Low values for minima and High values for maxima
# df['DateTmp'] = df.index
# x = np.array(df["DateTmp"].values)
# # Use Low values for finding minima
# y1 = np.array(df["Low"].values)
# # Use High values for finding maxima
# y2 = np.array(df["High"].values)
# sortId = np.argsort(x)
# x = x[sortId]
# y1 = y1[sortId]
# y2 = y2[sortId]
# df[column] = 0
# # Find maxima using High values
# maxm = signal.argrelextrema(y2, np.greater, order=order)
# # Find minima using Low values
# minm = signal.argrelextrema(y1, np.less, order=order)
# for elem in maxm[0]:
# df.iloc[elem, df.columns.get_loc(column)] = 1
# for elem in minm[0]:
# df.iloc[elem, df.columns.get_loc(column)] = -1
# return df.drop(columns=['DateTmp'])
import pandas as pd
import numpy as np
def minmaxTwoMeasures(df):
    """Flag local peaks (from 'High') and local troughs (from 'Low').

    The passed frame is modified in place: 'Date' is parsed to datetime, the
    four price columns are converted to float (commas removed), and a
    'FlowMinMax' column is written with

        1  -> row whose High is strictly above both neighbouring Highs,
       -1  -> row whose Low is strictly below both neighbouring Lows,
        0  -> every other row.

    Troughs are written after peaks, so a row that qualifies as both ends up
    as -1. Flat tops/bottoms (equal neighbouring values) are not flagged.

    Returns a frame restricted to the columns
    Date, Open, High, Close, Low, FlowMinMax.
    """
    df['Date'] = pd.to_datetime(df['Date'])

    price_columns = ['Open', 'High', 'Close', 'Low']
    for name in price_columns:
        df[name] = df[name].replace({',': ''}, regex=True).astype(float)

    def _strict_maxima(samples):
        # A slope sign going from +1 to -1 gives a second difference of -2;
        # the +1 shifts the hit onto the sample in the middle.
        slope_sign = np.sign(np.diff(samples))
        turning = np.diff(slope_sign)
        return np.where(turning == -2)[0] + 1

    peak_positions = _strict_maxima(df['High'].values)
    # Minima of the Lows are the maxima of the negated Lows.
    trough_positions = _strict_maxima(-1 * df['Low'].values)

    df['FlowMinMax'] = 0
    df.loc[df.index[peak_positions], 'FlowMinMax'] = 1
    df.loc[df.index[trough_positions], 'FlowMinMax'] = -1

    wanted = ['Date', 'Open', 'High', 'Close', 'Low', 'FlowMinMax']
    return df[wanted]
def isMin(df, i):
    """Tell whether the row at position i is flagged as a trough (FlowMinMax == -1)."""
    marker = df["FlowMinMax"].iat[i]
    return marker == -1
def isMax(df, i):
    """Tell whether the row at position i is flagged as a peak (FlowMinMax == 1)."""
    marker = df["FlowMinMax"].iat[i]
    return marker == 1
def distance(x1, y1, x2, y2):
    """Return the Euclidean distance between the points (x1, y1) and (x2, y2)."""
    delta_x = x2 - x1
    delta_y = y2 - y1
    return math.sqrt(delta_x ** 2 + delta_y ** 2)
def validate_wave_sequence(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type='up'):
    """Check that no bar inside a wave leg breaks out of that leg's range.

    The pivots define five impulse legs (0-1, 1-2, 2-3, 3-4, 4-5) and, when
    ia, ib and ic are all given, three corrective legs (5-A, A-B, B-C).
    Legs alternate direction: for wave_type 'up' the first leg advances, for
    any other wave_type the first leg declines.

    Within an advancing leg no Low may fall below the Low at the leg's start
    and no High may exceed the High at the leg's end. Within a declining leg
    no High may exceed the High at the leg's start and no Low may fall below
    the Low at the leg's end. Both end points are included in the scan.

    Parameters:
        df: Frame with 'High' and 'Low' columns, addressed positionally.
        value: Accepted for signature compatibility; not used here.
        i0..i5: Positions of the impulse pivots.
        ia, ib, ic: Positions of the corrective pivots, or None to skip them.
        wave_type: 'up' for an uptrend; anything else is treated as downtrend.

    Returns:
        True when every leg passes, False otherwise. A position outside the
        frame (IndexError) also yields False.
    """
    def _advance_holds(start, end):
        # Rising leg: floor is the starting Low, ceiling is the ending High.
        for pos in range(start, end + 1):
            if df['Low'].iloc[pos] < df['Low'].iloc[start]:
                return False
            if df['High'].iloc[pos] > df['High'].iloc[end]:
                return False
        return True

    def _decline_holds(start, end):
        # Falling leg: ceiling is the starting High, floor is the ending Low.
        for pos in range(start, end + 1):
            if df['High'].iloc[pos] > df['High'].iloc[start]:
                return False
            if df['Low'].iloc[pos] < df['Low'].iloc[end]:
                return False
        return True

    try:
        legs = [(i0, i1), (i1, i2), (i2, i3), (i3, i4), (i4, i5)]
        if ia is not None and ib is not None and ic is not None:
            legs.extend([(i5, ia), (ia, ib), (ib, ic)])

        first_leg_advances = wave_type == 'up'
        for ordinal, (start, end) in enumerate(legs):
            # Even-numbered legs share the direction of the first leg.
            if ordinal % 2 == 0:
                advancing = first_leg_advances
            else:
                advancing = not first_leg_advances
            leg_ok = _advance_holds if advancing else _decline_holds
            if not leg_ok(start, end):
                return False
        return True
    except IndexError:
        return False
def isElliottWave(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type='up'):
    """Decide whether nine pivots form an Elliott impulse (plus correction).

    Checks, in order:
      1. validate_wave_sequence() accepts the pivots.
      2. The 'FlowMinMax' flags alternate correctly: for 'up', pivots
         0, 2, 4, A, C are troughs (-1) and 1, 3, 5, B are peaks (1); for
         any other wave_type the flags are mirrored.
      3. Pivot 5 is the extreme of pivots 1-5 in the trend direction.
      4. The impulse ordering rules between pivots 0-5 hold (e.g. wave 2
         does not retrace past pivot 0, wave 4 does not reach pivot 1).
      5. Wave 3 is not shorter than both wave 1 and wave 5, measured with
         distance() over (position, price) points.
      6. The corrective ordering rules between pivots 5, A, B, C.

    Parameters:
        df: Frame with 'FlowMinMax' and the price column named by `value`.
        value: Name of the price column used for the comparisons.
        i0..i5, ia, ib, ic: Pivot positions.
        wave_type: 'up' for an uptrend; anything else is a downtrend.

    Returns:
        [i0..i5, ia, ib, ic] when checks 1-6 pass, [i0..i5] when only the
        corrective rules (6) fail, and None otherwise (including when a
        position is out of range).
    """
    try:
        if not validate_wave_sequence(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type):
            return None

        rising = wave_type == 'up'

        # Pivots 0/2/4/A/C start a leg in trend direction, 1/3/5/B end one.
        origin_flag, target_flag = (-1, 1) if rising else (1, -1)
        markers = df['FlowMinMax']
        if any(markers.iloc[pos] != origin_flag for pos in (i0, i2, i4, ia, ic)):
            return None
        if any(markers.iloc[pos] != target_flag for pos in (i1, i3, i5, ib)):
            return None

        def beyond(first, second):
            # "Further along the trend": higher in an uptrend, lower otherwise.
            return first > second if rising else first < second

        prices = df[value]
        p0, p1, p2, p3, p4, p5 = (prices.iloc[pos] for pos in (i0, i1, i2, i3, i4, i5))

        # Pivot 5 has to be the extreme of the impulse.
        if not all(beyond(p5, other) for other in (p1, p2, p3, p4)):
            return None

        impulse_rules = (
            (p1, p0), (p1, p2), (p2, p0),
            (p3, p2), (p3, p4), (p4, p2),
            (p4, p1), (p5, p4), (p5, p3),
        )
        if not all(beyond(first, second) for first, second in impulse_rules):
            return None

        # Wave 3 may not be the shortest of the three motive waves.
        w1_len = distance(i0, p0, i1, p1)
        w3_len = distance(i2, p2, i3, p3)
        w5_len = distance(i4, p4, i5, p5)
        if w3_len < w1_len and w3_len < w5_len:
            return None

        pa, pb, pc = (prices.iloc[pos] for pos in (ia, ib, ic))
        correction_rules = (
            (p5, pa), (p5, pb), (p5, pc),
            (pb, pa), (pa, pc), (pb, pc),
        )
        if all(beyond(first, second) for first, second in correction_rules):
            return [i0, i1, i2, i3, i4, i5, ia, ib, ic]
        return [i0, i1, i2, i3, i4, i5]
    except IndexError:
        return None
def ElliottWaveDiscovery(df, measure, wave_type='up'):
    """Search the frame for Elliott wave patterns by exhaustive enumeration.

    Every chain of alternating extrema 0-1-2-3-4-5-A-B-C is tried: for
    wave_type 'up' the chain starts on a trough (FlowMinMax == -1), for any
    other wave_type on a peak (FlowMinMax == 1). Chains whose pivot 5 is not
    the extreme of pivots 1-4 are discarded before the corrective pivots are
    enumerated. Each remaining chain is handed to isElliottWave(); every new
    result is printed, plotted and collected.

    The loops are nine levels deep, so the run time grows very quickly with
    the number of flagged extrema.

    Parameters:
        df: Frame with a 'FlowMinMax' column and the column named by
            `measure`. It is re-indexed to 0..n-1 internally (on a copy).
        measure: Name of the price column to evaluate and plot.
        wave_type: 'up' for uptrend patterns; anything else for downtrend.

    Returns:
        List of pivot-position lists as returned by isElliottWave(), without
        duplicates, in discovery order.
    """
    print(f"Starting Elliott Wave Discovery for {wave_type}trend...")
    df = df.reset_index(drop=True)

    # Locate all extrema once. Re-scanning the frame row by row for every
    # loop level would repeat the same work at each of the nine levels.
    flags = df['FlowMinMax'].values
    minima = [pos for pos, flag in enumerate(flags) if flag == -1]
    maxima = [pos for pos, flag in enumerate(flags) if flag == 1]

    def later(candidates, pivot):
        # Extrema of the requested kind located strictly after `pivot`.
        return [pos for pos in candidates if pos > pivot]

    is_up = wave_type == 'up'
    # Pivots 0/2/4/A/C are "origins", pivots 1/3/5/B are "targets".
    origins, targets = (minima, maxima) if is_up else (maxima, minima)

    execution_count = 0
    start_time = time.time()
    waves = []
    print(f"Number of {'minimum' if wave_type == 'up' else 'maximum'} points: {len(origins)}")

    for i0 in origins:
        for i1 in later(targets, i0):
            for i2 in later(origins, i1):
                for i3 in later(targets, i2):
                    for i4 in later(origins, i3):
                        for i5 in later(targets, i4):
                            # Cheap pre-filter: pivot 5 must be the extreme
                            # of the impulse before A-B-C is enumerated.
                            end_price = df[measure].iloc[i5]
                            if is_up:
                                is_extreme = all(
                                    end_price > df[measure].iloc[pos]
                                    for pos in (i1, i2, i3, i4)
                                )
                            else:
                                is_extreme = all(
                                    end_price < df[measure].iloc[pos]
                                    for pos in (i1, i2, i3, i4)
                                )
                            if not is_extreme:
                                continue

                            for ia in later(origins, i5):
                                for ib in later(targets, ia):
                                    for ic in later(origins, ib):
                                        wave = isElliottWave(df, measure, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type)
                                        execution_count += 1
                                        if wave is None or wave in waves:
                                            continue

                                        print(f"Found {wave_type}trend wave: {wave}")
                                        waves.append(wave)
                                        wave_values = [df[measure].iat[idx] for idx in wave]
                                        print(f"Wave Values: {wave_values}")

                                        # Plot the detected wave over the full series.
                                        plt.figure(figsize=(10, 6))
                                        plt.plot(df[measure], label="Full Dataset", color='blue', alpha=0.5)
                                        plt.plot(wave, wave_values, marker='o', label=f"Detected {wave_type}trend Wave", color='red')
                                        plt.title(f"Elliott Wave Detection - {wave_type}trend")
                                        plt.xlabel("Index")
                                        plt.ylabel(measure)
                                        plt.legend()
                                        plt.grid(True)
                                        plt.show()

    end_time = time.time()
    print(f"Total iterations: {execution_count}")
    print(f"Total time taken: {end_time - start_time:.2f} seconds")
    print(f"Total {wave_type}trend waves found: {len(waves)}")
    return waves
def draw_wave(df, df_waves, w=None):
    """Plot the flagged peaks and troughs of df_waves as a connected line.

    Every row of df_waves whose 'FlowMinMax' is non-zero becomes a point:
    peaks (FlowMinMax == 1) are drawn at their 'High' with an upward marker,
    all other flagged rows at their 'Low' with a downward marker. Points are
    placed at their row position, joined by a line and annotated P0, P1, ...

    Parameters:
        df: Original price frame. Accepted for signature compatibility; it
            is not read by this function.
        df_waves: Frame with 'FlowMinMax', 'High' and 'Low' columns.
        w: Optional list of wave points. NOTE(review): only its emptiness is
            checked - an empty list suppresses the points, any other value
            plots all flagged rows of df_waves. Confirm whether it was meant
            to select specific points.
    """
    plt.figure(figsize=(15, 7))

    if w is None:
        w = list(range(len(df_waves)))

    if len(w) > 0:
        flags = df_waves['FlowMinMax']
        # Collect (x position, price, is_peak) for every flagged row.
        pivots = []
        for pos in range(len(df_waves)):
            flag = flags.iloc[pos]
            if flag == 0:
                continue
            is_peak = flag == 1
            column = 'High' if is_peak else 'Low'
            pivots.append((pos, df_waves[column].iloc[pos], is_peak))

        if pivots:
            plt.plot(
                [x for x, _, _ in pivots],
                [y for _, y, _ in pivots],
                'r-', linewidth=2, label='Price Movement',
            )

            # Label each marker kind on its first occurrence. Keying the
            # label on the first point overall would leave whichever kind
            # comes second without a legend entry.
            labelled = set()
            for number, (x, y, is_peak) in enumerate(pivots):
                kind, marker = ('Peak', 'r^') if is_peak else ('Trough', 'rv')
                label = "" if kind in labelled else kind
                labelled.add(kind)
                plt.plot(x, y, marker, markersize=10, label=label)
                plt.annotate(f'P{number}', (x, y), xytext=(5, 5),
                             textcoords='offset points', color='red')

    plt.title("Market Peaks and Troughs")
    plt.grid(True)
    plt.legend()
    plt.show()
# Select the waves that best fit the chart
def filterWaveSet(waves, min_len=6, max_len=6, extremes=True):
result = []
for w in waves:
l = len(w)
if min_len