chatbot / Main2.py
onisj's picture
Upload folder using huggingface_hub
770eb6c verified
raw
history blame
17.7 kB
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from dotenv import load_dotenv
import gradio as gr
import requests
from contextlib import contextmanager
from datetime import datetime
import uuid
import logging
import speech_recognition as sr
from moviepy import VideoFileClip
from sqlalchemy import inspect, text
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables from .env file
load_dotenv()
# Initialize Flask app and SQLAlchemy
app = Flask(__name__)
database_url = os.getenv('DATABASE_URL')
if not database_url:
raise ValueError("DATABASE_URL is not set in the .env file or environment.")
app.config['SQLALCHEMY_DATABASE_URI'] = database_url
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# Set Gemini API key from environment variable
gemini_api_key = os.getenv('GEMINI_API_KEY')
gemini_api_url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
# Database models
class Character(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False, unique=True)
description = db.Column(db.Text, nullable=False)
prompt_template = db.Column(db.Text, nullable=False)
class Conversation(db.Model):
__tablename__ = 'conversation'
id = db.Column(db.Integer, primary_key=True)
character_id = db.Column(db.Integer, db.ForeignKey('character.id'), nullable=False)
user_input = db.Column(db.Text, nullable=True)
bot_response = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
chat_id = db.Column(db.String(36), nullable=True)
user_id = db.Column(db.Integer, nullable=False)
@contextmanager
def app_context():
with app.app_context():
yield
# def reset_and_initialize_database():
# """Drop existing tables, recreate them, and verify the schema."""
# with app_context():
# try:
# db.session.execute(text("DROP TABLE IF EXISTS conversation CASCADE;"))
# db.session.execute(text("DROP TABLE IF EXISTS character CASCADE;"))
# db.session.commit()
# logger.info("Dropped existing tables 'conversation' and 'character'.")
# db.create_all()
# logger.info("Database tables recreated successfully.")
# inspector = inspect(db.engine)
# columns = inspector.get_columns('character')
# column_names = [col['name'] for col in columns]
# logger.info(f"Columns in 'character' table: {column_names}")
# if 'prompt_template' not in column_names:
# raise RuntimeError("Failed to create 'prompt_template' column in 'character' table.")
# except Exception as e:
# db.session.rollback()
# logger.error(f"Error resetting database: {e}")
# raise
def add_predefined_characters():
with app_context():
characters = [
{"name": "Chuck the Clown", "description": "A funny clown who tells jokes and entertains.", "prompt_template": "You are Chuck the Clown, always ready with a joke and entertainment. Be upbeat, silly, and include jokes in your responses."},
{"name": "Sarcastic Pirate", "description": "A pirate with a sharp tongue and a love for treasure.", "prompt_template": "You are a Sarcastic Pirate, ready to share your tales of adventure. Use pirate slang, be witty, sarcastic, and mention your love for treasure and the sea."},
{"name": "Professor Sage", "description": "A wise professor knowledgeable about many subjects.", "prompt_template": "You are Professor Sage, sharing wisdom and knowledge. Be scholarly, thoughtful, and provide educational information in your responses."}
]
for char_data in characters:
if not Character.query.filter_by(name=char_data["name"]).first():
new_character = Character(name=char_data["name"], description=char_data["description"], prompt_template=char_data["prompt_template"])
db.session.add(new_character)
logger.info(f"Adding predefined character: {char_data['name']}")
try:
db.session.commit()
logger.info("Predefined characters added successfully.")
except Exception as e:
db.session.rollback()
logger.error(f"Error adding predefined characters: {e}")
def add_character(name, description, prompt_template):
with app_context():
try:
if Character.query.filter_by(name=name).first():
return f"Character '{name}' already exists!"
new_character = Character(name=name, description=description, prompt_template=prompt_template)
db.session.add(new_character)
db.session.commit()
logger.info(f"Successfully added character: {name}")
return f"Character '{name}' added successfully!\nDescription: {description}"
except Exception as e:
db.session.rollback()
logger.error(f"Error adding character: {e}")
return f"An error occurred while adding the character: {str(e)}"
def get_existing_characters():
with app_context():
try:
characters = Character.query.all()
return [(char.name, char.description) for char in characters]
except Exception as e:
logger.error(f"Error retrieving characters: {e}")
return [("Error retrieving characters", str(e))]
def chat_with_character(character_name, user_input, user_id, chat_id=None):
with app_context():
try:
if not gemini_api_key:
raise ValueError("GEMINI_API_KEY is not set in the environment.")
character = Character.query.filter_by(name=character_name).first()
if not character:
return "Character not found.", None
if not chat_id:
chat_id = str(uuid.uuid4())
previous_conversations = Conversation.query.filter_by(user_id=user_id).order_by(Conversation.timestamp).all()
context_prompt = " ".join([f"User: {conv.user_input}\nBot: {conv.bot_response}" for conv in previous_conversations])
prompt_template = character.prompt_template
full_prompt = f"{prompt_template}\n{context_prompt}\nUser: {user_input}\nBot:"
payload = {"contents": [{"parts": [{"text": full_prompt}]}]}
headers = {'Content-Type': 'application/json'}
response = requests.post(gemini_api_url, headers=headers, json=payload, params={'key': gemini_api_key})
if response.status_code == 200:
response_data = response.json()
if 'candidates' in response_data and response_data['candidates']:
bot_response = response_data['candidates'][0]['content']['parts'][0]['text']
conversation = Conversation(character_id=character.id, user_input=user_input, bot_response=bot_response, chat_id=chat_id, user_id=user_id)
db.session.add(conversation)
db.session.commit()
logger.info(f"Saved conversation with chat_id: {chat_id}")
return bot_response, chat_id
else:
return "An error occurred while generating content: Unexpected response format.", chat_id
else:
logger.error(f"Error from Gemini API: {response.json()}")
return f"An error occurred while generating content: {response.status_code} - {response.text}", chat_id
except Exception as e:
logger.error(f"Unexpected error in chat_with_character: {e}")
return f"An unexpected error occurred: {str(e)}", chat_id
def speech_to_text(audio_file):
recognizer = sr.Recognizer()
with sr.AudioFile(audio_file) as source:
audio_data = recognizer.record(source)
try:
return recognizer.recognize_google(audio_data)
except sr.UnknownValueError:
logger.error("Could not understand audio")
return None
except sr.RequestError as e:
logger.error(f"Could not request results from Google Speech Recognition service; {e}")
return None
def extract_audio_from_video(video_file):
audio_file_path = "temp_audio.wav"
try:
with VideoFileClip(video_file) as video:
video.audio.write_audiofile(audio_file_path)
except Exception as e:
logger.error(f"Error extracting audio from video: {e}")
return None
return audio_file_path
def get_chat_history(user_id):
with app_context():
try:
conversations = Conversation.query.filter_by(user_id=user_id).order_by(Conversation.timestamp).all()
return conversations
except Exception as e:
logger.error(f"Error retrieving chat history: {e}")
return []
def create_interface():
with app_context():
add_predefined_characters()
with gr.Blocks(title="Character Chat System", theme=gr.themes.Default()) as iface:
current_chat_id = gr.State(value=None)
user_id = gr.State(value=None)
chat_messages = gr.State(value=[])
gr.Markdown("# 🎭 Character Chat System 🎭", elem_id="title")
with gr.Tab("Sign In"):
user_id_input = gr.Textbox(label="User ID (Numeric)", placeholder="Enter your numeric User ID (e.g., 123)", elem_id="user_id_input", interactive=True, lines=2)
sign_in_btn = gr.Button("Sign In", variant="primary")
sign_in_response = gr.Textbox(label="Sign In Response", interactive=False)
def sign_in(user_id_input):
try:
user_id_int = int(user_id_input) # Convert to integer
return f"Welcome, User {user_id_int}!", user_id_int
except ValueError:
return "Please enter a valid numeric User ID (e.g., 123)!", None
sign_in_btn.click(fn=sign_in, inputs=[user_id_input], outputs=[sign_in_response, user_id])
with gr.Tab("Admin: Add Character"):
with gr.Row():
with gr.Column():
name_input = gr.Textbox(label="Character Name", placeholder="Enter character name", elem_id="name_input")
description_input = gr.Textbox(label="Character Description", placeholder="Enter character description", elem_id="description_input")
prompt_input = gr.Textbox(label="Prompt Template", placeholder="Enter character prompt template", elem_id="prompt_input", lines=3)
add_character_btn = gr.Button("Add Character", elem_id="add_character_btn", variant="primary")
add_character_response = gr.Textbox(label="Response", interactive=False, elem_id="response_output")
add_character_btn.click(fn=add_character, inputs=[name_input, description_input, prompt_input], outputs=[add_character_response])
with gr.Column():
gr.Markdown("## 🌟 Existing Characters 🌟", elem_id="existing_chars_title")
existing_characters = get_existing_characters()
character_list = gr.Dataframe(value=existing_characters, headers=["Name", "Description"], interactive=False, elem_id="character_list")
refresh_characters_btn = gr.Button("Refresh Character List")
refresh_characters_btn.click(fn=lambda: gr.update(value=get_existing_characters()), outputs=[character_list])
with gr.Tab("Chat with Character"):
with gr.Row():
character_dropdown = gr.Dropdown(label="Choose Character", choices=[char[0] for char in get_existing_characters()], elem_id="character_dropdown")
chat_id_display = gr.Textbox(label="Current Chat ID", interactive=False, elem_id="chat_id_display")
user_input = gr.Textbox(label="Your Message", placeholder="Type your message or use audio/video input", elem_id="user_input", lines=2)
audio_input = gr.Audio(label="Audio Input", type="filepath", elem_id="audio_input")
video_input = gr.Video(label="Video Input", elem_id="video_input")
chat_btn = gr.Button("Send", variant="primary")
chat_response = gr.Chatbot(label="Chat Responses", elem_id="chat_response", height=300, type="messages") # Updated to 'messages' type
def handle_chat(character_name, user_input, audio_file, video_file, user_id, chat_messages, current_chat_id):
if not user_id:
return chat_messages, current_chat_id, "Please sign in with a numeric User ID first!"
if not character_name:
return chat_messages, current_chat_id, "Please select a character!"
final_input = user_input or ""
if audio_file:
audio_text = speech_to_text(audio_file)
if audio_text:
final_input += f" {audio_text}"
else:
chat_messages.append({"role": "assistant", "content": "Could not understand audio."})
return chat_messages, current_chat_id, None
if video_file:
audio_file_path = extract_audio_from_video(video_file)
if audio_file_path:
video_text = speech_to_text(audio_file_path)
if video_text:
final_input += f" {video_text}"
chat_messages.append({"role": "user", "content": "Video uploaded"})
else:
chat_messages.append({"role": "assistant", "content": "Failed to extract audio from video."})
return chat_messages, current_chat_id, None
if not final_input.strip():
return chat_messages, current_chat_id, "Please provide a message, audio, or video!"
response, new_chat_id = chat_with_character(character_name, final_input, user_id, current_chat_id)
chat_messages.append({"role": "user", "content": final_input})
chat_messages.append({"role": "assistant", "content": response})
return chat_messages, new_chat_id, new_chat_id
chat_btn.click(fn=handle_chat, inputs=[character_dropdown, user_input, audio_input, video_input, user_id, chat_messages, current_chat_id], outputs=[chat_response, current_chat_id, chat_id_display])
with gr.Tab("Chat History"):
with gr.Row():
gr.Markdown("## πŸ“œ View Chat History πŸ“œ")
view_history_btn = gr.Button("View History", variant="primary")
chat_history_display = gr.Dataframe(label="Chat History", interactive=False)
def load_chat_history(user_id):
if not user_id:
return [("Error", "Please sign in with a numeric User ID to view chat history.")]
history = get_chat_history(user_id)
return [(conv.id, f"User: {conv.user_input}\nBot: {conv.bot_response} at {conv.timestamp}") for conv in history]
view_history_btn.click(fn=load_chat_history, inputs=[user_id], outputs=[chat_history_display])
with gr.Tab("API Status"):
with gr.Row():
gr.Markdown("## πŸ”Œ API Connection Status πŸ”Œ")
check_api_btn = gr.Button("Check API Status", variant="primary")
api_status_display = gr.Textbox(label="API Status", interactive=False)
check_api_btn.click(fn=lambda: "βœ… API connection successful!" if requests.post(gemini_api_url, headers={'Content-Type': 'application/json'}, json={"contents": [{"parts": [{"text": "Hello"}]}]}, params={'key': gemini_api_key}).status_code == 200 else "❌ API connection failed!", outputs=[api_status_display])
return iface
if __name__ == "__main__":
if not os.getenv('DATABASE_URL'):
logger.error("DATABASE_URL is not set in the .env file or environment.")
raise ValueError("DATABASE_URL is required.")
if not os.getenv('GEMINI_API_KEY'):
logger.error("GEMINI_API_KEY is not set in the .env file or environment.")
raise ValueError("GEMINI_API_KEY is required.")
# with app_context():
# try:
# # reset_and_initialize_database()
# # add_predefined_characters()
# except Exception as e:
# logger.error(f"Error initializing database: {e}")
# logger.info("If the error persists, manually drop the tables using psql:")
# logger.info("psql \"postgresql://avnadmin:AVNS_WIH89YjY1kOIOBH-cFF@pg-197dad92-elyxir4lyf-aa02.d.aivencloud.com:20563/defaultdb?sslmode=require\"")
# logger.info("Then run: DROP TABLE character; DROP TABLE conversation;")
# raise
chat_interface = create_interface()
logger.info("Starting Gradio interface...")
chat_interface.launch(share=True)