File size: 6,373 Bytes
073d3e8
 
 
 
 
4a45930
073d3e8
 
1966494
 
073d3e8
 
 
 
1966494
 
073d3e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1966494
 
 
 
 
 
 
073d3e8
 
 
 
1966494
073d3e8
1966494
 
 
 
 
073d3e8
1966494
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
073d3e8
 
1966494
073d3e8
1966494
 
 
073d3e8
 
 
 
1966494
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
073d3e8
 
4a45930
073d3e8
 
 
1966494
073d3e8
1966494
073d3e8
 
1966494
 
073d3e8
1966494
 
 
073d3e8
 
 
 
1966494
073d3e8
 
4a45930
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import gradio as gr
from gradio_client import Client
import json
import logging
import ast
import openai
import os
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from multiprocessing import Pool, cpu_count

# Log everything (DEBUG and up) to a dedicated file for post-mortem debugging.
logging.basicConfig(filename='youtube_script_extractor.log', level=logging.DEBUG, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

# API key comes from the environment; os.getenv returns None when unset,
# in which case OpenAI calls below will fail at request time.
openai.api_key = os.getenv("OPENAI_API_KEY")

def parse_api_response(response):
    """Normalize the raw API reply into a dict.

    The upstream API may return a Python-literal string or a one-element
    list wrapping the payload; both are unwrapped here.

    Args:
        response: str, list, or dict as returned by the API client.

    Returns:
        dict: the payload dictionary.

    Raises:
        ValueError: if a string payload cannot be parsed, or if the final
            value is not a dict.
    """
    if isinstance(response, str):
        try:
            # literal_eval handles Python-repr payloads without the code
            # execution risk of eval().
            response = ast.literal_eval(response)
        # Only catch parse failures here; previously a blanket
        # `except Exception` also swallowed and double-wrapped the
        # type-check ValueError raised below.
        except (ValueError, SyntaxError) as e:
            raise ValueError(f"API 응닡 νŒŒμ‹± μ‹€νŒ¨: {str(e)}")
    if isinstance(response, list) and len(response) > 0:
        response = response[0]
    if not isinstance(response, dict):
        raise ValueError(f"μ˜ˆμƒμΉ˜ λͺ»ν•œ 응닡 ν˜•μ‹μž…λ‹ˆλ‹€. 받은 데이터 νƒ€μž…: {type(response)}")
    return response

def get_youtube_script(url):
    """Fetch a YouTube transcript and produce merged, summarized sections.

    Args:
        url: the YouTube video URL to analyze.

    Returns:
        (title, transcription_text, processed_sections); on any failure
        returns ("", "", []) after logging the exception.
    """
    logging.info(f"슀크립트 μΆ”μΆœ μ‹œμž‘: URL = {url}")
    client = Client("whispersound/YT_Ts_R")

    try:
        logging.debug("API 호좜 μ‹œμž‘")
        raw_result = client.predict(youtube_url=url, api_name="/predict")
        logging.debug("API 호좜 μ™„λ£Œ")

        # Unwrap the payload once instead of re-indexing ["data"][0] per field.
        entry = parse_api_response(raw_result)["data"][0]
        title = entry["title"]
        transcription_text = entry["transcriptionAsText"]
        original_sections = entry["sections"]

        processed_sections = process_merged_sections_parallel(
            merge_sections(original_sections)
        )

        logging.info("슀크립트 μΆ”μΆœ 및 처리 μ™„λ£Œ")
        return title, transcription_text, processed_sections

    except Exception as e:
        error_msg = f"슀크립트 μΆ”μΆœ 쀑 였λ₯˜ λ°œμƒ: {str(e)}"
        logging.exception(error_msg)
        return "", "", []

def is_same_topic_tfidf(text1, text2, threshold=0.3):
    """Decide whether two texts cover the same topic via TF-IDF similarity.

    TF-IDF rows from TfidfVectorizer are L2-normalised by default, so the
    dot product of the two row vectors is their cosine similarity.

    Args:
        text1: first text.
        text2: second text.
        threshold: similarity above which the texts count as one topic.

    Returns:
        bool: True if cosine similarity strictly exceeds ``threshold``.
    """
    vectorizer = TfidfVectorizer().fit([text1, text2])
    vectors = vectorizer.transform([text1, text2])
    # .toarray() replaces the `.A` shorthand, which is deprecated/removed on
    # modern SciPy sparse containers.
    similarity = (vectors[0] * vectors[1].T).toarray()[0][0]
    return similarity > threshold

def merge_sections(sections, min_duration=60, max_duration=300):
    """Merge consecutive transcript sections into topic-sized chunks.

    Sections shorter than ``min_duration`` are always absorbed into the
    current chunk; a chunk at or past ``max_duration`` is closed
    unconditionally; in between, the next section is absorbed only when
    TF-IDF similarity says it is the same topic.

    Args:
        sections: list of dicts with 'start_time', 'end_time', 'text'.
        min_duration: minimum chunk length in seconds.
        max_duration: maximum chunk length in seconds.

    Returns:
        list of merged section dicts; [] for empty input (previously this
        raised IndexError on ``sections[0]``).
    """
    if not sections:
        return []

    merged_sections = []
    current_section = sections[0].copy()  # copy so callers' dicts stay intact

    for section in sections[1:]:
        duration = current_section['end_time'] - current_section['start_time']

        if duration < min_duration:
            # Too short: always absorb the next section.
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        elif duration >= max_duration:
            # Long enough: close the chunk regardless of topic.
            merged_sections.append(current_section)
            current_section = section.copy()
        elif is_same_topic_tfidf(current_section['text'], section['text']):
            current_section['end_time'] = section['end_time']
            current_section['text'] += ' ' + section['text']
        else:
            merged_sections.append(current_section)
            current_section = section.copy()

    merged_sections.append(current_section)
    return merged_sections

def summarize_section(section_text):
    """Summarize one transcript section in Korean via the OpenAI chat API.

    Args:
        section_text: raw transcript text of the section.

    Returns:
        str: the model's summary, or a fixed Korean error message when the
        API call (or response unpacking) fails.
    """
    prompt = f"""
λ‹€μŒ 유튜브 λŒ€λ³Έ μ„Ήμ…˜μ˜ 핡심 λ‚΄μš©μ„ κ°„κ²°ν•˜κ²Œ μš”μ•½ν•˜μ„Έμš”:
1. ν•œκΈ€λ‘œ μž‘μ„±ν•˜μ„Έμš”.
2. μ£Όμš” 논점과 μ€‘μš”ν•œ 세뢀사항을 ν¬ν•¨ν•˜μ„Έμš”.
3. μš”μ•½μ€ 2-3λ¬Έμž₯으둜 μ œν•œν•˜μ„Έμš”.

μ„Ήμ…˜ λ‚΄μš©:
{section_text}
"""
    request = dict(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=150,
        temperature=0.3,
        top_p=0.9,
    )
    try:
        completion = openai.ChatCompletion.create(**request)
        return completion['choices'][0]['message']['content']
    except Exception:
        logging.exception("μš”μ•½ 생성 쀑 였λ₯˜ λ°œμƒ")
        return "μš”μ•½μ„ μƒμ„±ν•˜λŠ” λ™μ•ˆ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€."

def process_section(section):
    """Summarize a single merged section, preserving its time bounds."""
    return {
        'start_time': section['start_time'],
        'end_time': section['end_time'],
        'summary': summarize_section(section['text']),
    }

def process_merged_sections_parallel(merged_sections):
    """Summarize all merged sections in parallel, one worker per CPU core.

    NOTE(review): with the `spawn` start method (Windows/macOS default)
    child processes re-import this module, which has top-level side
    effects (demo.launch) — confirm the target platform uses `fork`.
    """
    worker_count = cpu_count()
    with Pool(processes=worker_count) as pool:
        return pool.map(process_section, merged_sections)

def format_time(seconds):
    """Render a (non-negative) second count as zero-padded HH:MM:SS."""
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d}"

def generate_timeline_summary(processed_sections):
    """Build the 'HH:MM:SS - HH:MM:SS n. summary' timeline text block."""
    entries = [
        f"{format_time(section['start_time'])} - {format_time(section['end_time'])} {index}. {section['summary']}\n\n"
        for index, section in enumerate(processed_sections, 1)
    ]
    return "".join(entries)

def display_script_and_summary(title, script, processed_sections):
    """Render the result page as HTML: title, timeline summary, and a
    collapsible block with the full original transcript.

    Args:
        title: video title.
        script: full transcription text.
        processed_sections: list of dicts with 'start_time', 'end_time',
            'summary' (as produced by process_merged_sections_parallel).

    Returns:
        str: HTML markup for the gr.HTML output component.
    """
    timeline_summary = generate_timeline_summary(processed_sections)
    
    # NOTE(review): title/script are interpolated into HTML unescaped —
    # presumably trusted (they come from the transcript API), but confirm.
    script_html = f"""<h2 style='font-size:24px;'>{title}</h2>
    <h3>νƒ€μž„λΌμΈ μš”μ•½:</h3>
    <div style="white-space: pre-wrap; max-height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">
        {timeline_summary}
    </div>
    <details>
        <summary><h3>원문 슀크립트 (ν΄λ¦­ν•˜μ—¬ 펼치기)</h3></summary>
        <div style="white-space: pre-wrap;">{script}</div>
    </details>"""
    return script_html

# --- Gradio UI: URL input -> analyze button -> HTML result ------------------
with gr.Blocks() as demo:
    gr.Markdown("## YouTube 슀크립트 μΆ”μΆœ 및 μš”μ•½ 도ꡬ")

    youtube_url_input = gr.Textbox(label="YouTube URL μž…λ ₯")
    analyze_button = gr.Button("λΆ„μ„ν•˜κΈ°")
    output = gr.HTML(label="κ²°κ³Ό")
    
    # Per-session cache of the last analyzed URL so repeated clicks on the
    # same URL skip the expensive extract/summarize pipeline.
    cached_data = gr.State({"url": "", "title": "", "script": "", "processed_sections": []})

    def analyze(url, cache):
        # Cache hit: re-render from stored results without calling the API.
        if url == cache["url"]:
            return display_script_and_summary(cache["title"], cache["script"], cache["processed_sections"]), cache

        # Cache miss: run the full pipeline and replace the cache wholesale.
        title, script, processed_sections = get_youtube_script(url)
        new_cache = {"url": url, "title": title, "script": script, "processed_sections": processed_sections}
        return display_script_and_summary(title, script, processed_sections), new_cache

    analyze_button.click(
        analyze, 
        inputs=[youtube_url_input, cached_data], 
        outputs=[output, cached_data]
    )

# NOTE(review): launched at import time with no `if __name__ == "__main__":`
# guard; under multiprocessing's `spawn` start method child workers would
# re-import this module and launch again — confirm fork is used.
demo.launch(share=True)