import streamlit as st
from faster_whisper import WhisperModel
from transformers import pipeline
from textblob import TextBlob
import numpy as np
import queue
import threading
import time

# Initialize shared state
audio_queue = queue.Queue()
transcription_results = []
ai_detection_results = []

# Global stop event for threads
stop_event = threading.Event()

def initialize_model():
    """Initialize the Whisper model and the AI-text detector."""
    # WhisperModel takes the model size (or path) as its first positional
    # argument; there is no `model_size` keyword.
    st.session_state.model = WhisperModel("small", device="cpu", compute_type="int8")
    st.session_state.ai_detector = pipeline("text-classification", model="roberta-base-openai-detector")
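
# The CPU/int8 combination above favors portability; on a CUDA-capable machine
# faster-whisper also accepts, e.g., device="cuda" with compute_type="float16"
# (both are real faster-whisper options; the exact choice is an assumption,
# not part of the original app).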

def advanced_ai_detection(text, ai_detector):
    """Perform AI detection on the text."""
    if len(text.split()) < 5:  # Skip short texts
        return "Insufficient Data"
    result = ai_detector(text)[0]
    return result
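
# The Hugging Face pipeline returns a list of dicts, so `result` above is
# typically of the form {"label": ..., "score": ...}; per the model card,
# roberta-base-openai-detector uses the labels "Real" and "Fake".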

def analyze_text_linguistics(text):
    """Perform linguistic analysis on the text."""
    blob = TextBlob(text)
    return {
        "sentiment": blob.sentiment.polarity,
        "subjectivity": blob.sentiment.subjectivity,
    }
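
# Illustrative only (exact values depend on TextBlob's lexicon):
#   analyze_text_linguistics("I love this tool")
#   -> {"sentiment": 0.5, "subjectivity": 0.6}
# Polarity ranges over [-1.0, 1.0] and subjectivity over [0.0, 1.0].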

def transcribe_audio(audio_chunk, model):
    """Transcribe audio using the Whisper model."""
    segments, _ = model.transcribe(audio_chunk, language="en")
    return [segment.text for segment in segments]
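
# faster-whisper's transcribe() accepts a file path, a file-like object, or a
# 16 kHz mono float32 NumPy array (what the simulated stream below supplies);
# `segments` is a generator, which the list comprehension above drains.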

def run_app():
    """Main Streamlit app function."""
    st.title("AI Speech Detector")
    st.subheader("Real-Time Speech Transcription and AI Detection")
    st.text("This app transcribes audio input and detects whether the text is AI-generated.")

    # Sidebar for controls
    st.sidebar.title("Controls")
    start_button = st.sidebar.button("Start Recording")
    stop_button = st.sidebar.button("Stop Recording")

    if "model" not in st.session_state:
        st.text("Loading AI models...")
        initialize_model()
        st.text("Models loaded successfully!")

    # Seed the text buffers before the widgets bind to these keys
    st.session_state.setdefault("transcript", "")
    st.session_state.setdefault("ai_detection", "")

    # Display transcript
    st.text_area("Real-Time Transcript", height=200, key="transcript")

    # Display AI detection results
    st.text_area("AI Detection Results", height=200, key="ai_detection")

    if start_button:
        st.session_state.is_recording = True
        stop_event.clear()  # Reset so the loop can run again after a stop
        # NOTE: recent Streamlit versions need a script-run context
        # (streamlit.runtime.scriptrunner.add_script_run_ctx) for background
        # threads that touch session_state; the original thread design is kept.
        threading.Thread(target=process_audio_stream, daemon=True).start()

    if stop_button:
        st.session_state.is_recording = False
        stop_event.set()

def process_audio_stream():
    """Simulated audio capture and processing."""
    model = st.session_state.model
    ai_detector = st.session_state.ai_detector
    while not stop_event.is_set():
        try:
            # Simulate real-time audio input: 3 seconds of noise at 16 kHz
            fake_audio_chunk = np.random.rand(16000 * 3).astype(np.float32)
            transcription = transcribe_audio(fake_audio_chunk, model)
            for text in transcription:
                # Update the real-time transcript
                st.session_state.transcript += text + "\n"
                # Perform AI detection on each transcribed segment
                ai_result = advanced_ai_detection(text, ai_detector)
                st.session_state.ai_detection += f"Text: {text}\nResult: {ai_result}\n\n"
            time.sleep(3)  # Pace the loop to mimic a live 3-second capture window
        except Exception as e:
            st.error(f"Error during transcription: {str(e)}")
            break
    stop_event.clear()

if __name__ == "__main__":
    run_app()
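
# To try this out (a sketch, assuming the file is saved as app.py and a torch
# backend is available for the transformers pipeline):
#
#   pip install streamlit faster-whisper transformers torch textblob numpy
#   streamlit run app.py
#
# Random noise will rarely decode to meaningful text, so expect sparse output
# until the simulated capture is replaced with a real microphone stream.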