import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse  # used to resolve relative iframe paths
import re
import logging
import tempfile
import pandas as pd
import mecab  # python-mecab-ko library
import os
import time
import hmac
import hashlib
import base64
# Helper for debug logging
def debug_log(message: str):
    print(f"[DEBUG] {message}")
# =============================================================================
# [Base code]: extract the title and body text from a Naver blog post
# =============================================================================
def scrape_naver_blog(url: str) -> str:
    debug_log("scrape_naver_blog started")
    debug_log(f"Requested URL: {url}")

    # Browser-like headers so the request is less likely to be blocked
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        )
    }

    try:
        # 1) Request the main Naver blog page
        response = requests.get(url, headers=headers)
        debug_log("HTTP GET request (main page) complete")
        if response.status_code != 200:
            debug_log(f"Request failed, status code: {response.status_code}")
            return f"An error occurred. Status code: {response.status_code}"

        soup = BeautifulSoup(response.text, "html.parser")
        debug_log("HTML parsing (main page) complete")

        # 2) Find the iframe that holds the actual post
        iframe = soup.select_one("iframe#mainFrame")
        if not iframe:
            debug_log("Could not find the iframe#mainFrame tag.")
            return "Could not find the content iframe."
        iframe_src = iframe.get("src")
        if not iframe_src:
            debug_log("The iframe has no src attribute.")
            return "Could not find the src of the content iframe."

        # 3) Resolve a relative iframe src into an absolute URL
        parsed_iframe_url = urllib.parse.urljoin(url, iframe_src)
        debug_log(f"iframe page request URL: {parsed_iframe_url}")

        # 4) Request the iframe page
        iframe_response = requests.get(parsed_iframe_url, headers=headers)
        debug_log("HTTP GET request (iframe page) complete")
        if iframe_response.status_code != 200:
            debug_log(f"iframe request failed, status code: {iframe_response.status_code}")
            return f"An error occurred in the iframe. Status code: {iframe_response.status_code}"

        iframe_soup = BeautifulSoup(iframe_response.text, "html.parser")
        debug_log("HTML parsing (iframe page) complete")

        # Extract the title
        title_div = iframe_soup.select_one('.se-module.se-module-text.se-title-text')
        title = title_div.get_text(strip=True) if title_div else "Title not found."
        debug_log(f"Extracted title: {title}")

        # Extract the body text
        content_div = iframe_soup.select_one('.se-main-container')
        if content_div:
            content = content_div.get_text("\n", strip=True)
        else:
            content = "Body text not found."
        debug_log("Body extraction complete")

        # Combine the results
        result = f"[Title]\n{title}\n\n[Body]\n{content}"
        debug_log("Title and body combined, ready to return")
        return result

    except Exception as e:
        debug_log(f"Error occurred: {str(e)}")
        return f"An error occurred during scraping: {str(e)}"
# =============================================================================
# [Reference code 1]: morphological analysis function (using Mecab)
# =============================================================================
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def analyze_text(text: str):
    logger.debug("Original text: %s", text)

    # 1. Keep Korean characters only (remove spaces, English, symbols, etc.)
    filtered_text = re.sub(r'[^가-힣]', '', text)
    logger.debug("Filtered text (Korean only, spaces removed): %s", filtered_text)
    if not filtered_text:
        logger.debug("No valid Korean text.")
        return pd.DataFrame(columns=["Word", "Frequency"]), ""

    # 2. Morphological analysis with Mecab (keep nouns and compound nouns only)
    mecab_instance = mecab.MeCab()  # create an analyzer instance
    tokens = mecab_instance.pos(filtered_text)
    logger.debug("Morphological analysis result: %s", tokens)

    freq = {}
    for word, pos in tokens:
        if word and word.strip():
            # NN* part-of-speech tags cover the noun classes
            if pos.startswith("NN"):
                freq[word] = freq.get(word, 0) + 1
                logger.debug("Word: %s, POS: %s, current count: %d", word, pos, freq[word])

    # 3. Sort by frequency in descending order
    sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    logger.debug("Word frequencies sorted in descending order: %s", sorted_freq)

    # 4. Build the result DataFrame
    df = pd.DataFrame(sorted_freq, columns=["Word", "Frequency"])
    logger.debug("Result DataFrame created, shape: %s", df.shape)

    # 5. Save the result to a temporary Excel file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
    df.to_excel(temp_file.name, index=False, engine='openpyxl')
    temp_file.close()
    logger.debug("Excel file created: %s", temp_file.name)

    return df, temp_file.name
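
# Hedged sketch of analyze_text output; the sample sentence is hypothetical and
# the exact tokens depend on the installed mecab-ko dictionary, so the rows
# noted below are an assumption, not a guaranteed result. Never called by the
# app.
def _demo_analyze():
    df, xlsx_path = analyze_text("네이버 블로그 분석")
    print(df)         # e.g. rows like (네이버, 1), (블로그, 1), (분석, 1)
    print(xlsx_path)  # temporary .xlsx file holding the same table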
# =============================================================================
# [Reference code 2]: keyword search volume and blog post count lookups
# =============================================================================
def generate_signature(timestamp, method, uri, secret_key):
    # Sign "timestamp.method.uri" with HMAC-SHA256 and base64-encode the digest
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()
def get_header(method, uri, api_key, secret_key, customer_id):
    timestamp = str(round(time.time() * 1000))  # epoch time in milliseconds
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
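
# The sketch below illustrates the request-signing flow implemented above. The
# credentials are placeholders (hypothetical values); the scheme shown is the
# one this file implements, not an independently verified spec. Never called
# by the app.
def _demo_signature():
    headers = get_header("GET", "/keywordstool",
                         api_key="my-api-key",
                         secret_key="my-secret",
                         customer_id=1234567)
    # X-Signature == base64(HMAC-SHA256("<ms-timestamp>.GET./keywordstool"))
    print(headers["X-Timestamp"], headers["X-Signature"])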
def fetch_related_keywords(keyword):
    API_KEY = os.environ["NAVER_API_KEY"]
    SECRET_KEY = os.environ["NAVER_SECRET_KEY"]
    CUSTOMER_ID = os.environ["NAVER_CUSTOMER_ID"]

    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)

    params = {
        "hintKeywords": [keyword],
        "showDetail": "1"
    }
    response = requests.get(BASE_URL + uri, params=params, headers=headers)
    data = response.json()
    if "keywordList" not in data:
        return pd.DataFrame()

    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)

    def parse_count(x):
        # Counts may come back as strings (e.g. with comma separators);
        # fall back to 0 when the value cannot be parsed as an integer
        try:
            return int(str(x).replace(",", ""))
        except (ValueError, TypeError):
            return 0

    df["PC Monthly Searches"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["Mobile Monthly Searches"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["Total Monthly Searches"] = df["PC Monthly Searches"] + df["Mobile Monthly Searches"]
    df.rename(columns={"relKeyword": "Keyword"}, inplace=True)
    result_df = df[["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"]]
    return result_df
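
# Hedged usage sketch for fetch_related_keywords; the seed keyword is a
# hypothetical example, and the call requires the NAVER_* environment
# variables plus a valid Naver Search Ad account. Never called by the app.
def _demo_related_keywords():
    df = fetch_related_keywords("카메라")  # hypothetical seed keyword
    # Columns: Keyword, PC Monthly Searches, Mobile Monthly Searches,
    # Total Monthly Searches (at most 100 rows)
    print(df.head())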
def fetch_blog_count(keyword):
    client_id = os.environ["NAVER_SEARCH_CLIENT_ID"]
    client_secret = os.environ["NAVER_SEARCH_CLIENT_SECRET"]
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    params = {"query": keyword, "display": 1}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
        return data.get("total", 0)
    else:
        return 0
def create_excel_file(df):
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False)
    return excel_path
def process_keyword(keywords: str, include_related: bool):
    """
    Split the input into one keyword per line, look up search volume for each
    keyword via the Naver Search Ad API, optionally append the related
    keywords for the first keyword, then add the blog post count for every
    keyword and return the resulting DataFrame together with an Excel file.
    """
    input_keywords = [k.strip() for k in keywords.splitlines() if k.strip()]
    result_dfs = []

    for idx, kw in enumerate(input_keywords):
        df_kw = fetch_related_keywords(kw)
        if df_kw.empty:
            continue
        # Keep the row for the keyword itself if present, otherwise the top row
        row_kw = df_kw[df_kw["Keyword"] == kw]
        if not row_kw.empty:
            result_dfs.append(row_kw)
        else:
            result_dfs.append(df_kw.head(1))
        # Related keywords are only appended for the first input keyword
        if include_related and idx == 0:
            df_related = df_kw[df_kw["Keyword"] != kw]
            if not df_related.empty:
                result_dfs.append(df_related)

    if result_dfs:
        result_df = pd.concat(result_dfs, ignore_index=True)
        result_df.drop_duplicates(subset=["Keyword"], inplace=True)
    else:
        result_df = pd.DataFrame(columns=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"])

    result_df["Blog Post Count"] = result_df["Keyword"].apply(fetch_blog_count)
    result_df.sort_values(by="Total Monthly Searches", ascending=False, inplace=True)
    return result_df, create_excel_file(result_df)
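
# Hedged usage sketch for process_keyword; the keywords are hypothetical, and
# each line triggers one ad-API call plus one search-API call, so large inputs
# are slow and may be rate-limited. Never called by the app.
def _demo_process_keyword():
    df, xlsx_path = process_keyword("카메라\n삼각대", include_related=True)
    print(df)  # sorted by Total Monthly Searches, with Blog Post Count added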
# =============================================================================
# Integrated pipeline: run morphological analysis on the blog text, then
# attach each keyword's search volume and blog post count to the final result.
# =============================================================================
def process_blog_content(text: str):
    debug_log("process_blog_content started")

    # 1. Run the morphological analysis (reference code 1)
    df_morph, morph_excel = analyze_text(text)
    debug_log("Morphological analysis complete")
    if df_morph.empty:
        debug_log("Morphological analysis result is empty")
        return df_morph, ""

    # 2. Extract the analyzed words as a newline-separated list for the lookup
    keywords = "\n".join(df_morph["Word"].tolist())
    debug_log(f"Extracted word list: {keywords}")

    # 3. Look up search volume and blog post counts (reference code 2)
    df_keyword, keyword_excel = process_keyword(keywords, include_related=False)
    debug_log("Keyword lookup complete")

    # 4. Merge the morphological results with the keyword data on the word
    df_merged = pd.merge(df_morph, df_keyword, left_on="Word", right_on="Keyword", how="left")
    debug_log("Data merge complete")
    df_merged.drop(columns=["Keyword"], inplace=True)

    # 5. Write the merged result to an Excel file
    merged_excel = create_excel_file(df_merged)
    debug_log(f"Merged Excel file created: {merged_excel}")
    return df_merged, merged_excel
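
# End-to-end hedged sketch of the pipeline above; the input text is a
# hypothetical example, and Mecab plus all NAVER_* credentials must be
# configured for it to succeed. Never called by the app.
def _demo_pipeline():
    table, xlsx_path = process_blog_content("네이버 블로그 본문 예시 텍스트")
    # Columns: Word, Frequency, PC Monthly Searches, Mobile Monthly Searches,
    # Total Monthly Searches, Blog Post Count
    print(table.head())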
# =============================================================================
# Gradio interface (Hugging Face Spaces Gradio environment)
# =============================================================================
with gr.Blocks() as demo:
    gr.Markdown("# Blog Post Morphological Analysis and Keyword Lookup")

    with gr.Tab("Blog content input and scraping"):
        with gr.Row():
            blog_url = gr.Textbox(label="Naver blog link", placeholder="e.g. https://blog.naver.com/ssboost/222983068507")
            fetch_button = gr.Button("Fetch blog content")
        blog_content = gr.Textbox(label="Blog content (title and body)", lines=10, placeholder="Fetch the blog content or enter it directly.")
        # Clicking 'Fetch blog content' runs the scraper and fills blog_content
        fetch_button.click(fn=scrape_naver_blog, inputs=blog_url, outputs=blog_content)

    with gr.Tab("Run morphological analysis"):
        with gr.Row():
            analysis_button = gr.Button("Analyze morphemes")
        # interactive=True so the analysis result can be edited in place
        output_table = gr.Dataframe(label="Analysis result (morphemes and keyword data)", interactive=True)
        output_file = gr.File(label="Excel download")
        # Clicking 'Analyze morphemes' runs process_blog_content
        analysis_button.click(fn=process_blog_content, inputs=blog_content, outputs=[output_table, output_file])
if __name__ == "__main__":
    debug_log("Starting the Gradio app")
    demo.launch()
    debug_log("Gradio app stopped")