File size: 5,368 Bytes
6683fa5
d66e6ff
6683fa5
 
 
 
 
 
d66e6ff
6683fa5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d66e6ff
 
 
 
 
6683fa5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d66e6ff
6683fa5
 
 
 
 
 
 
d66e6ff
 
 
6683fa5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d66e6ff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import subprocess

# Install required libraries
subprocess.check_call(["pip", "install", "torch>=1.11.0"])
subprocess.check_call(["pip", "install", "transformers>=4.31.0"])
subprocess.check_call(["pip", "install", "diffusers>=0.14.0"])
subprocess.check_call(["pip", "install", "librosa"])
subprocess.check_call(["pip", "install", "accelerate>=0.20.1"])
subprocess.check_call(["pip", "install", "gradio>=3.35.2"])

import os
import threading
import numpy as np
import librosa
import torch
import gradio as gr
from functools import lru_cache
from transformers import pipeline
from huggingface_hub import login
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler

# Ensure required dependencies are installed
def install_missing_packages():
    required_packages = {
        "librosa": None,
        "diffusers": ">=0.14.0",
        "gradio": ">=3.35.2",
        "huggingface_hub": None,
        "accelerate": ">=0.20.1",
        "transformers": ">=4.31.0"
    }
    for package, version in required_packages.items():
        try:
            __import__(package)
        except ImportError:
            package_name = f"{package}{version}" if version else package
            subprocess.check_call(["pip", "install", package_name])

install_missing_packages()

# Get Hugging Face token for authentication
hf_token = os.getenv("HF_TOKEN")
if hf_token:
    login(hf_token)
else:
    raise ValueError("HF_TOKEN environment variable not set.")

# Load speech-to-text model (Whisper)
speech_to_text = pipeline(
    "automatic-speech-recognition", 
    model="openai/whisper-tiny", 
    return_timestamps=True
)

# Load Stable Diffusion model for text-to-image
text_to_image = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
device = "cuda" if torch.cuda.is_available() else "cpu"
text_to_image.to(device)
text_to_image.enable_attention_slicing()
text_to_image.safety_checker = None
text_to_image.scheduler = DPMSolverMultistepScheduler.from_config(text_to_image.scheduler.config)

# Preprocess audio file into NumPy array
def preprocess_audio(audio_path):
    try:
        audio, sr = librosa.load(audio_path, sr=16000)  # Resample to 16kHz
        return np.array(audio, dtype=np.float32)
    except Exception as e:
        return f"Error in preprocessing audio: {str(e)}"

# Speech-to-text function with long-form transcription support
@lru_cache(maxsize=10)
def transcribe_audio(audio_path):
    try:
        audio_array = preprocess_audio(audio_path)
        if isinstance(audio_array, str):  # Error message from preprocessing
            return audio_array
        result = speech_to_text(audio_array)
        # Combine text from multiple segments for long-form transcription
        transcription = " ".join(segment["text"] for segment in result["chunks"])
        return transcription
    except Exception as e:
        return f"Error in transcription: {str(e)}"

# Text-to-image function
@lru_cache(maxsize=10)
def generate_image_from_text(text):
    try:
        image = text_to_image(text, height=256, width=256).images[0]  # Generate smaller images for speed
        return image
    except Exception as e:
        return f"Error in image generation: {str(e)}"

# Optimized combined processing function
def process_audio_and_generate_image(audio_path):
    transcription_result = {"result": None}
    image_result = {"result": None}

    # Function to run transcription and image generation in parallel
    def transcription_thread():
        transcription_result["result"] = transcribe_audio(audio_path)

    def image_generation_thread():
        transcription = transcription_result["result"]
        if transcription and "Error" not in transcription:
            image_result["result"] = generate_image_from_text(transcription)

    # Start both tasks in parallel
    t1 = threading.Thread(target=transcription_thread)
    t2 = threading.Thread(target=image_generation_thread)

    t1.start()
    t2.start()

    t1.join()  # Wait for transcription to finish
    t2.join()  # Wait for image generation to finish

    transcription = transcription_result["result"]
    image = image_result["result"]

    if "Error" in transcription:
        return None, transcription
    if isinstance(image, str) and "Error" in image:
        return None, image

    return image, transcription

# Gradio interface for speech-to-text
speech_to_text_iface = gr.Interface(
    fn=transcribe_audio,
    inputs=gr.Audio(type="filepath", label="Upload audio file for transcription (WAV/MP3)"),
    outputs=gr.Textbox(label="Transcription"),
    title="Speech-to-Text Transcription",
    description="Upload an audio file to transcribe speech into text.",
)

# Gradio interface for voice-to-image
voice_to_image_iface = gr.Interface(
    fn=process_audio_and_generate_image,
    inputs=gr.Audio(type="filepath", label="Upload audio file (WAV/MP3)"),
    outputs=[gr.Image(label="Generated Image"), gr.Textbox(label="Transcription")],
    title="Voice-to-Image Generator",
    description="Upload an audio file to transcribe speech to text, and then generate an image based on the transcription.",
)

# Combined Gradio app
iface = gr.TabbedInterface(
    interface_list=[speech_to_text_iface, voice_to_image_iface],
    tab_names=["Speech-to-Text", "Voice-to-Image"]
)

# Launch Gradio interface
iface.launch(debug=True, share=True)