|
import pandas as pd
|
|
import numpy as np
|
|
import plotly.graph_objects as go
|
|
from plotly.subplots import make_subplots
|
|
|
|
class SMC:
|
|
def __init__(self, data, swing_hl_window_sz=10):
|
|
self.data = data
|
|
self.data['Date'] = self.data.index.to_series()
|
|
self.swing_hl_window_sz = swing_hl_window_sz
|
|
self.order_blocks = self.order_block()
|
|
|
|
def backtest_buy_signal(self):
|
|
bull_ob = self.order_blocks[(self.order_blocks['OB']==1) & (self.order_blocks['MitigatedIndex']!=0)]
|
|
arr = np.zeros(len(self.data))
|
|
arr[bull_ob['MitigatedIndex'].apply(lambda x: int(x))] = 1
|
|
return arr
|
|
|
|
def backtest_sell_signal(self):
|
|
bear_ob = self.order_blocks[(self.order_blocks['OB'] == -1) & (self.order_blocks['MitigatedIndex'] != 0)]
|
|
arr = np.zeros(len(self.data))
|
|
arr[bear_ob['MitigatedIndex'].apply(lambda x: int(x))] = -1
|
|
return arr
|
|
|
|
def swing_highs_lows(self, window_size):
|
|
l = self.data['Low'].reset_index(drop=True)
|
|
h = self.data['High'].reset_index(drop=True)
|
|
swing_highs = (h.rolling(window_size, center=True).max() / h == 1.)
|
|
swing_lows = (l.rolling(window_size, center=True).min() / l == 1.)
|
|
return pd.DataFrame({'Date':self.data.index.to_series(), 'highs':swing_highs.values, 'lows':swing_lows.values})
|
|
|
|
def fvg(self):
|
|
"""
|
|
FVG - Fair Value Gap
|
|
A fair value gap is when the previous high is lower than the next low if the current candle is bullish.
|
|
Or when the previous low is higher than the next high if the current candle is bearish.
|
|
|
|
parameters:
|
|
|
|
returns:
|
|
FVG = 1 if bullish fair value gap, -1 if bearish fair value gap
|
|
Top = the top of the fair value gap
|
|
Bottom = the bottom of the fair value gap
|
|
MitigatedIndex = the index of the candle that mitigated the fair value gap
|
|
"""
|
|
|
|
fvg = np.where(
|
|
(
|
|
(self.data["High"].shift(1) < self.data["Low"].shift(-1))
|
|
& (self.data["Close"] > self.data["Open"])
|
|
)
|
|
| (
|
|
(self.data["Low"].shift(1) > self.data["High"].shift(-1))
|
|
& (self.data["Close"] < self.data["Open"])
|
|
),
|
|
np.where(self.data["Close"] > self.data["Open"], 1, -1),
|
|
np.nan,
|
|
)
|
|
|
|
top = np.where(
|
|
~np.isnan(fvg),
|
|
np.where(
|
|
self.data["Close"] > self.data["Open"],
|
|
self.data["Low"].shift(-1),
|
|
self.data["Low"].shift(1),
|
|
),
|
|
np.nan,
|
|
)
|
|
|
|
bottom = np.where(
|
|
~np.isnan(fvg),
|
|
np.where(
|
|
self.data["Close"] > self.data["Open"],
|
|
self.data["High"].shift(1),
|
|
self.data["High"].shift(-1),
|
|
),
|
|
np.nan,
|
|
)
|
|
|
|
mitigated_index = np.zeros(len(self.data), dtype=np.int32)
|
|
for i in np.where(~np.isnan(fvg))[0]:
|
|
mask = np.zeros(len(self.data), dtype=np.bool_)
|
|
if fvg[i] == 1:
|
|
mask = self.data["Low"][i + 2:] <= top[i]
|
|
elif fvg[i] == -1:
|
|
mask = self.data["High"][i + 2:] >= bottom[i]
|
|
if np.any(mask):
|
|
j = np.argmax(mask) + i + 2
|
|
mitigated_index[i] = j
|
|
|
|
mitigated_index = np.where(np.isnan(fvg), np.nan, mitigated_index)
|
|
|
|
return pd.concat(
|
|
[
|
|
pd.Series(fvg.flatten(), name="FVG"),
|
|
pd.Series(top.flatten(), name="Top"),
|
|
pd.Series(bottom.flatten(), name="Bottom"),
|
|
pd.Series(mitigated_index.flatten(), name="MitigatedIndex"),
|
|
],
|
|
axis=1,
|
|
)
|
|
|
|
def order_block(self, imb_perc=.1, join_consecutive=True):
|
|
hl = self.swing_highs_lows(self.swing_hl_window_sz)
|
|
|
|
ob = np.where(
|
|
(
|
|
((self.data["High"]*((100+imb_perc)/100)) < self.data["Low"].shift(-2))
|
|
& ((hl['lows']==True) | (hl['lows'].shift(1)==True))
|
|
)
|
|
| (
|
|
(self.data["Low"] > (self.data["High"].shift(-2)*((100+imb_perc)/100)))
|
|
& ((hl['highs']==True) | (hl['highs'].shift(1)==True))
|
|
),
|
|
np.where(((hl['lows']==True) | (hl['lows'].shift(1)==True)), 1, -1),
|
|
np.nan,
|
|
)
|
|
|
|
|
|
|
|
top = np.where(
|
|
~np.isnan(ob),
|
|
np.where(
|
|
self.data["Close"] > self.data["Open"],
|
|
self.data["Low"].shift(-2),
|
|
self.data["Low"],
|
|
),
|
|
np.nan,
|
|
)
|
|
|
|
bottom = np.where(
|
|
~np.isnan(ob),
|
|
np.where(
|
|
self.data["Close"] > self.data["Open"],
|
|
self.data["High"],
|
|
self.data["High"].shift(-2),
|
|
),
|
|
np.nan,
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mitigated_index = np.zeros(len(self.data), dtype=np.int32)
|
|
for i in np.where(~np.isnan(ob))[0]:
|
|
mask = np.zeros(len(self.data), dtype=np.bool_)
|
|
if ob[i] == 1:
|
|
mask = self.data["Low"][i + 3:] <= top[i]
|
|
elif ob[i] == -1:
|
|
mask = self.data["High"][i + 3:] >= bottom[i]
|
|
if np.any(mask):
|
|
j = np.argmax(mask) + i + 3
|
|
mitigated_index[i] = int(j)
|
|
ob = ob.flatten()
|
|
mitigated_index1 = np.where(np.isnan(ob), np.nan, mitigated_index)
|
|
|
|
return pd.concat(
|
|
[
|
|
pd.Series(ob.flatten(), name="OB"),
|
|
pd.Series(top.flatten(), name="Top"),
|
|
pd.Series(bottom.flatten(), name="Bottom"),
|
|
pd.Series(mitigated_index1.flatten(), name="MitigatedIndex"),
|
|
],
|
|
axis=1,
|
|
).dropna(subset=['OB'])
|
|
|
|
def plot(self, swing_hl=True, show=True):
|
|
fig = make_subplots(1, 1)
|
|
|
|
|
|
fig.add_trace(go.Candlestick(x=self.data.index.to_series(),
|
|
open=self.data['Open'],
|
|
high=self.data['High'],
|
|
low=self.data['Low'],
|
|
close=self.data['Close'],
|
|
name='ohlc'))
|
|
|
|
|
|
dt_all = pd.date_range(start=self.data['Date'].iloc[0], end=self.data['Date'].iloc[-1], freq='5min')
|
|
|
|
|
|
dt_obs = [d.strftime("%Y-%m-%d %H:%M:%S") for d in self.data['Date']]
|
|
|
|
|
|
dt_breaks = [d for d in dt_all.strftime("%Y-%m-%d %H:%M:%S").tolist() if not d in dt_obs]
|
|
|
|
|
|
fig.update_xaxes(rangebreaks=[dict(dvalue=5 * 60 * 1000, values=dt_breaks)])
|
|
|
|
print(self.order_blocks.head())
|
|
print(self.order_blocks.index.to_list())
|
|
|
|
ob_df = self.data.iloc[self.order_blocks.index.to_list()]
|
|
|
|
|
|
fig.add_trace(go.Scatter(
|
|
x=ob_df['Date'],
|
|
y=ob_df['Low'],
|
|
name="Order Block",
|
|
mode='markers',
|
|
marker_symbol='diamond-dot',
|
|
marker_size=13,
|
|
marker_line_width=2,
|
|
|
|
))
|
|
|
|
if swing_hl:
|
|
hl = self.swing_highs_lows(self.swing_hl_window_sz)
|
|
h = hl[(hl['highs']==True)]
|
|
l = hl[hl['lows']==True]
|
|
|
|
|
|
fig.add_trace(go.Scatter(
|
|
x=h['Date'],
|
|
y=self.data[self.data.Date.isin(h['Date'])]['High']*(100.1/100),
|
|
mode='markers',
|
|
marker_symbol="triangle-up-dot",
|
|
marker_size=10,
|
|
name='Swing High',
|
|
|
|
))
|
|
fig.add_trace(go.Scatter(
|
|
x=l['Date'],
|
|
y=self.data[self.data.Date.isin(l['Date'])]['Low']*(99.9/100),
|
|
mode='markers',
|
|
marker_symbol="triangle-down-dot",
|
|
marker_size=10,
|
|
name='Swing Low',
|
|
marker_color='red',
|
|
|
|
))
|
|
|
|
fig.update_layout(xaxis_rangeslider_visible=False)
|
|
if show:
|
|
fig.show()
|
|
return fig
|
|
|
|
|
|
def EMA(array, n):
|
|
return pd.Series(array).ewm(span=n, adjust=False).mean()
|
|
|
|
if __name__ == "__main__":
|
|
from data_fetcher import fetch
|
|
data = fetch('ICICIBANK.NS', period='1mo', interval='15m')
|
|
|
|
|
|
|
|
SMC(data).plot() |