# Hugging Face Spaces page chrome captured with the source (site status text, not code):
# Spaces:
# Running
# Running
import gradio as gr | |
from gradio_client import Client | |
import json | |
import logging | |
import ast | |
import openai | |
import os | |
import re | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from multiprocessing import Pool, cpu_count | |
# Log everything (DEBUG and up) to a dedicated file for post-mortem debugging.
logging.basicConfig(filename='youtube_script_extractor.log', level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# The OpenAI key is read from the environment; never hard-code credentials.
openai.api_key = os.getenv("OPENAI_API_KEY")
def parse_api_response(response):
    """Normalize a raw API reply into a single dict payload.

    Accepts a Python-literal string (decoded with ast.literal_eval), a
    non-empty list whose first element is the payload, or a dict itself.

    Raises:
        ValueError: if the final value is not a dict, or parsing fails.
        (Error messages are Korean; they appear mojibake in this file.)
    """
    try:
        parsed = response
        if isinstance(parsed, str):
            # The endpoint may return a repr()-style string; decode it safely
            # (literal_eval never executes code, unlike eval).
            parsed = ast.literal_eval(parsed)
        if isinstance(parsed, list) and parsed:
            parsed = parsed[0]
        if not isinstance(parsed, dict):
            raise ValueError(f"μμμΉ λͺ»ν μλ΅ νμμ λλ€. λ°μ λ°μ΄ν° νμ : {type(parsed)}")
        return parsed
    except Exception as e:
        raise ValueError(f"API μλ΅ νμ± μ€ν¨: {str(e)}")
def get_youtube_script(url):
    """Fetch a YouTube transcript via the whispersound Space and process it.

    Pipeline: remote transcription -> parse -> merge short/related sections
    -> parallel per-section summarization.

    Returns:
        (title, full_transcript_text, processed_sections); on any failure
        the error is logged and ("", "", []) is returned instead of raising.
    """
    logging.info(f"μ€ν¬λ¦½νΈ μΆμΆ μμ: URL = {url}")
    client = Client("whispersound/YT_Ts_R")
    try:
        logging.debug("API νΈμΆ μμ")
        result = client.predict(youtube_url=url, api_name="/predict")
        logging.debug("API νΈμΆ μλ£")
        payload = parse_api_response(result)["data"][0]
        title = payload["title"]
        transcription_text = payload["transcriptionAsText"]
        # Merge fragmentary sections first, then summarize each merged chunk.
        processed_sections = process_merged_sections_parallel(
            merge_sections(payload["sections"]))
        logging.info("μ€ν¬λ¦½νΈ μΆμΆ λ° μ²λ¦¬ μλ£")
        return title, transcription_text, processed_sections
    except Exception as e:
        error_msg = f"μ€ν¬λ¦½νΈ μΆμΆ μ€ μ€λ₯ λ°μ: {str(e)}"
        logging.exception(error_msg)
        return "", "", []
def is_same_topic_tfidf(text1, text2, threshold=0.3):
    """Return True when the two texts look like the same topic.

    TfidfVectorizer L2-normalizes each row, so the dot product of the two
    TF-IDF rows equals their cosine similarity.
    """
    tfidf = TfidfVectorizer().fit_transform([text1, text2])
    # Element-wise product summed over the vocabulary == row dot product.
    similarity = tfidf[0].multiply(tfidf[1]).sum()
    return similarity > threshold
def merge_sections(sections, min_duration=60, max_duration=300):
    """Merge consecutive transcript sections into reasonably sized chunks.

    A running chunk shorter than ``min_duration`` seconds always absorbs
    the next section; one at or beyond ``max_duration`` is flushed; in
    between, the next section is absorbed only while TF-IDF similarity
    says it continues the same topic.

    Args:
        sections: list of dicts with 'start_time', 'end_time', 'text'.
        min_duration: merge unconditionally below this many seconds.
        max_duration: never grow a chunk at or beyond this many seconds.

    Returns:
        A new list of merged section dicts (input dicts are not mutated).
        Empty input yields an empty list.
    """
    if not sections:
        # Bug fix: the unguarded sections[0] raised IndexError on empty
        # input, which get_youtube_script's error path can produce.
        return []
    merged_sections = []
    current_section = sections[0].copy()
    for section in sections[1:]:
        duration = current_section['end_time'] - current_section['start_time']
        if duration < min_duration:
            # Too short: always absorb the next section.
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        elif duration >= max_duration:
            # Long enough: flush the chunk and start a new one.
            merged_sections.append(current_section)
            current_section = section.copy()
        else:
            # Mid-range: extend only while the topic stays the same.
            if is_same_topic_tfidf(current_section['text'], section['text']):
                current_section['end_time'] = section['end_time']
                current_section['text'] += ' ' + section['text']
            else:
                merged_sections.append(current_section)
                current_section = section.copy()
    merged_sections.append(current_section)
    return merged_sections
def summarize_section(section_text):
    """Summarize one transcript section with the OpenAI chat API.

    The prompt (Korean; it appears mojibake here because of the file's
    encoding) asks for a concise 2-3 sentence Korean summary covering the
    main arguments and key details.

    Args:
        section_text: plain transcript text of one merged section.

    Returns:
        The model's summary string, or a fixed Korean error-message string
        when the API call raises.
    """
    prompt = f"""
λ€μ μ νλΈ λλ³Έ μΉμ μ ν΅μ¬ λ΄μ©μ κ°κ²°νκ² μμ½νμΈμ:
1. νκΈλ‘ μμ±νμΈμ.
2. μ£Όμ λ Όμ κ³Ό μ€μν μΈλΆμ¬νμ ν¬ν¨νμΈμ.
3. μμ½μ 2-3λ¬Έμ₯μΌλ‘ μ ννμΈμ.
μΉμ λ΄μ©:
{section_text}
"""
    try:
        # NOTE(review): legacy openai<1.0 interface (openai.ChatCompletion);
        # this call fails on openai>=1.0 — confirm the pinned version.
        response = openai.ChatCompletion.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=150,   # cap output: summaries are only 2-3 sentences
            temperature=0.3,  # low randomness for faithful summaries
            top_p=0.9
        )
        return response['choices'][0]['message']['content']
    except Exception as e:
        logging.exception("μμ½ μμ± μ€ μ€λ₯ λ°μ")
        return "μμ½μ μμ±νλ λμ μ€λ₯κ° λ°μνμ΅λλ€."
def process_section(section):
    """Summarize one merged section, preserving its time bounds.

    Returns a dict with 'start_time', 'end_time', and the generated 'summary'.
    """
    start, end = section['start_time'], section['end_time']
    return {
        'start_time': start,
        'end_time': end,
        'summary': summarize_section(section['text']),
    }
def process_merged_sections_parallel(merged_sections):
    """Summarize every merged section in parallel, one worker per CPU core."""
    with Pool(processes=cpu_count()) as worker_pool:
        summaries = worker_pool.map(process_section, merged_sections)
    return summaries
def format_time(seconds):
    """Convert a second count to a zero-padded HH:MM:SS string."""
    total = int(seconds) if seconds >= 0 else seconds
    # Derive each field arithmetically rather than via chained divmod.
    hours = int(total // 3600)
    minutes = int((total % 3600) // 60)
    secs = int(total % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def generate_timeline_summary(processed_sections):
    """Render processed sections as a numbered, time-stamped text timeline.

    Each entry is "HH:MM:SS - HH:MM:SS n. summary" followed by a blank line.
    """
    entries = []
    for idx, sec in enumerate(processed_sections, 1):
        span = f"{format_time(sec['start_time'])} - {format_time(sec['end_time'])}"
        entries.append(f"{span} {idx}. {sec['summary']}\n\n")
    return "".join(entries)
def display_script_and_summary(title, script, processed_sections):
    """Render title, timeline summary, and a collapsible transcript as HTML.

    Args:
        title: video title (untrusted text from the remote API).
        script: full transcript text (untrusted).
        processed_sections: list of summarized section dicts.

    Returns:
        An HTML fragment suitable for a gr.HTML output component.
    """
    import html  # stdlib; local import keeps the file's top imports untouched

    timeline_summary = generate_timeline_summary(processed_sections)
    # Security fix: escape untrusted text before interpolating into HTML so
    # a malicious video title/transcript cannot inject markup (XSS).
    safe_title = html.escape(title)
    safe_timeline = html.escape(timeline_summary)
    safe_script = html.escape(script)
    script_html = f"""<h2 style='font-size:24px;'>{safe_title}</h2>
<h3>νμλΌμΈ μμ½:</h3>
<div style="white-space: pre-wrap; max-height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">
{safe_timeline}
</div>
<details>
<summary><h3>μλ¬Έ μ€ν¬λ¦½νΈ (ν΄λ¦νμ¬ νΌμΉκΈ°)</h3></summary>
<div style="white-space: pre-wrap;">{safe_script}</div>
</details>"""
    return script_html
# --- Gradio UI (Korean labels appear mojibake due to the file's encoding) ---
with gr.Blocks() as demo:
    gr.Markdown("## YouTube μ€ν¬λ¦½νΈ μΆμΆ λ° μμ½ λꡬ")
    youtube_url_input = gr.Textbox(label="YouTube URL μ λ ₯")
    analyze_button = gr.Button("λΆμνκΈ°")
    output = gr.HTML(label="κ²°κ³Ό")
    # Per-session cache so re-analyzing the same URL skips the remote calls.
    cached_data = gr.State({"url": "", "title": "", "script": "", "processed_sections": []})

    def analyze(url, cache):
        # Click handler: return (html, updated_cache) for the outputs below.
        if url == cache["url"]:
            # Cache hit: same URL as last run — reuse the stored results.
            return display_script_and_summary(cache["title"], cache["script"], cache["processed_sections"]), cache
        title, script, processed_sections = get_youtube_script(url)
        new_cache = {"url": url, "title": title, "script": script, "processed_sections": processed_sections}
        return display_script_and_summary(title, script, processed_sections), new_cache

    analyze_button.click(
        analyze,
        inputs=[youtube_url_input, cached_data],
        outputs=[output, cached_data]
    )

# share=True publishes a temporary public Gradio link for the app.
demo.launch(share=True)