# app.py — YouTube script extractor & summarizer (Hugging Face Space by AIRider)
import gradio as gr
from gradio_client import Client
import json
import logging
import ast
import openai
import os
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from multiprocessing import Pool, cpu_count
# Write all diagnostics to a local log file; DEBUG level also captures the
# API-call tracing emitted in get_youtube_script.
logging.basicConfig(filename='youtube_script_extractor.log', level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# The OpenAI key is read from the environment; if it is unset, the API call in
# summarize_section fails and its handler returns a fallback message instead of crashing.
openai.api_key = os.getenv("OPENAI_API_KEY")
def parse_api_response(response):
    """Normalize the raw API reply into a single dict payload.

    The upstream endpoint may return a dict, a Python-literal string, or a
    non-empty list wrapping either; all are reduced to the dict itself.

    Args:
        response: raw value returned by the transcript API.
    Returns:
        The response dict.
    Raises:
        ValueError: if a string reply cannot be parsed, or the final value
            is not a dict.
    """
    try:
        if isinstance(response, str):
            # The endpoint returns a repr()-style literal, not strict JSON,
            # so use ast.literal_eval (safe: evaluates literals only).
            response = ast.literal_eval(response)
    except (ValueError, SyntaxError) as e:
        # Wrap only genuine parse failures. Previously the shape-check
        # ValueError below was also caught here and re-wrapped, producing a
        # confusing doubled error message.
        raise ValueError(f"API 응닡 νŒŒμ‹± μ‹€νŒ¨: {str(e)}")
    if isinstance(response, list) and len(response) > 0:
        response = response[0]
    if not isinstance(response, dict):
        raise ValueError(f"μ˜ˆμƒμΉ˜ λͺ»ν•œ 응닡 ν˜•μ‹μž…λ‹ˆλ‹€. 받은 데이터 νƒ€μž…: {type(response)}")
    return response
def get_youtube_script(url):
    """Fetch title, full transcript text, and summarized sections for a video.

    Calls the whispersound/YT_Ts_R Space, parses its reply, then merges and
    summarizes the transcript sections.

    Args:
        url: YouTube video URL.
    Returns:
        (title, transcription_text, processed_sections); on any failure the
        error is logged and ("", "", []) is returned so the UI can degrade.
    """
    logging.info(f"슀크립트 μΆ”μΆœ μ‹œμž‘: URL = {url}")
    client = Client("whispersound/YT_Ts_R")
    try:
        logging.debug("API 호좜 μ‹œμž‘")
        result = client.predict(youtube_url=url, api_name="/predict")
        logging.debug("API 호좜 μ™„λ£Œ")
        # Everything we need lives in the first entry of "data".
        payload = parse_api_response(result)["data"][0]
        merged = merge_sections(payload["sections"])
        summarized = process_merged_sections_parallel(merged)
        logging.info("슀크립트 μΆ”μΆœ 및 처리 μ™„λ£Œ")
        return payload["title"], payload["transcriptionAsText"], summarized
    except Exception as e:
        error_msg = f"슀크립트 μΆ”μΆœ 쀑 였λ₯˜ λ°œμƒ: {str(e)}"
        logging.exception(error_msg)
        return "", "", []
def is_same_topic_tfidf(text1, text2, threshold=0.3):
    """Decide whether two texts cover the same topic via TF-IDF similarity.

    Args:
        text1: first text.
        text2: second text.
        threshold: similarity cutoff; above it the texts count as same-topic.
    Returns:
        True when the cosine similarity of the two TF-IDF vectors exceeds
        ``threshold``.
    """
    vectorizer = TfidfVectorizer().fit([text1, text2])
    vectors = vectorizer.transform([text1, text2])
    # TF-IDF rows are L2-normalized, so their dot product is the cosine
    # similarity. .toarray() replaces the deprecated sparse `.A` attribute
    # (removed for scipy sparse arrays).
    similarity = (vectors[0] * vectors[1].T).toarray()[0][0]
    return similarity > threshold
def merge_sections(sections, min_duration=60, max_duration=300):
    """Merge adjacent transcript sections into chunks of reasonable length.

    A chunk shorter than ``min_duration`` seconds always absorbs the next
    section; a chunk at or over ``max_duration`` is closed immediately; in
    between, the next section is merged only when TF-IDF similarity says the
    two texts share a topic.

    Args:
        sections: list of dicts with 'start_time', 'end_time', 'text' keys.
        min_duration: minimum chunk length in seconds.
        max_duration: maximum chunk length in seconds.
    Returns:
        List of merged section dicts; empty list for empty input.
    """
    if not sections:
        # Guard: indexing sections[0] on an empty list raised IndexError.
        return []
    merged_sections = []
    current_section = sections[0].copy()
    for section in sections[1:]:
        duration = current_section['end_time'] - current_section['start_time']
        if duration < min_duration:
            # Too short: always absorb the next section.
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        elif duration >= max_duration:
            # Long enough: close this chunk and start a new one.
            merged_sections.append(current_section)
            current_section = section.copy()
        elif is_same_topic_tfidf(current_section['text'], section['text']):
            # Mid-sized chunk continuing the same topic: keep merging.
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        else:
            # Topic changed: close the chunk.
            merged_sections.append(current_section)
            current_section = section.copy()
    merged_sections.append(current_section)
    return merged_sections
def summarize_section(section_text):
    """Ask the chat model for a short Korean summary of one transcript section.

    Args:
        section_text: transcript text of the (merged) section.
    Returns:
        The model's summary, or a fallback Korean error message on any
        failure so a single bad section does not abort the whole batch.
    """
    prompt = f"""
λ‹€μŒ 유튜브 λŒ€λ³Έ μ„Ήμ…˜μ˜ 핡심 λ‚΄μš©μ„ κ°„κ²°ν•˜κ²Œ μš”μ•½ν•˜μ„Έμš”:
1. ν•œκΈ€λ‘œ μž‘μ„±ν•˜μ„Έμš”.
2. μ£Όμš” 논점과 μ€‘μš”ν•œ 세뢀사항을 ν¬ν•¨ν•˜μ„Έμš”.
3. μš”μ•½μ€ 2-3λ¬Έμž₯으둜 μ œν•œν•˜μ„Έμš”.
μ„Ήμ…˜ λ‚΄μš©:
{section_text}
"""
    try:
        completion = openai.ChatCompletion.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=150,
            temperature=0.3,
            top_p=0.9,
        )
        # Extraction stays inside the try: a malformed reply also falls back.
        return completion['choices'][0]['message']['content']
    except Exception:
        logging.exception("μš”μ•½ 생성 쀑 였λ₯˜ λ°œμƒ")
        return "μš”μ•½μ„ μƒμ„±ν•˜λŠ” λ™μ•ˆ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€."
def process_section(section):
    """Summarize one merged section while keeping its time boundaries.

    Args:
        section: dict with 'start_time', 'end_time', 'text'.
    Returns:
        Dict with 'start_time', 'end_time', and the generated 'summary'.
    """
    return {
        'start_time': section['start_time'],
        'end_time': section['end_time'],
        'summary': summarize_section(section['text']),
    }
def process_merged_sections_parallel(merged_sections):
    """Summarize all merged sections in parallel worker processes.

    Args:
        merged_sections: list of merged section dicts.
    Returns:
        List of processed section dicts, in input order.
    """
    # One worker per CPU core; process_section is module-level, so it pickles.
    with Pool(processes=cpu_count()) as workers:
        summaries = workers.map(process_section, merged_sections)
    return summaries
def format_time(seconds):
    """Render a second count as a zero-padded HH:MM:SS string."""
    total = int(seconds)
    hours = total // 3600
    minutes = (total % 3600) // 60
    secs = total % 60
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def generate_timeline_summary(processed_sections):
    """Build the numbered 'HH:MM:SS - HH:MM:SS i. summary' timeline text.

    Args:
        processed_sections: list of dicts with 'start_time', 'end_time',
            'summary'.
    Returns:
        One string with a blank line after each entry.
    """
    entries = []
    for idx, sec in enumerate(processed_sections, 1):
        span = f"{format_time(sec['start_time'])} - {format_time(sec['end_time'])}"
        entries.append(f"{span} {idx}. {sec['summary']}\n\n")
    return "".join(entries)
def display_script_and_summary(title, script, processed_sections):
    """Render the title, timeline summary, and collapsible transcript as HTML.

    Args:
        title: video title.
        script: full transcript text.
        processed_sections: list of dicts with 'start_time', 'end_time',
            'summary' (as produced by process_section).
    Returns:
        An HTML fragment for the gr.HTML output component.
    """
    timeline_summary = generate_timeline_summary(processed_sections)
    # NOTE(review): title/script/summaries are interpolated unescaped into
    # HTML — assumes upstream data is trusted; confirm before exposing widely.
    script_html = f"""<h2 style='font-size:24px;'>{title}</h2>
<h3>νƒ€μž„λΌμΈ μš”μ•½:</h3>
<div style="white-space: pre-wrap; max-height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">
{timeline_summary}
</div>
<details>
<summary><h3>원문 슀크립트 (ν΄λ¦­ν•˜μ—¬ 펼치기)</h3></summary>
<div style="white-space: pre-wrap;">{script}</div>
</details>"""
    return script_html
with gr.Blocks() as demo:
    gr.Markdown("## YouTube 슀크립트 μΆ”μΆœ 및 μš”μ•½ 도ꡬ")
    youtube_url_input = gr.Textbox(label="YouTube URL μž…λ ₯")
    analyze_button = gr.Button("λΆ„μ„ν•˜κΈ°")
    output = gr.HTML(label="κ²°κ³Ό")
    # Per-session cache of the last *successful* extraction.
    cached_data = gr.State({"url": "", "title": "", "script": "", "processed_sections": []})

    def analyze(url, cache):
        """Click handler: reuse the cache for a repeat URL, else extract anew."""
        # Cache hit requires a non-empty cached script: previously a failed
        # extraction was cached under its URL, so retrying the same URL kept
        # returning the empty result; the empty initial state also matched
        # an empty URL input.
        if url == cache["url"] and cache["script"]:
            return display_script_and_summary(cache["title"], cache["script"], cache["processed_sections"]), cache
        title, script, processed_sections = get_youtube_script(url)
        if not script:
            # Extraction failed: render the empty result but keep the old
            # cache so the user can retry this URL.
            return display_script_and_summary(title, script, processed_sections), cache
        new_cache = {"url": url, "title": title, "script": script, "processed_sections": processed_sections}
        return display_script_and_summary(title, script, processed_sections), new_cache

    analyze_button.click(
        analyze,
        inputs=[youtube_url_input, cached_data],
        outputs=[output, cached_data]
    )

demo.launch(share=True)