"""YouTube transcript extractor + per-section summarizer (Gradio UI).

Pipeline: fetch transcript via a gradio_client API -> merge short/related
sections (TF-IDF similarity) -> summarize each merged section in parallel
with OpenAI -> render a timeline summary plus the raw script as HTML.
"""
import ast
import json
import logging
import os
import re
from multiprocessing import Pool, cpu_count

import gradio as gr
import openai
from gradio_client import Client
from sklearn.feature_extraction.text import TfidfVectorizer

logging.basicConfig(
    filename='youtube_script_extractor.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

openai.api_key = os.getenv("OPENAI_API_KEY")


def parse_api_response(response):
    """Normalize the raw transcript-API response into a single dict.

    The API may hand back a Python-literal string, a list wrapping the
    payload, or the payload dict itself.

    Raises:
        ValueError: if the response cannot be reduced to a dict.
    """
    try:
        if isinstance(response, str):
            # The payload arrives as a Python literal, not JSON, hence
            # literal_eval rather than json.loads.
            response = ast.literal_eval(response)
        if isinstance(response, list) and len(response) > 0:
            response = response[0]
        if not isinstance(response, dict):
            raise ValueError(f"예상치 못한 응답 형식입니다. 받은 데이터 타입: {type(response)}")
        return response
    except Exception as e:
        raise ValueError(f"API 응답 파싱 실패: {str(e)}")


def get_youtube_script(url):
    """Fetch, merge, and summarize the transcript for a YouTube URL.

    Returns:
        (title, transcription_text, processed_sections); empty values
        ("", "", []) on any failure so the UI degrades gracefully.
    """
    logging.info(f"스크립트 추출 시작: URL = {url}")
    client = Client("whispersound/YT_Ts_R")
    try:
        logging.debug("API 호출 시작")
        result = client.predict(youtube_url=url, api_name="/predict")
        logging.debug("API 호출 완료")

        parsed_result = parse_api_response(result)
        # assumes the API schema: data[0] carries title / transcriptionAsText /
        # sections — TODO confirm against the whispersound/YT_Ts_R space.
        title = parsed_result["data"][0]["title"]
        transcription_text = parsed_result["data"][0]["transcriptionAsText"]
        original_sections = parsed_result["data"][0]["sections"]

        merged_sections = merge_sections(original_sections)
        processed_sections = process_merged_sections_parallel(merged_sections)

        logging.info("스크립트 추출 및 처리 완료")
        return title, transcription_text, processed_sections
    except Exception as e:
        error_msg = f"스크립트 추출 중 오류 발생: {str(e)}"
        logging.exception(error_msg)
        return "", "", []


def is_same_topic_tfidf(text1, text2, threshold=0.3):
    """Return True when the two texts' TF-IDF cosine similarity exceeds
    ``threshold``.

    TfidfVectorizer L2-normalizes rows by default, so the raw dot product
    of the two row vectors IS the cosine similarity.
    """
    vectorizer = TfidfVectorizer().fit([text1, text2])
    vectors = vectorizer.transform([text1, text2])
    # .toarray() instead of the deprecated sparse-matrix .A attribute
    # (removed for sparse arrays in recent SciPy releases).
    similarity = (vectors[0] * vectors[1].T).toarray()[0][0]
    return similarity > threshold


def merge_sections(sections, min_duration=60, max_duration=300):
    """Coalesce raw transcript sections into topical chunks.

    Rules, applied in order while walking the sections:
      * current chunk shorter than ``min_duration`` -> always absorb the next
        section;
      * current chunk at/over ``max_duration`` -> flush it and start a new one;
      * otherwise absorb only if the next section is on the same topic
        (TF-IDF similarity), else flush.

    Each section dict needs 'start_time', 'end_time' and 'text' keys.
    Returns [] for empty input (previously raised IndexError).
    """
    if not sections:
        return []

    merged_sections = []
    current_section = sections[0].copy()

    for section in sections[1:]:
        duration = current_section['end_time'] - current_section['start_time']
        if duration < min_duration:
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        elif duration >= max_duration:
            merged_sections.append(current_section)
            current_section = section.copy()
        else:
            if is_same_topic_tfidf(current_section['text'], section['text']):
                current_section['end_time'] = section['end_time']
                current_section['text'] += ' ' + section['text']
            else:
                merged_sections.append(current_section)
                current_section = section.copy()

    merged_sections.append(current_section)
    return merged_sections


def summarize_section(section_text):
    """Summarize one transcript section via the OpenAI chat API.

    Returns the model's summary text, or a fixed Korean error string on
    failure (callers rely on always getting a string back).
    """
    prompt = f"""
다음 유튜브 대본 섹션의 핵심 내용을 간결하게 요약하세요:
1. 한글로 작성하세요.
2. 주요 논점과 중요한 세부사항을 포함하세요.
3. 요약은 2-3문장으로 제한하세요.
섹션 내용:
{section_text}
"""
    try:
        # NOTE(review): openai.ChatCompletion is the pre-1.0 openai-python
        # API; pin openai<1.0 or migrate to openai.OpenAI().chat.completions.
        response = openai.ChatCompletion.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=150,
            temperature=0.3,
            top_p=0.9,
        )
        return response['choices'][0]['message']['content']
    except Exception as e:
        logging.exception("요약 생성 중 오류 발생")
        return "요약을 생성하는 동안 오류가 발생했습니다."


def process_section(section):
    """Summarize one merged section, keeping its time bounds.

    Must stay a module-level function: it is pickled to Pool workers.
    """
    summary = summarize_section(section['text'])
    return {
        'start_time': section['start_time'],
        'end_time': section['end_time'],
        'summary': summary,
    }


def process_merged_sections_parallel(merged_sections):
    """Summarize all merged sections in parallel, one process per CPU."""
    with Pool(processes=cpu_count()) as pool:
        return pool.map(process_section, merged_sections)


def format_time(seconds):
    """Render a second count as HH:MM:SS (zero-padded)."""
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"


def generate_timeline_summary(processed_sections):
    """Build the '<start> - <end> <n>. <summary>' timeline text block."""
    timeline_summary = ""
    for i, section in enumerate(processed_sections, 1):
        start_time = format_time(section['start_time'])
        end_time = format_time(section['end_time'])
        timeline_summary += f"{start_time} - {end_time} {i}. {section['summary']}\n\n"
    return timeline_summary


def display_script_and_summary(title, script, processed_sections):
    """Render the title, timeline summary, and raw script as HTML output.

    NOTE(review): the template's HTML tags appear to have been stripped from
    the source (the '클릭하여 펼치기' label suggests a <details>/<summary>
    wrapper) — confirm the original markup before shipping.
    """
    timeline_summary = generate_timeline_summary(processed_sections)
    script_html = f"""
{title}

타임라인 요약:

{timeline_summary}

원문 스크립트 (클릭하여 펼치기)

{script}
"""
    return script_html


def build_demo():
    """Construct the Gradio UI and wire up the analyze callback."""
    with gr.Blocks() as demo:
        gr.Markdown("## YouTube 스크립트 추출 및 요약 도구")
        youtube_url_input = gr.Textbox(label="YouTube URL 입력")
        analyze_button = gr.Button("분석하기")
        output = gr.HTML(label="결과")
        cached_data = gr.State({"url": "", "title": "", "script": "", "processed_sections": []})

        def analyze(url, cache):
            # Serve from cache when the same URL is re-submitted.
            if url == cache["url"]:
                return display_script_and_summary(
                    cache["title"], cache["script"], cache["processed_sections"]
                ), cache
            title, script, processed_sections = get_youtube_script(url)
            new_cache = {
                "url": url,
                "title": title,
                "script": script,
                "processed_sections": processed_sections,
            }
            return display_script_and_summary(title, script, processed_sections), new_cache

        analyze_button.click(
            analyze,
            inputs=[youtube_url_input, cached_data],
            outputs=[output, cached_data],
        )
    return demo


if __name__ == "__main__":
    # The __main__ guard is required: multiprocessing's spawn start method
    # (default on Windows/macOS) re-imports this module in every Pool worker;
    # without the guard each worker would rebuild the UI and re-launch the app.
    build_demo().launch(share=True)