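# Algo_Trading_Dashboard / indicators.py
# Uploaded by Invicto69 ("Upload 6 files", commit 7d8ad6e, verified); file size 9.45 kB.
# (The "raw" / "history" / "blame" entries on the hosting page were navigation links.)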
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class SMC:
    """Swing points, fair value gaps and order blocks computed on an OHLC frame.

    ``data`` must provide ``Open``, ``High``, ``Low`` and ``Close`` columns.
    All indices returned by the indicator methods are *positional* (row
    numbers of ``data``), not index labels.
    """

    def __init__(self, data, swing_hl_window_sz=10):
        """Store the frame and pre-compute the order blocks.

        parameters:
        data = OHLC DataFrame; a ``Date`` column holding a copy of its index
               is added to it in place (``plot`` reads that column)
        swing_hl_window_sz = rolling window size used for swing detection
        """
        self.data = data
        self.data['Date'] = self.data.index.to_series()
        self.swing_hl_window_sz = swing_hl_window_sz
        self.order_blocks = self.order_block()

    def _mitigation_signal(self, direction):
        """Return an array with ``direction`` at every bar that mitigated an
        order block of that direction and 0 everywhere else."""
        blocks = self.order_blocks
        # MitigatedIndex == 0 is the "never mitigated" marker.
        hit = blocks[(blocks['OB'] == direction) & (blocks['MitigatedIndex'] != 0)]
        signal = np.zeros(len(self.data))
        # Explicit integer conversion: the column is float (it shares an array
        # with NaN), and an empty float selection is not a valid NumPy index.
        positions = hit['MitigatedIndex'].to_numpy(dtype=np.intp)
        signal[positions] = direction
        return signal

    def backtest_buy_signal(self):
        """Return a float array of len(data): 1 where a bullish order block
        was mitigated, 0 elsewhere."""
        return self._mitigation_signal(1)

    def backtest_sell_signal(self):
        """Return a float array of len(data): -1 where a bearish order block
        was mitigated, 0 elsewhere."""
        return self._mitigation_signal(-1)

    def swing_highs_lows(self, window_size):
        """Flag bars whose High/Low is the extreme of a centred rolling window.

        returns:
        DataFrame indexed like ``data`` with columns
        Date = copy of the index
        highs = True where High equals the rolling maximum
        lows = True where Low equals the rolling minimum
        Bars without a full window (rolling result is NaN) are False.
        """
        low = self.data['Low'].reset_index(drop=True)
        high = self.data['High'].reset_index(drop=True)
        # Compare directly instead of testing "max / high == 1.", which relies
        # on float division to express an equality.
        swing_highs = high.rolling(window_size, center=True).max() == high
        swing_lows = low.rolling(window_size, center=True).min() == low
        return pd.DataFrame({
            'Date': self.data.index.to_series(),
            'highs': swing_highs.values,
            'lows': swing_lows.values,
        })

    @staticmethod
    def _flag_or_previous(flags):
        """Return flags[i] | flags[i - 1] as a boolean array (flags[-1] is False)."""
        flags = np.asarray(flags, dtype=bool)
        widened = flags.copy()
        widened[1:] |= flags[:-1]
        return widened

    @staticmethod
    def _mitigated_indices(direction, top, bottom, low, high, offset):
        """Find, for every marked bar, the first later bar that re-enters its zone.

        direction = array of 1 / -1 / NaN per bar
        top, bottom = zone bounds per bar (only read where direction is set)
        low, high = Low / High prices as arrays
        offset = number of bars after the marked bar at which the search starts

        A bullish zone is mitigated by the first Low <= top, a bearish zone by
        the first High >= bottom. Returns a float array: NaN where direction
        is NaN, 0 where no mitigation was found, else the bar position.
        """
        mitigated = np.zeros(len(direction), dtype=np.int32)
        for i in np.flatnonzero(~np.isnan(direction)):
            start = i + offset
            if direction[i] == 1:
                touched = low[start:] <= top[i]
            else:
                touched = high[start:] >= bottom[i]
            if touched.any():
                # argmax of a boolean array is the position of the first True.
                mitigated[i] = start + int(np.argmax(touched))
        return np.where(np.isnan(direction), np.nan, mitigated)

    def fvg(self):
        """
        FVG - Fair Value Gap
        A fair value gap is when the previous high is lower than the next low if the current candle is bullish.
        Or when the previous low is higher than the next high if the current candle is bearish.

        parameters:

        returns:
        FVG = 1 if bullish fair value gap, -1 if bearish fair value gap
        Top = the top of the fair value gap
        Bottom = the bottom of the fair value gap
        MitigatedIndex = the index of the candle that mitigated the fair value gap
                         (0 if it was never mitigated)
        Rows without a gap are NaN in every column; the frame has one row per bar.
        """
        open_, close = self.data["Open"], self.data["Close"]
        high, low = self.data["High"], self.data["Low"]

        bull_gap = ((high.shift(1) < low.shift(-1)) & (close > open_)).to_numpy()
        bear_gap = ((low.shift(1) > high.shift(-1)) & (close < open_)).to_numpy()

        direction = np.where(bull_gap, 1.0, np.where(bear_gap, -1.0, np.nan))
        top = np.where(bull_gap, low.shift(-1), np.where(bear_gap, low.shift(1), np.nan))
        bottom = np.where(bull_gap, high.shift(1), np.where(bear_gap, high.shift(-1), np.nan))

        # The gap spans bars i-1..i+1, so mitigation is searched from bar i+2.
        mitigated_index = self._mitigated_indices(
            direction, top, bottom, low.to_numpy(), high.to_numpy(), offset=2
        )
        return pd.DataFrame({
            "FVG": direction,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        })

    def order_block(self, imb_perc=.1, join_consecutive=True):
        """Detect order blocks: an imbalance between bar i and bar i+2 that
        starts at (or one bar after) a swing point.

        A bullish block needs High[i] * (1 + imb_perc/100) < Low[i+2] on a bar
        flagged as swing low (or the bar after one). A bearish block needs
        Low[i] > High[i+2] * (1 + imb_perc/100) on a bar flagged as swing high
        (or the bar after one).

        parameters:
        imb_perc = minimum imbalance size, in percent
        join_consecutive = accepted for backward compatibility; it has no effect

        returns:
        DataFrame with one row per detected block, indexed by bar position:
        OB = 1 for a bullish block, -1 for a bearish block
        Top = upper bound of the imbalance (Low[i+2] if bullish, Low[i] if bearish)
        Bottom = lower bound of the imbalance (High[i] if bullish, High[i+2] if bearish)
        MitigatedIndex = position of the first bar from i+3 on that re-enters
                         the zone, 0 if none does
        """
        hl = self.swing_highs_lows(self.swing_hl_window_sz)
        near_low = self._flag_or_previous(hl['lows'].to_numpy())
        near_high = self._flag_or_previous(hl['highs'].to_numpy())

        high, low = self.data["High"], self.data["Low"]
        factor = (100 + imb_perc) / 100
        bullish = (high * factor < low.shift(-2)).to_numpy() & near_low
        bearish = (low > high.shift(-2) * factor).to_numpy() & near_high & ~bullish

        # Direction and zone both follow the imbalance that actually fired.
        # Keying them on the candle colour (Close > Open) produced zones
        # unrelated to the gap whenever the candle colour disagreed with it.
        ob = np.where(bullish, 1.0, np.where(bearish, -1.0, np.nan))
        top = np.where(bullish, low.shift(-2), np.where(bearish, low, np.nan))
        bottom = np.where(bullish, high, np.where(bearish, high.shift(-2), np.nan))

        # The imbalance spans bars i..i+2, so mitigation is searched from i+3.
        mitigated_index = self._mitigated_indices(
            ob, top, bottom, low.to_numpy(), high.to_numpy(), offset=3
        )
        return pd.DataFrame({
            "OB": ob,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        }).dropna(subset=['OB'])

    def plot(self, swing_hl=True, show=True, freq='5min'):
        """Draw the candlestick chart with order-block and swing markers.

        parameters:
        swing_hl = also draw swing high / swing low markers
        show = call fig.show() before returning
        freq = fixed-width grid spacing (e.g. '5min', '15min') used to find
               timestamps missing from the data; each missing slot is hidden
               through an x-axis range break of that width

        returns:
        the plotly figure
        """
        fig = make_subplots(1, 1)

        # plot the candle stick graph
        fig.add_trace(go.Candlestick(x=self.data.index.to_series(),
                                     open=self.data['Open'],
                                     high=self.data['High'],
                                     low=self.data['Low'],
                                     close=self.data['Close'],
                                     name='ohlc'))

        # Continuous grid between the first and last observation; every grid
        # timestamp that has no bar becomes a range break.
        stamp = "%Y-%m-%d %H:%M:%S"
        dt_all = pd.date_range(start=self.data['Date'].iloc[0],
                               end=self.data['Date'].iloc[-1],
                               freq=freq)
        dt_obs = {d.strftime(stamp) for d in self.data['Date']}
        dt_breaks = [d for d in dt_all.strftime(stamp).tolist() if d not in dt_obs]
        slot_ms = int(pd.Timedelta(freq).total_seconds() * 1000)
        fig.update_xaxes(rangebreaks=[dict(dvalue=slot_ms, values=dt_breaks)])

        # order_blocks is indexed by bar position, hence iloc.
        ob_df = self.data.iloc[self.order_blocks.index.to_list()]
        fig.add_trace(go.Scatter(
            x=ob_df['Date'],
            y=ob_df['Low'],
            name="Order Block",
            mode='markers',
            marker_symbol='diamond-dot',
            marker_size=13,
            marker_line_width=2,
        ))

        if swing_hl:
            hl = self.swing_highs_lows(self.swing_hl_window_sz)
            h = hl[hl['highs']]
            l = hl[hl['lows']]
            # Markers are nudged 0.1% away from the candle so they do not overlap it.
            fig.add_trace(go.Scatter(
                x=h['Date'],
                y=self.data[self.data.Date.isin(h['Date'])]['High'] * (100.1 / 100),
                mode='markers',
                marker_symbol="triangle-up-dot",
                marker_size=10,
                name='Swing High',
            ))
            fig.add_trace(go.Scatter(
                x=l['Date'],
                y=self.data[self.data.Date.isin(l['Date'])]['Low'] * (99.9 / 100),
                mode='markers',
                marker_symbol="triangle-down-dot",
                marker_size=10,
                name='Swing Low',
                marker_color='red',
            ))

        fig.update_layout(xaxis_rangeslider_visible=False)
        if show:
            fig.show()
        return fig
def EMA(array, n):
    """Return the exponential moving average of ``array`` as a pandas Series.

    The smoothing uses span ``n`` in its recursive form (``adjust=False``),
    so the first output equals the first input.
    """
    values = pd.Series(array)
    smoother = values.ewm(span=n, adjust=False)
    return smoother.mean()
if __name__ == "__main__":
    # Manual smoke run: fetch one month of 15-minute bars and show the chart.
    from data_fetcher import fetch

    # Alternative ticker used during development: 'RELIANCE.NS'.
    ohlc = fetch('ICICIBANK.NS', period='1mo', interval='15m')
    indicator = SMC(ohlc)
    indicator.plot()