File size: 5,847 Bytes
ba2bdfe 6138a65 ba2bdfe 6138a65 ba2bdfe 6138a65 ba2bdfe 83e4b5e ba2bdfe 6138a65 ba2bdfe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Layer, Dense, Dropout, LayerNormalization, MultiHeadAttention
from tensorflow.keras.models import Sequential
from sklearn.preprocessing import LabelEncoder
import gradio as gr
import os
# Custom Keras layer used by the saved model (must be passed via custom_objects on load).
class EnhancedTransformerBlock(Layer):
    """Attention + feed-forward block followed by one extra single-head attention pass.

    The layer applies, in order: multi-head self-attention, dropout, residual
    add with layer normalisation, a two-layer feed-forward network, dropout,
    a second residual add with layer normalisation, and finally a single-head
    self-attention whose output is added back and normalised once more.

    Args:
        embed_dim: Size passed as ``key_dim`` to both attention layers and as
            the width of the final feed-forward ``Dense`` layer.
        num_heads: Number of heads of the first attention layer.
        ff_dim: Width of the hidden (ReLU) feed-forward ``Dense`` layer.
        rate: Dropout rate shared by both dropout layers.
        **kwargs: Forwarded unchanged to ``Layer.__init__``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Constructor arguments are kept so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        # Sub-layers are created in the same order as before so that weight
        # ordering of previously saved models is not disturbed.
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        feed_forward_layers = [
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ]
        self.ffn = Sequential(feed_forward_layers)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.self_attention = MultiHeadAttention(num_heads=1, key_dim=embed_dim)

    def call(self, inputs, training=False):
        """Run the block on ``inputs``; ``training`` only toggles the two dropout layers."""
        attended = self.dropout1(self.att(inputs, inputs), training=training)
        first_residual = self.layernorm1(inputs + attended)

        transformed = self.dropout2(self.ffn(first_residual), training=training)
        second_residual = self.layernorm2(first_residual + transformed)

        refined = self.self_attention(second_residual, second_residual)
        # NOTE(review): layernorm2 is reused here, so this normalisation shares
        # its parameters with the previous one. Left untouched because adding
        # a third normalisation layer would change the layer's weights --
        # confirm the sharing is intended.
        return self.layernorm2(second_residual + refined)

    def get_config(self):
        """Return the base layer config extended with this block's constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        }
# Initialize global variables
sequence_length = 10  # number of most recent outcomes encoded for each model call
data = ["Single big", "Double big", "Double big", "Single small", "Single big",
        "Double small", "Single big", "Double small", "Single small", "Single small",
        "Single big", "Single small", "triple", "single big", "double small", "double small", "double small", "double small"]  # This will store the recent outcomes
encoder = LabelEncoder()

# Try to load the saved model and encoder classes
try:
    model = tf.keras.models.load_model(
        "enhanced_adaptive_model.keras",
        custom_objects={'EnhancedTransformerBlock': EnhancedTransformerBlock},
    )
    # NOTE(security): allow_pickle=True unpickles arbitrary objects; this is
    # only acceptable because the file is a local artefact shipped with the app.
    encoder.classes_ = np.load('label_encoder_classes.npy', allow_pickle=True)
except (OSError, ValueError) as load_error:
    # np.load raises FileNotFoundError (an OSError); depending on the Keras
    # version, load_model reports a missing file as a plain OSError or as a
    # ValueError, so catching FileNotFoundError alone lets the app crash.
    print("Model or encoder classes file not found. Using a dummy model and encoder for demonstration.")
    print(f"Load error: {load_error}")
    # Create a dummy model and encoder for demonstration.
    # Classes are sorted because LabelEncoder normally stores them sorted.
    encoder.classes_ = np.array(sorted(['Single small', 'Single big', 'Double small', 'Double big', 'Triple']))
    # One output unit per class, so argmax always maps back to a valid label.
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(sequence_length,)),
        tf.keras.layers.Dense(len(encoder.classes_), activation='softmax'),
    ])
    # Compiled so that later model.fit() calls do not fail on the dummy model.
    model.compile(optimizer="adam", loss="categorical_crossentropy")


def _align_with_encoder(outcomes, classes):
    """Return ``outcomes`` restricted to labels in ``classes``, repairing letter case.

    Exact matches are kept as-is. Entries that differ from a known label only
    by letter case are replaced with that label. Anything else is dropped
    (with a printed notice), because the encoder would raise on it.
    """
    known = [str(label) for label in classes]
    exact = set(known)
    by_casefold = {}
    for label in known:
        by_casefold.setdefault(label.casefold(), label)

    aligned = []
    for outcome in outcomes:
        if outcome in exact:
            aligned.append(outcome)
        elif outcome.casefold() in by_casefold:
            aligned.append(by_casefold[outcome.casefold()])
        else:
            print(f"Dropping seed outcome not known to the encoder: {outcome!r}")
    return aligned


# The seed history mixes capitalised and lowercase spellings; make sure every
# entry is a label the active encoder can transform.
data = _align_with_encoder(data, encoder.classes_)
def update_data(data, new_outcome):
    """Append ``new_outcome`` to ``data`` in place and return the same list.

    If the list is longer than the module-level ``sequence_length`` after the
    append, the single oldest entry (index 0) is discarded. At most one entry
    is removed per call.
    """
    data.append(new_outcome)
    history_too_long = len(data) > sequence_length
    if history_too_long:
        del data[0]
    return data
def enhanced_predict_next(model, data, sequence_length, encoder, n_samples=100):
    """Predict the next outcome with Monte Carlo dropout and estimate its uncertainty.

    Args:
        model: Callable invoked as ``model(batch, training=True)`` where
            ``batch`` is an integer array of shape ``(1, sequence_length)``.
            Assumed to return per-class scores for that single sequence --
            TODO confirm the output shape of the saved model.
        data: Sequence of outcome labels, oldest first. Only the last
            ``sequence_length`` entries are used.
        sequence_length: Number of trailing outcomes fed to the model.
        encoder: Object providing ``transform`` (labels -> integer codes) and
            ``inverse_transform`` (integer codes -> labels).
        n_samples: Number of stochastic forward passes. Defaults to 100, the
            value that used to be hard-coded.

    Returns:
        Tuple ``(label, uncertainty)``: the label at the argmax of the mean
        prediction, and the mean of the element-wise standard deviation
        across the forward passes.

    Raises:
        ValueError: If ``n_samples`` is smaller than 1 or ``data`` holds fewer
            than ``sequence_length`` outcomes.
    """
    if n_samples < 1:
        raise ValueError("n_samples must be at least 1.")

    last_sequence = data[-sequence_length:]
    if len(last_sequence) < sequence_length:
        # Fail with a clear message instead of an opaque reshape error.
        raise ValueError(
            f"Need at least {sequence_length} outcomes to predict, got {len(last_sequence)}."
        )
    encoded = np.array(encoder.transform(last_sequence)).reshape((1, sequence_length))

    # training=True keeps dropout active, so repeated passes differ; each
    # result is converted to NumPy once instead of reducing over tensors.
    samples = np.stack(
        [np.asarray(model(encoded, training=True)) for _ in range(n_samples)]
    )
    mean_prediction = samples.mean(axis=0)
    std_prediction = samples.std(axis=0)

    predicted_label = encoder.inverse_transform([np.argmax(mean_prediction)])
    uncertainty = np.mean(std_prediction)
    return predicted_label[0], uncertainty
def gradio_predict(outcome):
    """Gradio callback: record ``outcome`` and report the predicted following outcome.

    The outcome must be one of ``encoder.classes_``; otherwise an error
    message is returned and the history is left untouched. A valid outcome is
    appended to the module-level ``data`` via ``update_data``. A prediction is
    only attempted once the history holds at least ``sequence_length`` entries.

    Returns:
        A human-readable status or prediction string for the output textbox.
    """
    global data

    known_outcomes = encoder.classes_
    if outcome not in known_outcomes:
        return "Invalid outcome. Please try again."

    data = update_data(data, outcome)

    enough_history = len(data) >= sequence_length
    if enough_history:
        predicted_next, uncertainty = enhanced_predict_next(model, data, sequence_length, encoder)
        return f'Predicted next outcome: {predicted_next} (Uncertainty: {uncertainty:.4f})'
    return f"Not enough data to make a prediction. Please enter {sequence_length - len(data)} more outcomes."
def gradio_update(actual_next):
    """Gradio callback: record the observed outcome and fit the model on it for one epoch.

    The training example pairs the ``sequence_length`` outcomes that preceded
    ``actual_next`` (input) with ``actual_next`` itself (target). The outcome
    is appended to the module-level ``data`` in every valid case, including
    when the history is still too short to train.

    Args:
        actual_next: Outcome label that actually occurred; must be one of
            ``encoder.classes_``.

    Returns:
        A human-readable status string for the output textbox.
    """
    global data, model
    if actual_next not in encoder.classes_:
        return "Invalid outcome. Please try again."

    # Capture the input window BEFORE appending the new outcome. Building it
    # afterwards would place the target inside its own input sequence, so the
    # model would only learn to copy the last element.
    history = list(data[-sequence_length:])
    data = update_data(data, actual_next)

    if len(history) < sequence_length:
        return f"Not enough data to update the model. Please enter {sequence_length - len(history)} more outcomes."

    encoded_actual_next = encoder.transform([actual_next])[0]
    new_X = np.array(encoder.transform(history)).reshape((1, sequence_length))
    # Wrapping the code in a list yields a (1, n_classes) target that matches
    # the batch of one in new_X; a bare scalar would give shape (n_classes,)
    # and make fit() reject the mismatched sample counts.
    new_y = to_categorical([encoded_actual_next], num_classes=len(encoder.classes_))
    model.fit(new_X, new_y, epochs=1, verbose=0)
    return "Model updated with new data."
# Gradio interface
# NOTE(review): the original indentation of this section was lost, so which
# components sit inside each gr.Row() is reconstructed (input + button per
# row, result box below) -- confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("## Enhanced Outcome Prediction Model")
    gr.Markdown(f"Enter a sequence of {sequence_length} outcomes to get started.")
    gr.Markdown(f"Valid outcomes: {', '.join(encoder.classes_)}")

    # Prediction controls: submit an outcome, get the model's next-outcome guess.
    with gr.Row():
        outcome_input = gr.Textbox(label="Enter an outcome")
        predict_button = gr.Button("Predict Next")
    predicted_output = gr.Textbox(label="Prediction")

    # Feedback controls: submit what actually happened to fine-tune the model.
    with gr.Row():
        actual_input = gr.Textbox(label="Enter actual next outcome")
        update_button = gr.Button("Update Model")
    update_output = gr.Textbox(label="Update Status")

    # Event wiring is done inside the Blocks context.
    predict_button.click(gradio_predict, inputs=outcome_input, outputs=predicted_output)
    update_button.click(gradio_update, inputs=actual_input, outputs=update_output)

# The stray trailing "|" that followed this call was removed; it made the
# statement a syntax error.
demo.launch()