# HirCoir's picture
# Update app.py
# e23dd95 verified
import os
import random
import string
import subprocess
import requests
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from bs4 import BeautifulSoup
import markdown
import threading
from queue import Queue
import time
import json
app = Flask(__name__)
# Define directories
file_folder = os.path.dirname(os.path.abspath(__file__))  # directory containing this script
temp_audio_folder = os.path.join(file_folder, 'temp_audio')  # generated WAVs are written here
model_folder = None  # resolved below; stays None if no .onnx voices are found
piper_binary_path = os.path.join(file_folder, 'piper')  # piper TTS executable next to this file
# Create necessary directories
os.makedirs(temp_audio_folder, exist_ok=True)
# Check default user folder
# Use the current working directory as the voice-model folder if it holds any .onnx files.
default_user_folder = "./"
if os.path.exists(default_user_folder) and any(f.endswith('.onnx') for f in os.listdir(default_user_folder)):
    model_folder = default_user_folder
# Global settings
DEFAULT_BASE_HOST = "http://localhost:11434"  # default Ollama server address
# Piper synthesis parameters passed on every invocation (see convert_to_speech).
SETTINGS = {
    'speaker': 0,
    'noise_scale': 0.667,
    'length_scale': 1.0,
    'noise_w': 0.8,
    'sentence_silence': 0.2
}
def get_available_models():
    """Return the base names (without extension) of all .onnx voice models."""
    if not model_folder:
        return []
    entries = os.listdir(model_folder)
    return [os.path.splitext(entry)[0] for entry in entries if entry.endswith('.onnx')]
def get_ollama_models(base_host=DEFAULT_BASE_HOST):
    """Return the model names advertised by an Ollama server.

    Args:
        base_host: Base URL of the Ollama server.

    Returns:
        List of model name strings; empty list on any failure.
    """
    try:
        # Bound the request so an unreachable host cannot hang the page render.
        response = requests.get(f"{base_host}/api/tags", timeout=5)
        if response.status_code == 200:
            return [model['name'] for model in response.json().get('models', [])]
        return []
    except (requests.RequestException, ValueError, KeyError):
        # Narrowed from a bare except: covers network errors, malformed JSON,
        # and unexpected payload shapes, without hiding programming errors.
        return []
def remove_markdown(text):
    """Strip Markdown formatting from *text* and return the plain text."""
    # Render the markdown to HTML, then extract only the text nodes.
    rendered = markdown.markdown(text)
    return BeautifulSoup(rendered, 'html.parser').get_text().strip()
def convert_to_speech(text, model_name, remove_md=False):
    """Synthesize *text* to a WAV file using the named piper voice model.

    Args:
        text: Text to speak.
        model_name: Base name (no .onnx extension) of an available model.
        remove_md: When True, strip Markdown formatting before synthesis.

    Returns:
        Path of the generated .wav file, or None on failure.
    """
    if model_name not in get_available_models():
        return None
    if remove_md:
        text = remove_markdown(text)
    random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + '.wav'
    output_file = os.path.join(temp_audio_folder, random_name)
    # Clean old audio files so the temp folder does not grow unbounded.
    # NOTE(review): this also removes files from concurrent requests that have
    # not been served yet — consider age-based cleanup if that becomes an issue.
    for file in os.listdir(temp_audio_folder):
        if file.endswith('.wav'):
            os.remove(os.path.join(temp_audio_folder, file))
    model_path = os.path.join(model_folder, model_name + '.onnx')
    try:
        # Build the command as an argument list and run it without a shell:
        # immune to quoting bugs and shell injection via paths/settings.
        command = [
            piper_binary_path,
            '-m', model_path,
            '-f', output_file,
            '--speaker', str(SETTINGS['speaker']),
            '--noise_scale', str(SETTINGS['noise_scale']),
            '--length_scale', str(SETTINGS['length_scale']),
            '--noise_w', str(SETTINGS['noise_w']),
            '--sentence_silence', str(SETTINGS['sentence_silence']),
        ]
        # piper reads the text to synthesize from stdin.
        subprocess.run(command, input=text.encode('utf-8'), check=True)
        if os.path.exists(output_file):
            return output_file
    except Exception as e:
        print(f"Error during text-to-speech conversion: {e}")
    return None
def set_default_models():
    """Pick the preferred TTS and Ollama models when they are installed.

    Returns:
        (default_tts_model, default_ollama_model) — each is the preferred
        name when available, otherwise None.
    """
    preferred_tts = "RecomendacionesConMiau"
    preferred_llm = "llama3.2:1b"
    tts_choice = preferred_tts if preferred_tts in get_available_models() else None
    llm_choice = preferred_llm if preferred_llm in get_ollama_models() else None
    return tts_choice, llm_choice
@app.route('/')
def index():
    """Render the main page with the voice list and default model selections."""
    available_voices = get_available_models()
    default_tts_model, default_ollama_model = set_default_models()
    return render_template(
        'index.html',
        tts_models=available_voices,
        default_tts_model=default_tts_model,
        default_ollama_model=default_ollama_model,
    )
@app.route('/api/list_ollama_models')
def list_ollama_models():
    """Return JSON {"models": [...]} for the Ollama host given in ?base_host=."""
    host = request.args.get('base_host', DEFAULT_BASE_HOST)
    return jsonify(models=get_ollama_models(host))
@app.route('/api/chat', methods=['POST'])
def chat():
    """SSE endpoint: proxies a streaming Ollama chat completion to the browser."""
    data = request.json
    base_host = data.get('base_host', DEFAULT_BASE_HOST)
    model = data.get('model')
    messages = data.get('messages', [])
    def generate():
        # Worker thread pushes (msg_type, content) tuples onto this queue.
        queue = Queue()
        thread = threading.Thread(
            target=stream_ollama_response,
            args=(base_host, model, messages, queue)
        )
        thread.start()
        complete_response = ""
        while True:
            msg_type, content = queue.get()  # blocks until the worker produces
            if msg_type == "error":
                yield f"data: {json.dumps({'error': content})}\n\n"
                break
            elif msg_type == "chunk":
                # A "chunk" carries the full accumulated text so far, not a
                # delta (see stream_ollama_response), so plain assignment is right.
                complete_response = content
                yield f"data: {json.dumps({'chunk': content})}\n\n"
            elif msg_type == "done":
                yield f"data: {json.dumps({'done': complete_response})}\n\n"
                break
    return Response(stream_with_context(generate()), mimetype='text/event-stream')
def stream_ollama_response(base_host, model, messages, queue):
    """Stream an Ollama chat completion into *queue* (runs in a worker thread).

    Puts tuples onto the queue:
        ("chunk", text)  — the full accumulated response text so far;
        ("done", text)   — final accumulated text when the stream ends;
        ("error", msg)   — on HTTP error status or any exception.
    """
    url = f"{base_host}/api/chat"
    payload = {
        "model": model,
        "messages": messages,
        "stream": True
    }
    try:
        # Connect timeout only: the read side is an open-ended stream, so a
        # read timeout would abort long generations.
        with requests.post(url, json=payload, stream=True, timeout=(10, None)) as response:
            if response.status_code != 200:
                queue.put(("error", f"Error: {response.status_code}"))
                return
            complete_response = ""
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    json_response = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip malformed/partial lines
                chunk = json_response.get("message", {}).get("content", "")
                if chunk:
                    complete_response += chunk
                    # Publish the accumulated text so far (not just the delta).
                    queue.put(("chunk", complete_response))
            queue.put(("done", complete_response))
    except Exception as e:
        # Thread boundary: report any failure to the consumer rather than
        # letting the worker die silently and leave the SSE stream hanging.
        queue.put(("error", f"Error: {str(e)}"))
@app.route('/api/tts', methods=['POST'])
def text_to_speech():
    """Convert posted text to speech; respond with the generated audio filename."""
    payload = request.json
    text = payload.get('text', '')
    model = payload.get('model')
    strip_md = payload.get('remove_markdown', False)
    if not (text and model):
        return jsonify(error="Missing text or model"), 400
    audio_path = convert_to_speech(text, model, strip_md)
    if not audio_path:
        return jsonify(error="Failed to convert text to speech"), 500
    return jsonify(audio_file=os.path.basename(audio_path))
@app.route('/audio/<filename>')
def serve_audio(filename):
    """Serve a generated WAV file from the temp audio folder.

    basename() strips any directory components from the user-supplied name,
    so a crafted filename cannot traverse outside temp_audio_folder.
    """
    safe_name = os.path.basename(filename)
    return send_file(os.path.join(temp_audio_folder, safe_name))
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug debugger, which allows
    # arbitrary code execution; combined with host='0.0.0.0' this is unsafe
    # on any publicly reachable interface — disable debug in production.
    app.run(debug=True, port=7860, host='0.0.0.0')