import streamlit as st import yfinance as yf import pandas as pd import numpy as np import feedparser import requests import base64 from sklearn.preprocessing import MinMaxScaler from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense, Dropout # Function to fetch cryptocurrency data def get_crypto_data(symbol, period="30d", interval="1h"): crypto = yf.Ticker(f"{symbol}-USD") data = crypto.history(period=period, interval=interval) return data # Function to calculate RSI def calculate_rsi(data, period=14): delta = data['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi # Function to calculate Bollinger Bands def calculate_bollinger_bands(data, period=20, std_dev=2): sma = data['Close'].rolling(window=period).mean() std = data['Close'].rolling(window=period).std() upper_band = sma + (std * std_dev) lower_band = sma - (std * std_dev) return upper_band, lower_band # Function to calculate MACD def calculate_macd(data, short_window=12, long_window=26, signal_window=9): short_ema = data['Close'].ewm(span=short_window, adjust=False).mean() long_ema = data['Close'].ewm(span=long_window, adjust=False).mean() macd = short_ema - long_ema signal = macd.ewm(span=signal_window, adjust=False).mean() return macd, signal # Function to calculate EMA def calculate_ema(data, period=20): return data['Close'].ewm(span=period, adjust=False).mean() # Function to calculate OBV def calculate_obv(data): obv = (np.sign(data['Close'].diff()) * data['Volume']).cumsum() return obv # Function to calculate probabilities for the next 12 hours def calculate_probabilities(data): # Calculate indicators on the entire dataset data['RSI'] = calculate_rsi(data) data['Upper_Band'], data['Lower_Band'] = calculate_bollinger_bands(data) data['MACD'], data['MACD_Signal'] = calculate_macd(data) data['EMA_50'] = calculate_ema(data, period=50) data['EMA_200'] = calculate_ema(data, period=200) data['OBV'] = calculate_obv(data) # Use the most recent values for predictions probabilities = { "RSI": {"Value": data['RSI'].iloc[-1], "Pump": 0, "Dump": 0}, "Bollinger Bands": {"Value": data['Close'].iloc[-1], "Pump": 0, "Dump": 0}, "MACD": {"Value": data['MACD'].iloc[-1], "Pump": 0, "Dump": 0}, "EMA": {"Value": data['EMA_50'].iloc[-1], "Pump": 0, "Dump": 0}, "OBV": {"Value": data['OBV'].iloc[-1], "Pump": 0, "Dump": 0}, } # RSI rsi = data['RSI'].iloc[-1] if rsi < 25: probabilities["RSI"]["Pump"] = 90 # Strong Pump elif 25 <= rsi < 30: probabilities["RSI"]["Pump"] = 60 # Moderate Pump elif 70 < rsi <= 75: probabilities["RSI"]["Dump"] = 60 # Moderate Dump elif rsi > 75: probabilities["RSI"]["Dump"] = 90 # Strong Dump # Bollinger Bands close = data['Close'].iloc[-1] upper_band = data['Upper_Band'].iloc[-1] lower_band = data['Lower_Band'].iloc[-1] if close <= lower_band: probabilities["Bollinger Bands"]["Pump"] = 90 # Strong Pump elif lower_band < close <= lower_band * 1.05: probabilities["Bollinger Bands"]["Pump"] = 60 # Moderate Pump elif upper_band * 0.95 <= close < upper_band: probabilities["Bollinger Bands"]["Dump"] = 60 # Moderate Dump elif close >= upper_band: probabilities["Bollinger Bands"]["Dump"] = 90 # Strong Dump # MACD macd = data['MACD'].iloc[-1] macd_signal = data['MACD_Signal'].iloc[-1] if macd > macd_signal and macd > 0: probabilities["MACD"]["Pump"] = 90 # Strong Pump elif macd > macd_signal and macd <= 0: probabilities["MACD"]["Pump"] = 60 # Moderate Pump elif macd < macd_signal and macd >= 0: probabilities["MACD"]["Dump"] = 60 # Moderate Dump elif macd < macd_signal and macd < 0: probabilities["MACD"]["Dump"] = 90 # Strong Dump # EMA ema_short = data['EMA_50'].iloc[-1] ema_long = data['EMA_200'].iloc[-1] if ema_short > ema_long and close > ema_short: probabilities["EMA"]["Pump"] = 90 # Strong Pump elif ema_short > ema_long and close <= ema_short: probabilities["EMA"]["Pump"] = 60 # Moderate Pump elif ema_short < ema_long and close >= ema_short: probabilities["EMA"]["Dump"] = 60 # Moderate Dump elif ema_short < ema_long and close < ema_short: probabilities["EMA"]["Dump"] = 90 # Strong Dump # OBV obv = data['OBV'].iloc[-1] if obv > 100000: probabilities["OBV"]["Pump"] = 90 # Strong Pump elif 50000 < obv <= 100000: probabilities["OBV"]["Pump"] = 60 # Moderate Pump elif -100000 <= obv < -50000: probabilities["OBV"]["Dump"] = 60 # Moderate Dump elif obv < -100000: probabilities["OBV"]["Dump"] = 90 # Strong Dump # Normalize Pump and Dump probabilities to sum to 100% for indicator in probabilities: pump_prob = probabilities[indicator]["Pump"] dump_prob = probabilities[indicator]["Dump"] # If pump probability is set, normalize dump if pump_prob > 0: probabilities[indicator]["Dump"] = 100 - pump_prob # If dump probability is set, normalize pump if dump_prob > 0: probabilities[indicator]["Pump"] = 100 - dump_prob return probabilities, data.iloc[-1] # Function to calculate weighted probabilities def calculate_weighted_probabilities(probabilities): weightages = { "RSI": 0.20, "Bollinger Bands": 0.20, "MACD": 0.25, "EMA": 0.15, "OBV": 0.20 } # Initialize final probabilities final_probabilities = {"Pump": 0, "Dump": 0} # Calculate weighted probabilities for indicator, values in probabilities.items(): pump_prob = values["Pump"] * weightages[indicator] dump_prob = values["Dump"] * weightages[indicator] final_probabilities["Pump"] += pump_prob final_probabilities["Dump"] += dump_prob # Normalize the final probabilities to ensure they sum to 100% total = final_probabilities["Pump"] + final_probabilities["Dump"] # Handle cases where the total sum of probabilities is zero if total == 0: final_probabilities["Pump"] = 50 final_probabilities["Dump"] = 50 else: final_probabilities["Pump"] = (final_probabilities["Pump"] / total) * 100 final_probabilities["Dump"] = (final_probabilities["Dump"] / total) * 100 # Debugging the final probabilities to ensure they sum up to 100% print(f"Final Pump Probability: {final_probabilities['Pump']}%") print(f"Final Dump Probability: {final_probabilities['Dump']}%") return final_probabilities # Function to fetch news data from Google News RSS feeds def fetch_news(coin_name): try: url = f"https://news.google.com/rss/search?q={coin_name}+cryptocurrency" feed = feedparser.parse(url) news_items = [] for entry in feed.entries[:5]: # Limit to 5 news items news_items.append({ "title": entry.title, "link": entry.link, "published": entry.published, }) return news_items except Exception as e: st.error(f"Error fetching news: {e}") return [] # Prepare data for LSTM Model def prepare_lstm_data(df, seq_len=60): data = df['Price'].values.reshape(-1, 1) scaler = MinMaxScaler(feature_range=(0, 1)) scaled_data = scaler.fit_transform(data) sequences, labels = [], [] for i in range(len(scaled_data) - seq_len): sequences.append(scaled_data[i:i + seq_len]) labels.append(scaled_data[i + seq_len]) return np.array(sequences), np.array(labels), scaler # Build and train LSTM model def build_lstm_model(input_shape): model = Sequential([ LSTM(units=50, return_sequences=True, input_shape=input_shape), Dropout(0.2), LSTM(units=50, return_sequences=False), Dropout(0.2), Dense(units=25), Dense(units=1) ]) model.compile(optimizer='adam', loss='mean_squared_error') return model # Calculate prediction based of LTSM model learning def calculate_prediction_with_ltsm(symbol="BTC", period="5d", interval="5m"): st.write("**Fetched Data...") data = get_crypto_data(symbol, period, interval) prices = [(pd.to_datetime(index, unit='m'), price) for index, price in data['Close'].items()] df = pd.DataFrame(prices, columns=['Date', 'Price']) st.write("**Preparing data for LTSM Model training......(processing)...") X, Y, scaler = prepare_lstm_data(df) st.write("**Build LTSM Model with X shape data.........(processing)...") model = build_lstm_model((X.shape[1], 1)) # Train the model st.write("**Train LTSM Model with X,Y shape data with batch_size(32), epochs(10)............(processing)...") model.fit(X, Y, batch_size=32, epochs=10) # Predict the next price point st.write("**Sequence LTSM Model with X,Y shape data with batch_size(32), epochs(10)...............(processing)...") last_sequence = X[-1].reshape(1, X.shape[1], 1) st.write("**Predict realtime price with LTSM Model trained with X,Y shape data with batch_size(32), epochs(10)..................(processing)...") scaled_prediction = model.predict(last_sequence) predicted_price = scaler.inverse_transform(scaled_prediction) return predicted_price # Streamlit App st.set_page_config(page_title="Crypto Insights ", layout="wide") # Add styled title with specific color st.markdown( """
***Based on the last 5 days with 5-minute intervals of closing data***
""", unsafe_allow_html=True ) period = "5d" interval = "5m" predicted_price = calculate_prediction_with_ltsm(symbol, period, interval) #st.write("### **Final Predicted value by learned LTSM model based on last 5 days with 5 interval of closing data** ###") #st.write(f"**Predicted next realtime price: ${predicted_price[0][0]:.10f}**") st.markdown( f"""Predicted Next Realtime Price: ${predicted_price[0][0]:.10f}
""", unsafe_allow_html=True )