"""Flask app bridging an Ollama chat backend with Piper text-to-speech.

Routes:
    /                        - main page with model pickers
    /api/list_ollama_models  - JSON list of models on an Ollama host
    /api/chat                - SSE stream proxying an Ollama chat
    /api/tts                 - convert text to a WAV via the piper binary
    /audio/<filename>        - serve a generated WAV file
"""

import os
import random
import string
import subprocess
import requests
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from bs4 import BeautifulSoup
import markdown
import threading
from queue import Queue
import time
import json

app = Flask(__name__)

# Define directories
file_folder = os.path.dirname(os.path.abspath(__file__))
temp_audio_folder = os.path.join(file_folder, 'temp_audio')
model_folder = None
piper_binary_path = os.path.join(file_folder, 'piper')

# Create necessary directories
os.makedirs(temp_audio_folder, exist_ok=True)

# Check default user folder for .onnx voice models
default_user_folder = "./"
if os.path.exists(default_user_folder) and any(f.endswith('.onnx') for f in os.listdir(default_user_folder)):
    model_folder = default_user_folder

# Global settings
DEFAULT_BASE_HOST = "http://localhost:11434"
SETTINGS = {
    'speaker': 0,
    'noise_scale': 0.667,
    'length_scale': 1.0,
    'noise_w': 0.8,
    'sentence_silence': 0.2
}


def get_available_models():
    """Return the base names (no extension) of all .onnx voices in model_folder."""
    if not model_folder:
        return []
    return [os.path.splitext(model)[0]
            for model in os.listdir(model_folder)
            if model.endswith('.onnx')]


def get_ollama_models(base_host=DEFAULT_BASE_HOST):
    """Return the model names advertised by an Ollama server, or [] on any failure."""
    try:
        # Timeout keeps a dead/unreachable host from hanging the request handler.
        response = requests.get(f"{base_host}/api/tags", timeout=5)
        if response.status_code == 200:
            return [model['name'] for model in response.json().get('models', [])]
        return []
    except requests.RequestException:
        # Narrowed from a bare except: only network/HTTP errors are expected here.
        return []


def remove_markdown(text):
    """Strip Markdown formatting by rendering to HTML and extracting plain text."""
    html_content = markdown.markdown(text)
    soup = BeautifulSoup(html_content, 'html.parser')
    return soup.get_text().strip()


def convert_to_speech(text, model_name, remove_md=False):
    """Synthesize *text* with the named Piper voice.

    Returns the path of the generated WAV file, or None on failure.
    Side effect: deletes all previously generated WAVs in temp_audio_folder.
    """
    if model_name not in get_available_models():
        return None
    if remove_md:
        text = remove_markdown(text)

    random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + '.wav'
    output_file = os.path.join(temp_audio_folder, random_name)

    # Clean old audio files so the temp folder does not grow without bound.
    for file in os.listdir(temp_audio_folder):
        if file.endswith('.wav'):
            os.remove(os.path.join(temp_audio_folder, file))

    model_path = os.path.join(model_folder, model_name + '.onnx')
    try:
        # Argument list + shell=False: paths and settings are passed verbatim,
        # so spaces or shell metacharacters in names cannot break or inject
        # into a shell command (the original used shell=True with an f-string).
        command = [
            piper_binary_path,
            '-m', model_path,
            '-f', output_file,
            '--speaker', str(SETTINGS['speaker']),
            '--noise_scale', str(SETTINGS['noise_scale']),
            '--length_scale', str(SETTINGS['length_scale']),
            '--noise_w', str(SETTINGS['noise_w']),
            '--sentence_silence', str(SETTINGS['sentence_silence']),
        ]
        # Piper reads the text to synthesize from stdin.
        subprocess.run(command, input=text.encode('utf-8'), check=True)
        if os.path.exists(output_file):
            return output_file
    except Exception as e:
        print(f"Error during text-to-speech conversion: {e}")
    return None


def set_default_models():
    """Pick preferred TTS/Ollama defaults when present, else None for each."""
    tts_models = get_available_models()
    ollama_models = get_ollama_models()
    default_tts_model = "RecomendacionesConMiau" if "RecomendacionesConMiau" in tts_models else None
    default_ollama_model = "llama3.2:1b" if "llama3.2:1b" in ollama_models else None
    return default_tts_model, default_ollama_model


@app.route('/')
def index():
    """Render the main page with the available voices and default selections."""
    tts_models = get_available_models()
    default_tts_model, default_ollama_model = set_default_models()
    return render_template('index.html',
                           tts_models=tts_models,
                           default_tts_model=default_tts_model,
                           default_ollama_model=default_ollama_model)


@app.route('/api/list_ollama_models')
def list_ollama_models():
    """Return JSON {"models": [...]} for the requested (or default) Ollama host."""
    base_host = request.args.get('base_host', DEFAULT_BASE_HOST)
    return jsonify(models=get_ollama_models(base_host))


@app.route('/api/chat', methods=['POST'])
def chat():
    """Proxy a chat request to Ollama, streaming progress as server-sent events.

    Emits SSE "data:" lines carrying JSON with one of the keys:
    'chunk' (cumulative response so far), 'done' (final text), 'error'.
    """
    data = request.json
    base_host = data.get('base_host', DEFAULT_BASE_HOST)
    model = data.get('model')
    messages = data.get('messages', [])

    def generate():
        # A worker thread feeds the queue so this generator can yield as
        # chunks arrive instead of blocking on the whole upstream response.
        queue = Queue()
        thread = threading.Thread(
            target=stream_ollama_response,
            args=(base_host, model, messages, queue)
        )
        thread.start()

        complete_response = ""
        while True:
            msg_type, content = queue.get()
            if msg_type == "error":
                yield f"data: {json.dumps({'error': content})}\n\n"
                break
            elif msg_type == "chunk":
                # Each chunk is the cumulative text; the client replaces, not appends.
                complete_response = content
                yield f"data: {json.dumps({'chunk': content})}\n\n"
            elif msg_type == "done":
                yield f"data: {json.dumps({'done': complete_response})}\n\n"
                break

    return Response(stream_with_context(generate()), mimetype='text/event-stream')


def stream_ollama_response(base_host, model, messages, queue):
    """Worker: stream an Ollama /api/chat response into *queue*.

    Puts ("chunk", cumulative_text) per received fragment, then ("done", text),
    or ("error", message) on any failure.
    """
    url = f"{base_host}/api/chat"
    data = {
        "model": model,
        "messages": messages,
        "stream": True
    }
    try:
        with requests.post(url, json=data, stream=True) as response:
            if response.status_code == 200:
                complete_response = ""
                for line in response.iter_lines():
                    if line:
                        try:
                            json_response = json.loads(line)
                            chunk = json_response.get("message", {}).get("content", "")
                            if chunk:
                                complete_response += chunk
                                queue.put(("chunk", complete_response))
                        except json.JSONDecodeError:
                            # Ignore malformed/partial lines; keep streaming.
                            continue
                queue.put(("done", complete_response))
            else:
                queue.put(("error", f"Error: {response.status_code}"))
    except Exception as e:
        queue.put(("error", f"Error: {str(e)}"))


@app.route('/api/tts', methods=['POST'])
def text_to_speech():
    """Convert posted text to speech; respond with the generated WAV's filename."""
    data = request.json
    text = data.get('text', '')
    model = data.get('model')
    remove_md = data.get('remove_markdown', False)
    if not text or not model:
        return jsonify(error="Missing text or model"), 400
    audio_file = convert_to_speech(text, model, remove_md)
    if not audio_file:
        return jsonify(error="Failed to convert text to speech"), 500
    return jsonify(audio_file=os.path.basename(audio_file))


@app.route('/audio/<filename>')
def serve_audio(filename):
    """Serve a previously generated WAV file from the temp audio folder.

    Bug fix: the route was '/audio/' with no <filename> variable, so Flask
    could never supply the *filename* argument and every request failed.
    """
    # basename() strips any directory components to prevent path traversal.
    safe_name = os.path.basename(filename)
    return send_file(os.path.join(temp_audio_folder, safe_name))


if __name__ == '__main__':
    app.run(debug=True, port=7860, host='0.0.0.0')