Wie kann ich die Elliott-Wellen-Erkennung in großen Kryptowährungsdatensätzen optimieren?

Post a reply

Smilies
:) :( :oops: :chelo: :roll: :wink: :muza: :sorry: :angel: :read: *x) :clever:
View more smilies

BBCode is ON
[img] is ON
[flash] is OFF
[url] is ON
Smilies are ON

Topic review
   

Expand view Topic review: Wie kann ich die Elliott-Wellen-Erkennung in großen Kryptowährungsdatensätzen optimieren?

by Guest » 28 Dec 2024, 18:21

Ich arbeite an einem Python-Programm, das Elliott-Wellen-Muster in Marktdaten von Kryptowährungen erkennt. Die aktuelle Implementierung durchsucht mit tief verschachtelten Schleifen erschöpfend alle Kombinationen von Hoch- und Tiefpunkten, wodurch der Rechenaufwand bei großen Datenmengen stark ansteigt.
Hier ist mein Code:

Code: Select all

from IPython import get_ipython
import numpy as np
import pandas as pd
import math
import time

from matplotlib import rc
from pandas.plotting import register_matplotlib_converters
import matplotlib.pyplot as plt
from pylab import rcParams
from scipy import signal

import os
import datetime as dt
import seaborn as sns

# PLOTTING SETUP
# get_ipython() returns None when no interactive shell is registered (for
# example when this file is run as a plain script), so the notebook-only
# magics are applied only when a shell actually exists.
_ipython_shell = get_ipython()
if _ipython_shell is not None:
    _ipython_shell.run_line_magic('matplotlib', 'inline')
    _ipython_shell.run_line_magic('config', "InlineBackend.figure_format='retina'")
register_matplotlib_converters()
sns.set(style='whitegrid', palette='muted', font_scale=1.5)
rcParams['figure.figsize'] = 22, 10

# COMMODITY
# Current local date formatted as YYYY-MM-DD.
dateTimeObj = dt.datetime.now()
today = dateTimeObj.strftime("%Y-%m-%d")

symbol = "BTC-USD"
date = today

def getData(path="bitcoin_data.csv"):
    """Load the price history CSV and normalise it for wave detection.

    The file must provide the columns 'Date', 'Price', 'Open', 'High',
    'Low' and 'Vol.'; a 'Change %' column is dropped when present.

    Args:
        path: CSV file to read. Defaults to "bitcoin_data.csv", the file the
            function has always read.

    Returns:
        A DataFrame in which 'Date' is datetime, 'Price' is renamed to
        'Close', the four price columns are floats, 'Vol.' is a float number
        of units, and missing values outside 'Date' are replaced by 0.

    Raises:
        ValueError: if a price cell cannot be converted to float.
    """
    df = pd.read_csv(path)

    # 1. Remove the "Change %" column (tolerate exports that do not have it)
    df = df.drop(columns=['Change %'], errors='ignore')

    # 2. Convert the 'Date' column to datetime format
    df['Date'] = pd.to_datetime(df['Date'])

    # 3. Rename the 'Price' column to 'Close'
    df = df.rename(columns={'Price': 'Close'})

    # 4. Convert 'Vol.' to a number of units. A trailing K/M/B scales the
    #    value; cells without a suffix are taken as-is and anything
    #    unparsable becomes 0.
    multipliers = {'K': 1e3, 'M': 1e6, 'B': 1e9}
    volume_text = df['Vol.'].astype(str).str.strip().str.replace(',', '', regex=False)
    scale = volume_text.str[-1:].str.upper().map(multipliers).astype(float)
    has_suffix = scale.notna()
    digits = volume_text.where(~has_suffix, volume_text.str[:-1])
    df['Vol.'] = (pd.to_numeric(digits, errors='coerce') * scale.fillna(1.0)).fillna(0)

    # 5. Convert the price columns to float, stripping thousands separators
    for col in ['Close', 'Open', 'High', 'Low']:
        df[col] = df[col].replace({',': ''}, regex=True).astype(float)

    # 6. Fill remaining nulls with zero. 'Date' is skipped on purpose: a 0
    #    inside a datetime column would turn it into a mixed-type column.
    for col in df.columns:
        if col != 'Date':
            df[col] = df[col].fillna(0)

    return df

# def minmaxTwoMeasures(df, measureMin, measureMax, column, order=3):
#     # Use Low values for minima and High values for maxima
#     df['DateTmp'] = df.index
#     x = np.array(df["DateTmp"].values)

#     # Use Low values for finding minima
#     y1 = np.array(df["Low"].values)
#     # Use High values for finding maxima
#     y2 = np.array(df["High"].values)

#     sortId = np.argsort(x)
#     x = x[sortId]
#     y1 = y1[sortId]
#     y2 = y2[sortId]

#     df[column] = 0

#     # Find maxima using High values
#     maxm = signal.argrelextrema(y2, np.greater, order=order)
#     # Find minima using Low values
#     minm = signal.argrelextrema(y1, np.less, order=order)

#     for elem in maxm[0]:
#         df.iloc[elem, df.columns.get_loc(column)] = 1
#     for elem in minm[0]:
#         df.iloc[elem, df.columns.get_loc(column)] = -1

#     return df.drop(columns=['DateTmp'])

import pandas as pd
import numpy as np

def minmaxTwoMeasures(df):
    """
    Flag local peaks of 'High' and local troughs of 'Low'.

    A row is a peak when its High is strictly greater than both neighbouring
    Highs, and a trough when its Low is strictly lower than both neighbouring
    Lows. The first and last rows are never flagged. The result is stored in
    a 'FlowMinMax' column where:
    - 1 represents peaks (Highs),
    - -1 represents troughs (Lows),
    - 0 represents all other points.
    A row that qualifies as both ends up as a trough (-1).

    Note: the input DataFrame is modified in place ('Date' and the price
    columns are converted, 'FlowMinMax' is added).

    Returns a DataFrame with columns: Date, Open, High, Close, Low, FlowMinMax.
    """
    # Ensure 'Date' is in datetime format
    df['Date'] = pd.to_datetime(df['Date'])

    # Ensure numeric columns are converted properly
    for col in ['Open', 'High', 'Close', 'Low']:
        df[col] = df[col].replace({',': ''}, regex=True).astype(float)

    highs = df['High'].to_numpy()
    lows = df['Low'].to_numpy()

    # The sign of the slope flips from +1 to -1 at a strict local maximum, so
    # the second difference of the signs equals -2 there. The +1 re-aligns
    # the position with the original rows.
    peak_positions = np.where(np.diff(np.sign(np.diff(highs))) == -2)[0] + 1

    # Troughs are the peaks of the negated Low series.
    trough_positions = np.where(np.diff(np.sign(np.diff(-1 * lows))) == -2)[0] + 1

    # Write the flags by position, not by index label, so a non-unique index
    # cannot cause additional rows to be marked.
    flow = np.zeros(len(df), dtype=np.int64)
    flow[peak_positions] = 1
    flow[trough_positions] = -1
    df['FlowMinMax'] = flow

    # Return the required columns
    return df[['Date', 'Open', 'High', 'Close', 'Low', 'FlowMinMax']]

def isMin(df, i):
    """Return True when the row at position ``i`` is flagged as a trough (FlowMinMax == -1)."""
    return bool(df["FlowMinMax"].iat[i] == -1)

def isMax(df, i):
    """Return True when the row at position ``i`` is flagged as a peak (FlowMinMax == 1)."""
    return bool(df["FlowMinMax"].iat[i] == 1)

def distance(x1, y1, x2, y2):
    """Return the Euclidean distance between the points (x1, y1) and (x2, y2)."""
    # math.hypot avoids the overflow that squaring very large differences
    # by hand can trigger.
    return math.hypot(x2 - x1, y2 - y1)

def _leg_within_bounds(lows, highs, start, end, rising):
    """Check that no bar between ``start`` and ``end`` breaks out of one leg.

    For a rising leg the Low at ``start`` is the floor and the High at
    ``end`` is the ceiling; for a falling leg the High at ``start`` is the
    ceiling and the Low at ``end`` is the floor. Raises IndexError when an
    end point lies outside the data.
    """
    if end < start:
        # Nothing lies between the points, so there is nothing to violate.
        return True
    floor_idx, ceiling_idx = (start, end) if rising else (end, start)
    floor = lows[floor_idx]
    ceiling = highs[ceiling_idx]
    span = np.arange(start, end + 1)
    return not ((lows[span] < floor).any() or (highs[span] > ceiling).any())


def validate_wave_sequence(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type='up'):
    """
    Check that every leg of a candidate wave stays inside its own end points.

    The points i0..i5 (and ia, ib, ic when all three are given) are joined
    into consecutive legs. Legs alternate direction: the first leg rises when
    wave_type is 'up' and falls for any other value. Within a leg, no row may
    have a Low below the leg's floor or a High above the leg's ceiling, using
    the 'Low' and 'High' columns of df.

    Parameters:
    df: DataFrame with 'High' and 'Low' columns, addressed by row position
    value: unused; kept so existing callers keep working
    i0..i5: row positions of the impulse wave points
    ia, ib, ic: row positions of the corrective points, or None to skip them
    wave_type: 'up' for an uptrend, anything else for a downtrend

    Returns True when all legs are valid, False when a leg is violated or a
    position lies outside the data.
    """
    lows = df['Low'].to_numpy()
    highs = df['High'].to_numpy()

    points = [i0, i1, i2, i3, i4, i5]
    if ia is not None and ib is not None and ic is not None:
        points += [ia, ib, ic]

    rising = wave_type == 'up'
    try:
        for start, end in zip(points, points[1:]):
            if not _leg_within_bounds(lows, highs, start, end, rising):
                return False
            rising = not rising
    except IndexError:
        return False
    return True

def isElliottWave(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type='up'):
    """
    Check whether the given points form an Elliott Wave pattern.

    The candidate is accepted when all of the following hold:
    1. validate_wave_sequence() accepts the legs between the points.
    2. 'FlowMinMax' flags alternate as expected: for an uptrend i0, i2, i4,
       ia, ic are troughs (-1) and i1, i3, i5, ib are peaks (1); for a
       downtrend the flags are mirrored.
    3. The values of df[value] satisfy the impulse ordering rules (for
       example wave 5 ends beyond every earlier point and wave 4 does not
       overlap wave 1).
    4. Wave 3 is not shorter than both wave 1 and wave 5, measured with
       distance() over (row position, value) pairs.

    Parameters:
    df: DataFrame with 'FlowMinMax' and the column named by value
    value: name of the price column used for the ordering rules
    i0..i5: row positions of the impulse wave points
    ia, ib, ic: row positions of the corrective points; when any of them is
        None the corrective checks are skipped
    wave_type: 'up' for an uptrend, anything else for a downtrend

    Returns [i0, ..., i5, ia, ib, ic] when the corrective A-B-C ordering also
    holds, [i0, ..., i5] when only the impulse part qualifies, and None when
    the candidate is rejected or a position lies outside the data.
    """
    is_up = wave_type == 'up'
    has_correction = ia is not None and ib is not None and ic is not None

    def beyond(a, b):
        # True when a lies further in the trend direction than b.
        return a > b if is_up else a < b

    try:
        # First validate the sequence integrity leg by leg
        if not validate_wave_sequence(df, value, i0, i1, i2, i3, i4, i5, ia, ib, ic, wave_type):
            return None

        # Points where a leg in trend direction starts / ends
        flags = df['FlowMinMax']
        origin_flag = -1 if is_up else 1
        origin_points = [i0, i2, i4]
        target_points = [i1, i3, i5]
        if has_correction:
            origin_points += [ia, ic]
            target_points.append(ib)

        if any(flags.iloc[i] != origin_flag for i in origin_points):
            return None
        if any(flags.iloc[i] != -origin_flag for i in target_points):
            return None

        prices = df[value]
        p0, p1, p2, p3, p4, p5 = (prices.iloc[i] for i in (i0, i1, i2, i3, i4, i5))

        # Each pair (a, b) requires a to lie beyond b in trend direction.
        impulse_rules = [
            (p5, p1), (p5, p2), (p5, p3), (p5, p4),  # wave 5 ends at the extreme
            (p1, p0), (p1, p2), (p2, p0),            # wave 2 retraces only part of wave 1
            (p3, p2), (p3, p4),
            (p4, p2), (p4, p1),                      # wave 4 does not overlap wave 1
        ]
        if not all(beyond(a, b) for a, b in impulse_rules):
            return None

        # Wave 3 must not be the shortest of the three impulse waves
        w1_len = distance(i0, p0, i1, p1)
        w3_len = distance(i2, p2, i3, p3)
        w5_len = distance(i4, p4, i5, p5)
        if w3_len < w1_len and w3_len < w5_len:
            return None

        impulse = [i0, i1, i2, i3, i4, i5]
        if not has_correction:
            return impulse

        pa, pb, pc = (prices.iloc[i] for i in (ia, ib, ic))
        correction_rules = [
            (p5, pa), (p5, pb), (p5, pc),
            (pb, pa), (pa, pc), (pb, pc),
        ]
        if all(beyond(a, b) for a, b in correction_rules):
            return impulse + [ia, ib, ic]

        return impulse

    except IndexError:
        return None

def ElliottWaveDiscovery(df, measure, wave_type='up'):
    """
    Discover Elliott Wave patterns in the data for an uptrend or a downtrend.

    Every combination of alternating extrema (taken from 'FlowMinMax') is
    tried as i0..i5 plus corrective points ia, ib, ic and passed to
    isElliottWave(). Each distinct wave it returns is printed, plotted and
    collected.

    Candidates are discarded as soon as the points chosen so far break one of
    the price-ordering rules that isElliottWave() enforces anyway, so the
    reported "Total iterations" counts only the candidates that actually
    reach isElliottWave().

    Parameters:
    df: DataFrame with 'FlowMinMax' and the column named by measure
    measure: name of the price column used for the ordering rules
    wave_type: 'up' starts from minima, anything else starts from maxima

    Returns the list of waves in the order they were found.
    """
    # Function-scope import: bisect is not among this file's imports.
    from bisect import bisect_right

    print(f"Starting Elliott Wave Discovery for {wave_type}trend...")
    df = df.reset_index(drop=True)
    is_up = wave_type == 'up'

    # Extract the extrema once instead of rescanning the frame at every level.
    flags = df['FlowMinMax'].to_numpy()
    prices = df[measure].to_numpy()
    troughs = np.flatnonzero(flags == -1).tolist()
    peaks = np.flatnonzero(flags == 1).tolist()
    # origins: where a leg in trend direction starts; targets: where it ends.
    origins, targets = (troughs, peaks) if is_up else (peaks, troughs)

    def later(points, index):
        # Entries of the sorted list that lie strictly after index.
        return points[bisect_right(points, index):]

    def beyond(a, b):
        # True when a lies further in the trend direction than b.
        return a > b if is_up else a < b

    def impulse_candidates():
        # Yield (i0..i5) tuples that satisfy the impulse price ordering.
        for i0 in origins:
            p0 = prices[i0]
            for i1 in later(targets, i0):
                p1 = prices[i1]
                if not beyond(p1, p0):
                    continue
                for i2 in later(origins, i1):
                    p2 = prices[i2]
                    if not (beyond(p1, p2) and beyond(p2, p0)):
                        continue
                    for i3 in later(targets, i2):
                        p3 = prices[i3]
                        if not beyond(p3, p2):
                            continue
                        for i4 in later(origins, i3):
                            p4 = prices[i4]
                            if not (beyond(p3, p4) and beyond(p4, p2) and beyond(p4, p1)):
                                continue
                            for i5 in later(targets, i4):
                                p5 = prices[i5]
                                if (beyond(p5, p1) and beyond(p5, p2) and
                                        beyond(p5, p3) and beyond(p5, p4)):
                                    yield i0, i1, i2, i3, i4, i5

    execution_count = 0
    start_time = time.time()
    waves = []
    seen = set()  # tuples of waves already reported, for O(1) duplicate checks

    # For uptrend, start with minima; for downtrend, start with maxima
    print(f"Number of {'minimum' if wave_type == 'up' else 'maximum'} points: {len(origins)}")

    for impulse in impulse_candidates():
        i5 = impulse[-1]
        for ia in later(origins, i5):
            for ib in later(targets, ia):
                for ic in later(origins, ib):
                    wave = isElliottWave(df, measure, *impulse, ia, ib, ic, wave_type)
                    execution_count += 1
                    if wave is None:
                        continue
                    key = tuple(wave)
                    if key in seen:
                        continue
                    seen.add(key)

                    print(f"Found {wave_type}trend wave: {wave}")
                    waves.append(wave)
                    wave_values = [df[measure].iat[idx] for idx in wave]
                    print(f"Wave Values: {wave_values}")

                    # Plotting the wave
                    plt.figure(figsize=(10, 6))
                    plt.plot(df[measure], label="Full Dataset", color='blue', alpha=0.5)
                    plt.plot(wave, wave_values, marker='o', label=f"Detected {wave_type}trend Wave", color='red')
                    plt.title(f"Elliott Wave Detection - {wave_type}trend")
                    plt.xlabel("Index")
                    plt.ylabel(measure)
                    plt.legend()
                    plt.grid(True)
                    plt.show()

    end_time = time.time()
    print(f"Total iterations: {execution_count}")
    print(f"Total time taken: {end_time - start_time:.2f} seconds")
    print(f"Total {wave_type}trend waves found: {len(waves)}")

    return waves

def draw_wave(df, df_waves, w=None):
    """
    Draw the peaks and troughs flagged in df_waves as a connected line.

    Every row of df_waves whose 'FlowMinMax' is not 0 is plotted at its row
    position: peaks (flag 1) at their 'High', all other flagged rows at their
    'Low'. Points are annotated P0, P1, ... in row order.

    Parameters:
    df: Original dataframe with price data (not used by this function)
    df_waves: Dataframe with 'FlowMinMax', 'High' and 'Low' columns
    w: Optional list; it only gates plotting - an empty list suppresses the
       points, any other value plots all flagged rows of df_waves.
       NOTE(review): the list is not used to select specific wave points -
       confirm whether that was intended.
    """
    plt.figure(figsize=(15, 7))

    # Without an explicit selection, consider every row of df_waves
    if w is None:
        w = list(range(len(df_waves)))

    if len(w) > 0:
        flags = df_waves['FlowMinMax'].to_numpy()
        highs = df_waves['High'].to_numpy()
        lows = df_waves['Low'].to_numpy()

        # Row positions of all flagged points and the price to draw them at
        wave_points_x = np.flatnonzero(flags != 0).tolist()
        wave_points_y = [highs[x] if flags[x] == 1 else lows[x] for x in wave_points_x]

        if wave_points_x:  # Only plot if we have points
            # Plot wave connections
            plt.plot(wave_points_x, wave_points_y, 'r-', linewidth=2, label='Price Movement')

            # Label the first peak and the first trough so that both kinds
            # get a legend entry, whichever of them comes first.
            labelled = set()
            for number, (x, y) in enumerate(zip(wave_points_x, wave_points_y)):
                kind, marker = ('Peak', 'r^') if flags[x] == 1 else ('Trough', 'rv')
                label = "" if kind in labelled else kind
                labelled.add(kind)
                plt.plot(x, y, marker, markersize=10, label=label)

                # Add point labels
                plt.annotate(f'P{number}', (x, y), xytext=(5, 5),
                             textcoords='offset points', color='red')

    plt.title("Market Peaks and Troughs")
    plt.grid(True)
    plt.legend()
    plt.show()

# select the waves the best fit the chart
def filterWaveSet(waves, min_len=6, max_len=6, extremes=True):

result = []
for w in waves:
l = len(w)
if min_len

Top