# Spaces: Runtime error  (appears to be a page-status banner captured with this listing; not part of the program)
import pandas as pd | |
import numpy as np | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
class SMC:
    """Swing points, fair value gaps and order blocks computed from OHLC candles.

    ``data`` must be a DataFrame with ``Open``, ``High``, ``Low`` and ``Close``
    columns. All candle references returned by the indicator methods
    (DataFrame index of ``order_block``/``fvg`` results, ``MitigatedIndex``)
    are 0-based row positions in ``data``. ``plot`` additionally requires a
    datetime index.
    """

    def __init__(self, data, swing_hl_window_sz=10):
        """Store a private copy of ``data`` and pre-compute the order blocks.

        Parameters:
            data: OHLC DataFrame (see class docstring).
            swing_hl_window_sz: rolling-window size used to detect swing
                highs/lows.
        """
        # Copy first: the helper 'Date' column must not leak into the caller's frame.
        self.data = data.copy()
        self.data['Date'] = self.data.index.to_series()
        self.swing_hl_window_sz = swing_hl_window_sz
        self.order_blocks = self.order_block()

    def _signal_array(self, direction):
        """Return a per-candle array set to ``direction`` at each mitigation candle.

        Only order blocks whose ``OB`` equals ``direction`` and that were
        mitigated (``MitigatedIndex != 0``) contribute; every other entry is 0.
        """
        blocks = self.order_blocks
        selected = (blocks['OB'] == direction) & (blocks['MitigatedIndex'] != 0)
        signal = np.zeros(len(self.data))
        # Explicit integer cast: MitigatedIndex is stored as float, and an empty
        # float selection is not accepted by NumPy as an index array.
        positions = blocks.loc[selected, 'MitigatedIndex'].to_numpy(dtype=np.int64)
        signal[positions] = direction
        return signal

    def backtest_buy_signal(self):
        """Return an array with 1 where a bullish order block was mitigated, else 0."""
        return self._signal_array(1)

    def backtest_sell_signal(self):
        """Return an array with -1 where a bearish order block was mitigated, else 0."""
        return self._signal_array(-1)

    def swing_highs_lows(self, window_size):
        """Flag candles that are the extreme of a centered rolling window.

        Parameters:
            window_size: number of candles in the centered window.

        Returns:
            DataFrame indexed like ``data`` with columns ``Date``, ``highs``
            (High equals the window maximum) and ``lows`` (Low equals the
            window minimum). Candles without a full window are False.
        """
        lows = self.data['Low'].reset_index(drop=True)
        highs = self.data['High'].reset_index(drop=True)
        # Direct equality instead of "ratio == 1": same result for non-zero
        # prices, and no NaN from 0/0.
        is_swing_high = highs.rolling(window_size, center=True).max().eq(highs)
        is_swing_low = lows.rolling(window_size, center=True).min().eq(lows)
        return pd.DataFrame({
            'Date': self.data.index.to_series(),
            'highs': is_swing_high.values,
            'lows': is_swing_low.values,
        })

    @staticmethod
    def _first_mitigation(direction, top, bottom, lows, highs, offset):
        """Find, per zone, the first later candle that trades back into it.

        For a bullish zone (``direction == 1``) that is the first candle at or
        after ``i + offset`` whose Low is <= ``top[i]``; for a bearish zone the
        first whose High is >= ``bottom[i]``.

        Returns:
            Float array: NaN where ``direction`` is NaN, 0 where the zone was
            never mitigated, otherwise the row position of the mitigating candle.
        """
        mitigated = np.zeros(len(direction), dtype=np.int32)
        for i in np.flatnonzero(~np.isnan(direction)):
            start = i + offset
            if direction[i] == 1:
                touched = lows[start:] <= top[i]
            else:
                touched = highs[start:] >= bottom[i]
            if touched.any():
                mitigated[i] = start + int(np.argmax(touched))
        return np.where(np.isnan(direction), np.nan, mitigated)

    def fvg(self):
        """
        FVG - Fair Value Gap
        A fair value gap is when the previous high is lower than the next low if the current candle is bullish.
        Or when the previous low is higher than the next high if the current candle is bearish.

        returns:
        DataFrame with one row per candle (positional index) and columns
        FVG = 1 if bullish fair value gap, -1 if bearish fair value gap, NaN otherwise
        Top = the top of the fair value gap
        Bottom = the bottom of the fair value gap
        MitigatedIndex = the index of the candle that mitigated the fair value gap
                         (0 if never mitigated, NaN where there is no gap)
        """
        high = self.data["High"]
        low = self.data["Low"]
        bullish_candle = (self.data["Close"] > self.data["Open"]).to_numpy()
        bearish_candle = (self.data["Close"] < self.data["Open"]).to_numpy()
        prev_high = high.shift(1).to_numpy(dtype=float)
        next_high = high.shift(-1).to_numpy(dtype=float)
        prev_low = low.shift(1).to_numpy(dtype=float)
        next_low = low.shift(-1).to_numpy(dtype=float)

        # The two cases are mutually exclusive because a candle cannot be both
        # bullish and bearish; comparisons against the NaN edges are False.
        bull_gap = (prev_high < next_low) & bullish_candle
        bear_gap = (prev_low > next_high) & bearish_candle

        fvg = np.where(bull_gap, 1.0, np.where(bear_gap, -1.0, np.nan))
        top = np.where(bull_gap, next_low, np.where(bear_gap, prev_low, np.nan))
        bottom = np.where(bull_gap, prev_high, np.where(bear_gap, next_high, np.nan))

        # The gap spans candles i-1..i+1, so mitigation is searched from i+2 on.
        mitigated_index = self._first_mitigation(
            fvg, top, bottom,
            low.to_numpy(dtype=float), high.to_numpy(dtype=float),
            offset=2,
        )
        return pd.DataFrame({
            "FVG": fvg,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        })

    def order_block(self, imb_perc=.1, join_consecutive=True):
        """Detect order blocks: swing candles followed by a price imbalance.

        Candle ``i`` is a bullish block when ``High[i]`` raised by ``imb_perc``
        percent is still below ``Low[i+2]`` and candle ``i`` or ``i-1`` is a
        swing low. It is a bearish block when ``Low[i]`` is above ``High[i+2]``
        raised by ``imb_perc`` percent and candle ``i`` or ``i-1`` is a swing high.

        The zone is always derived from the imbalance direction (bullish:
        ``High[i]``..``Low[i+2]``; bearish: ``High[i+2]``..``Low[i]``), so
        ``Top >= Bottom`` holds regardless of the candle's own colour.

        Parameters:
            imb_perc: minimum imbalance size, in percent.
            join_consecutive: accepted for API compatibility; merging of
                adjacent blocks is not implemented, so the value is ignored.

        Returns:
            DataFrame containing only the detected blocks, indexed by the row
            position of the block candle, with columns ``OB`` (1 / -1),
            ``Top``, ``Bottom`` and ``MitigatedIndex`` (0 if never mitigated).
        """
        hl = self.swing_highs_lows(self.swing_hl_window_sz)
        near_swing_low = (hl['lows'] | hl['lows'].shift(1, fill_value=False)).to_numpy()
        near_swing_high = (hl['highs'] | hl['highs'].shift(1, fill_value=False)).to_numpy()

        high = self.data["High"].to_numpy(dtype=float)
        low = self.data["Low"].to_numpy(dtype=float)
        high_after = self.data["High"].shift(-2).to_numpy(dtype=float)
        low_after = self.data["Low"].shift(-2).to_numpy(dtype=float)
        gap_factor = (100 + imb_perc) / 100

        bullish = (high * gap_factor < low_after) & near_swing_low
        # "& ~bullish" only guarantees a single label per candle; for positive
        # prices the two imbalance conditions cannot hold together anyway.
        bearish = (low > high_after * gap_factor) & near_swing_high & ~bullish

        ob = np.where(bullish, 1.0, np.where(bearish, -1.0, np.nan))
        top = np.where(bullish, low_after, np.where(bearish, low, np.nan))
        bottom = np.where(bullish, high, np.where(bearish, high_after, np.nan))

        # The imbalance spans candles i..i+2, so mitigation is searched from i+3 on.
        mitigated_index = self._first_mitigation(ob, top, bottom, low, high, offset=3)
        return pd.DataFrame({
            "OB": ob,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        }).dropna(subset=['OB'])

    def plot(self, swing_hl=True, show=True, freq=None):
        """Draw the candles with order-block and (optionally) swing markers.

        Parameters:
            swing_hl: also mark swing highs and swing lows.
            show: call ``fig.show()`` before returning.
            freq: candle spacing as a fixed duration (e.g. ``'5min'`` or a
                ``Timedelta``) used to locate missing timestamps. When None it
                is taken as the smallest gap between consecutive candles.

        Returns:
            The plotly figure.
        """
        fig = make_subplots(1, 1)
        fig.add_trace(go.Candlestick(x=self.data.index.to_series(),
                                     open=self.data['Open'],
                                     high=self.data['High'],
                                     low=self.data['Low'],
                                     close=self.data['Close'],
                                     name='ohlc'))

        # Hide timestamps on the regular grid that have no candle (nights,
        # weekends, holidays) so the candles are drawn contiguously.
        stamp_format = "%Y-%m-%d %H:%M:%S"
        dates = self.data['Date']
        step = dates.diff().min() if freq is None else pd.Timedelta(freq)
        if pd.notna(step) and step > pd.Timedelta(0):
            full_grid = pd.date_range(start=dates.iloc[0], end=dates.iloc[-1], freq=step)
            observed = set(dates.dt.strftime(stamp_format))
            missing = [stamp for stamp in full_grid.strftime(stamp_format) if stamp not in observed]
            step_ms = int(step / pd.Timedelta(milliseconds=1))
            fig.update_xaxes(rangebreaks=[dict(dvalue=step_ms, values=missing)])

        # order_blocks is indexed by row position, hence iloc.
        ob_df = self.data.iloc[self.order_blocks.index.to_list()]
        fig.add_trace(go.Scatter(
            x=ob_df['Date'],
            y=ob_df['Low'],
            name="Order Block",
            mode='markers',
            marker_symbol='diamond-dot',
            marker_size=13,
            marker_line_width=2,
        ))

        if swing_hl:
            hl = self.swing_highs_lows(self.swing_hl_window_sz)
            swing_high_rows = self.data[hl['highs'].to_numpy()]
            swing_low_rows = self.data[hl['lows'].to_numpy()]
            # Markers are nudged 0.1% away from the candle so they do not overlap it.
            fig.add_trace(go.Scatter(
                x=swing_high_rows['Date'],
                y=swing_high_rows['High'] * (100.1 / 100),
                mode='markers',
                marker_symbol="triangle-up-dot",
                marker_size=10,
                name='Swing High',
            ))
            fig.add_trace(go.Scatter(
                x=swing_low_rows['Date'],
                y=swing_low_rows['Low'] * (99.9 / 100),
                mode='markers',
                marker_symbol="triangle-down-dot",
                marker_size=10,
                name='Swing Low',
                marker_color='red',
            ))

        fig.update_layout(xaxis_rangeslider_visible=False)
        if show:
            fig.show()
        return fig
def EMA(array, n):
    """Return the exponential moving average of ``array`` as a pandas Series.

    Parameters:
        array: sequence of values, in time order.
        n: span of the average (smoothing factor ``2 / (n + 1)``).

    The recursive form is used (``adjust=False``), so the first output equals
    the first input value.
    """
    return pd.Series(array).ewm(span=n, adjust=False).mean()
if __name__ == "__main__":
    # Imported here so that importing this module does not require the
    # project-local fetcher.
    from data_fetcher import fetch

    # Demo run: fetch one month of 15-minute candles and show the chart.
    # NOTE(review): assumes fetch() returns an OHLC DataFrame with a datetime
    # index, as SMC requires - confirm against data_fetcher.
    data = fetch('ICICIBANK.NS', period='1mo', interval='15m')
    # Alternative symbol: data = fetch('RELIANCE.NS', period='1mo', interval='15m')
    # Quick check of the signals: print(SMC(data).backtest_buy_signal())
    SMC(data).plot()