#!/usr/bin/env python import os import re import tempfile import gc # garbage collector 추가 from collections.abc import Iterator from threading import Thread import json import requests import cv2 import base64 import logging import time from urllib.parse import quote # URL 인코딩을 위해 추가 import gradio as gr import spaces import torch from loguru import logger from PIL import Image from transformers import AutoProcessor, Gemma3ForConditionalGeneration, TextIteratorStreamer # CSV/TXT/PDF 분석 import pandas as pd import PyPDF2 # ============================================================================= # (신규) 이미지 API 관련 함수들 # ============================================================================= from gradio_client import Client API_URL = "http://211.233.58.201:7896" logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s' ) def test_api_connection() -> str: """API 서버 연결 테스트""" try: client = Client(API_URL) return "API 연결 성공: 정상 작동 중" except Exception as e: logging.error(f"API 연결 테스트 실패: {e}") return f"API 연결 실패: {e}" def generate_image(prompt: str, width: float, height: float, guidance: float, inference_steps: float, seed: float): """이미지 생성 함수 (반환 형식에 유연하게 대응)""" if not prompt: return None, "오류: 프롬프트가 필요합니다." try: logging.info(f"프롬프트를 사용하여 이미지 생성 API 호출: {prompt}") client = Client(API_URL) result = client.predict( prompt=prompt, width=int(width), height=int(height), guidance=float(guidance), inference_steps=int(inference_steps), seed=int(seed), do_img2img=False, init_image=None, image2image_strength=0.8, resize_img=True, api_name="/generate_image" ) logging.info(f"이미지 생성 결과: {type(result)}, 길이: {len(result) if isinstance(result, (list, tuple)) else '알 수 없음'}") # 결과가 튜플이나 리스트 형태로 반환되는 경우 처리 if isinstance(result, (list, tuple)) and len(result) > 0: image_data = result[0] # 첫 번째 요소가 이미지 데이터 seed_info = result[1] if len(result) > 1 else "알 수 없는 시드" return image_data, seed_info else: # 다른 형태로 반환된 경우 (단일 값인 경우) return result, "알 수 없는 시드" except Exception as e: logging.error(f"이미지 생성 실패: {str(e)}") return None, f"오류: {str(e)}" # Base64 패딩 수정 함수 def fix_base64_padding(data): """Base64 문자열의 패딩을 수정합니다.""" if isinstance(data, bytes): data = data.decode('utf-8') # base64,로 시작하는 부분 제거 if "base64," in data: data = data.split("base64,", 1)[1] # 패딩 문자 추가 (4의 배수 길이가 되도록) missing_padding = len(data) % 4 if missing_padding: data += '=' * (4 - missing_padding) return data # ============================================================================= # 메모리 정리 함수 # ============================================================================= def clear_cuda_cache(): """CUDA 캐시를 명시적으로 비웁니다.""" if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() # ============================================================================= # SerpHouse 관련 함수 # ============================================================================= SERPHOUSE_API_KEY = os.getenv("SERPHOUSE_API_KEY", "") def extract_keywords(text: str, top_k: int = 5) -> str: """단순 키워드 추출: 한글, 영어, 숫자, 공백만 남김""" text = re.sub(r"[^a-zA-Z0-9가-힣\s]", "", text) tokens = text.split() return " ".join(tokens[:top_k]) def do_web_search(query: str) -> str: """SerpHouse LIVE API 호출하여 검색 결과 마크다운 반환""" try: url = "https://api.serphouse.com/serp/live" params = { "q": query, "domain": "google.com", "serp_type": "web", "device": "desktop", "lang": "en", "num": "20" } headers = {"Authorization": f"Bearer {SERPHOUSE_API_KEY}"} logger.info(f"SerpHouse API 호출 중... 검색어: {query}") response = requests.get(url, headers=headers, params=params, timeout=60) response.raise_for_status() data = response.json() results = data.get("results", {}) organic = None if isinstance(results, dict) and "organic" in results: organic = results["organic"] elif isinstance(results, dict) and "results" in results: if isinstance(results["results"], dict) and "organic" in results["results"]: organic = results["results"]["organic"] elif "organic" in data: organic = data["organic"] if not organic: logger.warning("응답에서 organic 결과를 찾을 수 없습니다.") return "웹 검색 결과가 없거나 API 응답 구조가 예상과 다릅니다." max_results = min(20, len(organic)) limited_organic = organic[:max_results] summary_lines = [] for idx, item in enumerate(limited_organic, start=1): title = item.get("title", "제목 없음") link = item.get("link", "#") snippet = item.get("snippet", "설명 없음") displayed_link = item.get("displayed_link", link) summary_lines.append( f"### 결과 {idx}: {title}\n\n" f"{snippet}\n\n" f"**출처**: [{displayed_link}]({link})\n\n" f"---\n" ) instructions = """ # 웹 검색 결과 아래는 검색 결과입니다. 질문에 답변할 때 이 정보를 활용하세요: 1. 각 결과의 제목, 내용, 출처 링크를 참고하세요. 2. 답변에 관련 정보의 출처를 명시적으로 인용하세요 (예: "[출처 제목](링크)"). 3. 응답에 실제 출처 링크를 포함하세요. 4. 여러 출처의 정보를 종합하여 답변하세요. 5. 마지막에 "참고 자료:" 섹션을 추가하고 주요 출처 링크를 나열하세요. """ return instructions + "\n".join(summary_lines) except Exception as e: logger.error(f"웹 검색 실패: {e}") return f"웹 검색 실패: {str(e)}" # ============================================================================= # 모델 및 프로세서 로딩 # ============================================================================= MAX_CONTENT_CHARS = 2000 MAX_INPUT_LENGTH = 2096 model_id = os.getenv("MODEL_ID", "VIDraft/Gemma-3-R1984-4B") processor = AutoProcessor.from_pretrained(model_id, padding_side="left") model = Gemma3ForConditionalGeneration.from_pretrained( model_id, device_map="auto", torch_dtype=torch.bfloat16, attn_implementation="eager" ) MAX_NUM_IMAGES = int(os.getenv("MAX_NUM_IMAGES", "5")) # ============================================================================= # CSV, TXT, PDF 분석 함수들 # ============================================================================= def analyze_csv_file(path: str) -> str: try: df = pd.read_csv(path) if df.shape[0] > 50 or df.shape[1] > 10: df = df.iloc[:50, :10] df_str = df.to_string() if len(df_str) > MAX_CONTENT_CHARS: df_str = df_str[:MAX_CONTENT_CHARS] + "\n...(일부 생략)..." return f"**[CSV 파일: {os.path.basename(path)}]**\n\n{df_str}" except Exception as e: return f"CSV 파일 읽기 실패 ({os.path.basename(path)}): {str(e)}" def analyze_txt_file(path: str) -> str: try: with open(path, "r", encoding="utf-8") as f: text = f.read() if len(text) > MAX_CONTENT_CHARS: text = text[:MAX_CONTENT_CHARS] + "\n...(일부 생략)..." return f"**[TXT 파일: {os.path.basename(path)}]**\n\n{text}" except Exception as e: return f"TXT 파일 읽기 실패 ({os.path.basename(path)}): {str(e)}" def pdf_to_markdown(pdf_path: str) -> str: text_chunks = [] try: with open(pdf_path, "rb") as f: reader = PyPDF2.PdfReader(f) max_pages = min(5, len(reader.pages)) for page_num in range(max_pages): page_text = reader.pages[page_num].extract_text() or "" page_text = page_text.strip() if page_text: if len(page_text) > MAX_CONTENT_CHARS // max_pages: page_text = page_text[:MAX_CONTENT_CHARS // max_pages] + "...(일부 생략)" text_chunks.append(f"## 페이지 {page_num+1}\n\n{page_text}\n") if len(reader.pages) > max_pages: text_chunks.append(f"\n...(전체 {len(reader.pages)}페이지 중 {max_pages}페이지만 표시)...") except Exception as e: return f"PDF 파일 읽기 실패 ({os.path.basename(pdf_path)}): {str(e)}" full_text = "\n".join(text_chunks) if len(full_text) > MAX_CONTENT_CHARS: full_text = full_text[:MAX_CONTENT_CHARS] + "\n...(일부 생략)..." return f"**[PDF 파일: {os.path.basename(pdf_path)}]**\n\n{full_text}" # ============================================================================= # 이미지/비디오 파일 제한 검사 # ============================================================================= def count_files_in_new_message(paths: list[str]) -> tuple[int, int]: image_count = 0 video_count = 0 for path in paths: if path.endswith(".mp4"): video_count += 1 elif re.search(r"\.(png|jpg|jpeg|gif|webp)$", path, re.IGNORECASE): image_count += 1 return image_count, video_count def count_files_in_history(history: list[dict]) -> tuple[int, int]: image_count = 0 video_count = 0 for item in history: if item["role"] != "user" or isinstance(item["content"], str): continue if isinstance(item["content"], list) and len(item["content"]) > 0: file_path = item["content"][0] if isinstance(file_path, str): if file_path.endswith(".mp4"): video_count += 1 elif re.search(r"\.(png|jpg|jpeg|gif|webp)$", file_path, re.IGNORECASE): image_count += 1 return image_count, video_count def validate_media_constraints(message: dict, history: list[dict]) -> bool: media_files = [f for f in message["files"] if re.search(r"\.(png|jpg|jpeg|gif|webp)$", f, re.IGNORECASE) or f.endswith(".mp4")] new_image_count, new_video_count = count_files_in_new_message(media_files) history_image_count, history_video_count = count_files_in_history(history) image_count = history_image_count + new_image_count video_count = history_video_count + new_video_count if video_count > 1: gr.Warning("비디오 파일은 하나만 지원됩니다.") return False if video_count == 1: if image_count > 0: gr.Warning("이미지와 비디오를 혼합하는 것은 허용되지 않습니다.") return False if "" in message["text"]: gr.Warning(" 태그와 비디오 파일은 함께 사용할 수 없습니다.") return False if video_count == 0 and image_count > MAX_NUM_IMAGES: gr.Warning(f"최대 {MAX_NUM_IMAGES}장의 이미지를 업로드할 수 있습니다.") return False if "" in message["text"]: image_files = [f for f in message["files"] if re.search(r"\.(png|jpg|jpeg|gif|webp)$", f, re.IGNORECASE)] image_tag_count = message["text"].count("") if image_tag_count != len(image_files): gr.Warning("텍스트에 있는 태그의 개수가 이미지 파일 개수와 일치하지 않습니다.") return False return True # ============================================================================= # 비디오 처리 함수 # ============================================================================= def downsample_video(video_path: str) -> list[tuple[Image.Image, float]]: vidcap = cv2.VideoCapture(video_path) fps = vidcap.get(cv2.CAP_PROP_FPS) total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) frame_interval = max(int(fps), int(total_frames / 10)) frames = [] for i in range(0, total_frames, frame_interval): vidcap.set(cv2.CAP_PROP_POS_FRAMES, i) success, image = vidcap.read() if success: image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image = cv2.resize(image, (0, 0), fx=0.5, fy=0.5) pil_image = Image.fromarray(image) timestamp = round(i / fps, 2) frames.append((pil_image, timestamp)) if len(frames) >= 5: break vidcap.release() return frames def process_video(video_path: str) -> tuple[list[dict], list[str]]: content = [] temp_files = [] frames = downsample_video(video_path) for pil_image, timestamp in frames: with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: pil_image.save(temp_file.name) temp_files.append(temp_file.name) content.append({"type": "text", "text": f"프레임 {timestamp}:"}) content.append({"type": "image", "url": temp_file.name}) return content, temp_files # ============================================================================= # interleaved 처리 함수 # ============================================================================= def process_interleaved_images(message: dict) -> list[dict]: parts = re.split(r"()", message["text"]) content = [] image_files = [f for f in message["files"] if re.search(r"\.(png|jpg|jpeg|gif|webp)$", f, re.IGNORECASE)] image_index = 0 for part in parts: if part == "" and image_index < len(image_files): content.append({"type": "image", "url": image_files[image_index]}) image_index += 1 elif part.strip(): content.append({"type": "text", "text": part.strip()}) else: if isinstance(part, str) and part != "": content.append({"type": "text", "text": part}) return content # ============================================================================= # 파일 처리 -> content 생성 # ============================================================================= def is_image_file(file_path: str) -> bool: return bool(re.search(r"\.(png|jpg|jpeg|gif|webp)$", file_path, re.IGNORECASE)) def is_video_file(file_path: str) -> bool: return file_path.endswith(".mp4") def is_document_file(file_path: str) -> bool: return file_path.lower().endswith(".pdf") or file_path.lower().endswith(".csv") or file_path.lower().endswith(".txt") def process_new_user_message(message: dict) -> tuple[list[dict], list[str]]: temp_files = [] if not message["files"]: return [{"type": "text", "text": message["text"]}], temp_files video_files = [f for f in message["files"] if is_video_file(f)] image_files = [f for f in message["files"] if is_image_file(f)] csv_files = [f for f in message["files"] if f.lower().endswith(".csv")] txt_files = [f for f in message["files"] if f.lower().endswith(".txt")] pdf_files = [f for f in message["files"] if f.lower().endswith(".pdf")] content_list = [{"type": "text", "text": message["text"]}] for csv_path in csv_files: content_list.append({"type": "text", "text": analyze_csv_file(csv_path)}) for txt_path in txt_files: content_list.append({"type": "text", "text": analyze_txt_file(txt_path)}) for pdf_path in pdf_files: content_list.append({"type": "text", "text": pdf_to_markdown(pdf_path)}) if video_files: video_content, video_temp_files = process_video(video_files[0]) content_list += video_content temp_files.extend(video_temp_files) return content_list, temp_files if "" in message["text"] and image_files: interleaved_content = process_interleaved_images({"text": message["text"], "files": image_files}) if content_list and content_list[0]["type"] == "text": content_list = content_list[1:] return interleaved_content + content_list, temp_files else: for img_path in image_files: content_list.append({"type": "image", "url": img_path}) return content_list, temp_files # ============================================================================= # history -> LLM 메시지 변환 # ============================================================================= def process_history(history: list[dict]) -> list[dict]: messages = [] current_user_content = [] for item in history: if item["role"] == "assistant": if current_user_content: messages.append({"role": "user", "content": current_user_content}) current_user_content = [] messages.append({"role": "assistant", "content": [{"type": "text", "text": item["content"]}]}) else: content = item["content"] if isinstance(content, str): current_user_content.append({"type": "text", "text": content}) elif isinstance(content, list) and len(content) > 0: file_path = content[0] if is_image_file(file_path): current_user_content.append({"type": "image", "url": file_path}) else: current_user_content.append({"type": "text", "text": f"[파일: {os.path.basename(file_path)}]"}) if current_user_content: messages.append({"role": "user", "content": current_user_content}) return messages # ============================================================================= # 모델 생성 함수 (OOM 캐치) # ============================================================================= def _model_gen_with_oom_catch(**kwargs): try: model.generate(**kwargs) except torch.cuda.OutOfMemoryError: raise RuntimeError("[OutOfMemoryError] GPU 메모리가 부족합니다.") finally: clear_cuda_cache() # ============================================================================= # 메인 추론 함수 # ============================================================================= @spaces.GPU(duration=120) def run( message: dict, history: list[dict], system_prompt: str = "", max_new_tokens: int = 512, use_web_search: bool = False, web_search_query: str = "", age_group: str = "20대", mbti_personality: str = "INTP", sexual_openness: int = 2, image_gen: bool = False # "Image Gen" 체크 여부 ) -> Iterator[str]: if not validate_media_constraints(message, history): yield "" return temp_files = [] try: # 시스템 프롬프트에 페르소나 정보 추가 persona = ( f"{system_prompt.strip()}\n\n" f"성별: 여성\n" f"연령대: {age_group}\n" f"MBTI 페르소나: {mbti_personality}\n" f"섹슈얼 개방성 (1~5): {sexual_openness}\n" ) combined_system_msg = f"[시스템 프롬프트]\n{persona.strip()}\n\n" if use_web_search: user_text = message["text"] ws_query = extract_keywords(user_text) if ws_query.strip(): logger.info(f"[자동 웹 검색 키워드] {ws_query!r}") ws_result = do_web_search(ws_query) combined_system_msg += f"[검색 결과 (상위 20개 항목)]\n{ws_result}\n\n" combined_system_msg += ( "[참고: 위 검색 결과 링크를 출처로 인용하여 답변]\n" "[중요 지시사항]\n" "1. 답변에 검색 결과에서 찾은 정보의 출처를 반드시 인용하세요.\n" "2. 출처 인용 시 \"[출처 제목](링크)\" 형식의 마크다운 링크를 사용하세요.\n" "3. 여러 출처의 정보를 종합하여 답변하세요.\n" "4. 답변 마지막에 \"참고 자료:\" 섹션을 추가하고 사용한 주요 출처 링크를 나열하세요.\n" ) else: combined_system_msg += "[유효한 키워드가 없어 웹 검색을 건너뜁니다]\n\n" messages = [] if combined_system_msg.strip(): messages.append({"role": "system", "content": [{"type": "text", "text": combined_system_msg.strip()}]}) messages.extend(process_history(history)) user_content, user_temp_files = process_new_user_message(message) temp_files.extend(user_temp_files) for item in user_content: if item["type"] == "text" and len(item["text"]) > MAX_CONTENT_CHARS: item["text"] = item["text"][:MAX_CONTENT_CHARS] + "\n...(일부 생략)..." messages.append({"role": "user", "content": user_content}) inputs = processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ).to(device=model.device, dtype=torch.bfloat16) if inputs.input_ids.shape[1] > MAX_INPUT_LENGTH: inputs.input_ids = inputs.input_ids[:, -MAX_INPUT_LENGTH:] if 'attention_mask' in inputs: inputs.attention_mask = inputs.attention_mask[:, -MAX_INPUT_LENGTH:] streamer = TextIteratorStreamer(processor, timeout=30.0, skip_prompt=True, skip_special_tokens=True) gen_kwargs = dict(inputs, streamer=streamer, max_new_tokens=max_new_tokens) t = Thread(target=_model_gen_with_oom_catch, kwargs=gen_kwargs) t.start() output_so_far = "" for new_text in streamer: output_so_far += new_text yield output_so_far except Exception as e: logger.error(f"run 함수 에러: {str(e)}") yield f"죄송합니다. 오류가 발생했습니다: {str(e)}" finally: for tmp in temp_files: try: if os.path.exists(tmp): os.unlink(tmp) logger.info(f"임시 파일 삭제됨: {tmp}") except Exception as ee: logger.warning(f"임시 파일 {tmp} 삭제 실패: {ee}") try: del inputs, streamer except Exception: pass clear_cuda_cache() # 수정된 모델 실행 함수 - 이미지 생성 및 갤러리 출력 처리 def modified_run(message, history, system_prompt, max_new_tokens, use_web_search, web_search_query, age_group, mbti_personality, sexual_openness, image_gen): # 갤러리 초기화 및 숨기기 output_so_far = "" gallery_update = gr.Gallery(visible=False, value=[]) yield output_so_far, gallery_update # 기존 run 함수 로직 text_generator = run(message, history, system_prompt, max_new_tokens, use_web_search, web_search_query, age_group, mbti_personality, sexual_openness, image_gen) for text_chunk in text_generator: output_so_far = text_chunk yield output_so_far, gallery_update # 이미지 생성이 활성화된 경우 갤러리 업데이트 if image_gen and message["text"].strip(): try: width, height = 512, 512 guidance, steps, seed = 7.5, 30, 42 logger.info(f"갤러리용 이미지 생성 호출, 프롬프트: {message['text']}") # API 호출해서 이미지 생성 image_result, seed_info = generate_image( prompt=message["text"].strip(), width=width, height=height, guidance=guidance, inference_steps=steps, seed=seed ) if image_result: # 직접 이미지 데이터 처리: base64 문자열인 경우 if isinstance(image_result, str) and ( image_result.startswith('data:') or len(image_result) > 100 and '/' not in image_result ): # base64 이미지 문자열을 파일로 변환 try: # data:image 접두사 제거 if image_result.startswith('data:'): content_type, b64data = image_result.split(';base64,') else: b64data = image_result content_type = "image/webp" # 기본값으로 가정 # base64 디코딩 image_bytes = base64.b64decode(b64data) # 임시 파일로 저장 with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_file: temp_file.write(image_bytes) temp_path = temp_file.name # 갤러리 표시 및 이미지 추가 gallery_update = gr.Gallery(visible=True, value=[temp_path]) yield output_so_far + "\n\n*이미지가 생성되어 아래 갤러리에 표시됩니다.*", gallery_update except Exception as e: logger.error(f"Base64 이미지 처리 오류: {e}") yield output_so_far + f"\n\n(이미지 처리 중 오류: {e})", gallery_update # 파일 경로인 경우 elif isinstance(image_result, str) and os.path.exists(image_result): # 로컬 파일 경로를 그대로 사용 gallery_update = gr.Gallery(visible=True, value=[image_result]) yield output_so_far + "\n\n*이미지가 생성되어 아래 갤러리에 표시됩니다.*", gallery_update # /tmp 경로인 경우 (API 서버에만 존재하는 파일) elif isinstance(image_result, str) and '/tmp/' in image_result: # API에서 반환된 파일 경로에서 이미지 정보 추출 try: # API 응답을 base64 인코딩된 문자열로 처리 client = Client(API_URL) result = client.predict( prompt=message["text"].strip(), api_name="/generate_base64_image" # base64 반환 API ) if isinstance(result, str) and (result.startswith('data:') or len(result) > 100): # base64 이미지 처리 if result.startswith('data:'): content_type, b64data = result.split(';base64,') else: b64data = result # base64 디코딩 image_bytes = base64.b64decode(b64data) # 임시 파일로 저장 with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_file: temp_file.write(image_bytes) temp_path = temp_file.name # 갤러리 표시 및 이미지 추가 gallery_update = gr.Gallery(visible=True, value=[temp_path]) yield output_so_far + "\n\n*이미지가 생성되어 아래 갤러리에 표시됩니다.*", gallery_update else: yield output_so_far + "\n\n(이미지 생성 실패: 올바른 형식이 아닙니다)", gallery_update except Exception as e: logger.error(f"대체 API 호출 중 오류: {e}") yield output_so_far + f"\n\n(이미지 생성 실패: {e})", gallery_update # URL인 경우 elif isinstance(image_result, str) and ( image_result.startswith('http://') or image_result.startswith('https://') ): try: # URL에서 이미지 다운로드 response = requests.get(image_result, timeout=10) response.raise_for_status() # 임시 파일로 저장 with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_file: temp_file.write(response.content) temp_path = temp_file.name # 갤러리 표시 및 이미지 추가 gallery_update = gr.Gallery(visible=True, value=[temp_path]) yield output_so_far + "\n\n*이미지가 생성되어 아래 갤러리에 표시됩니다.*", gallery_update except Exception as e: logger.error(f"URL 이미지 다운로드 오류: {e}") yield output_so_far + f"\n\n(이미지 다운로드 중 오류: {e})", gallery_update # 이미지 객체인 경우 (PIL Image 등) elif hasattr(image_result, 'save'): try: with tempfile.NamedTemporaryFile(delete=False, suffix=".webp") as temp_file: image_result.save(temp_file.name) temp_path = temp_file.name # 갤러리 표시 및 이미지 추가 gallery_update = gr.Gallery(visible=True, value=[temp_path]) yield output_so_far + "\n\n*이미지가 생성되어 아래 갤러리에 표시됩니다.*", gallery_update except Exception as e: logger.error(f"이미지 객체 저장 오류: {e}") yield output_so_far + f"\n\n(이미지 객체 저장 중 오류: {e})", gallery_update else: # 다른 형식의 이미지 결과 yield output_so_far + f"\n\n(지원되지 않는 이미지 형식: {type(image_result)})", gallery_update else: yield output_so_far + f"\n\n(이미지 생성 실패: {seed_info})", gallery_update except Exception as e: logger.error(f"갤러리용 이미지 생성 중 오류: {e}") yield output_so_far + f"\n\n(이미지 생성 중 오류: {e})", gallery_update # ============================================================================= # 예시들: 기존 이미지/비디오 예제 12개 + AI 데이팅 시나리오 예제 6개 # ============================================================================= examples = [ [ { "text": "두 PDF 파일의 내용을 비교하세요.", "files": [ "assets/additional-examples/before.pdf", "assets/additional-examples/after.pdf", ], } ], [ { "text": "CSV 파일의 내용을 요약 및 분석하세요.", "files": ["assets/additional-examples/sample-csv.csv"], } ], [ { "text": "친절하고 이해심 많은 여자친구 역할을 맡으세요. 이 영상을 설명해 주세요.", "files": ["assets/additional-examples/tmp.mp4"], } ], [ { "text": "표지를 설명하고 그 위의 글씨를 읽어 주세요.", "files": ["assets/additional-examples/maz.jpg"], } ], [ { "text": "저는 이미 이 보충제를 가지고 있고 이 제품도 구매할 계획입니다. 함께 복용할 때 주의할 점이 있나요?", "files": [ "assets/additional-examples/pill1.png", "assets/additional-examples/pill2.png" ], } ], [ { "text": "이 적분 문제를 풀어 주세요.", "files": ["assets/additional-examples/4.png"], } ], [ { "text": "이 티켓은 언제 발행되었고, 가격은 얼마인가요?", "files": ["assets/additional-examples/2.png"], } ], [ { "text": "이 이미지들의 순서를 바탕으로 짧은 이야기를 만들어 주세요.", "files": [ "assets/sample-images/09-1.png", "assets/sample-images/09-2.png", "assets/sample-images/09-3.png", "assets/sample-images/09-4.png", "assets/sample-images/09-5.png", ], } ], [ { "text": "이 이미지와 일치하는 막대 차트를 그리기 위한 matplotlib를 사용하는 Python 코드를 작성해 주세요.", "files": ["assets/additional-examples/barchart.png"], } ], [ { "text": "이미지의 텍스트를 읽고 Markdown 형식으로 작성해 주세요.", "files": ["assets/additional-examples/3.png"], } ], [ { "text": "이 표지판에 무슨 글자가 쓰여 있나요?", "files": ["assets/sample-images/02.png"], } ], [ { "text": "두 이미지를 비교하고 유사점과 차이점을 설명해 주세요.", "files": ["assets/sample-images/03.png"], } ], [ { "text": "롤플레이 해봅시다. 당신은 저와 더 알아가고 싶은 새로운 온라인 데이트 상대입니다. 다정하고 배려 깊은 방식으로 자기 소개를 해주세요!", } ], [ { "text": "해변을 걷는 두 번째 데이트 중입니다. 장난스러운 대화와 부드러운 플러팅으로 장면을 이어나가 주세요.", } ], [ { "text": "좋아하는 사람에게 메시지를 보내는 것이 불안합니다. 격려의 말이나 접근 방법에 대한 제안을 해줄 수 있나요?", } ], [ { "text": "관계에서 어려움을 극복한 두 사람에 대한 로맨틱한 이야기를 들려주세요.", } ], [ { "text": "시적인 방식으로 사랑을 표현하고 싶습니다. 제 파트너를 위한 진심이 담긴 시를 작성하는 데 도움을 줄 수 있나요?", } ], [ { "text": "작은 다툼이 있었습니다. 진심으로 사과하면서 제 감정을 표현할 수 있는 방법을 찾아주세요.", } ], ] # ============================================================================= # Gradio UI (Blocks) 구성 # ============================================================================= # 1. Gradio Blocks UI 수정 - 갤러리 컴포넌트 추가 css = """ .gradio-container { background: rgba(255, 255, 255, 0.7); padding: 30px 40px; margin: 20px auto; width: 100% !important; max-width: none !important; } """ title_html = """

💘 HeartSync 💘

✅FLUX 이미지 생성 ✅추론 ✅검열 해제 ✅멀티모달 & VLM ✅실시간 웹 검색 ✅RAG

""" with gr.Blocks(css=css, title="HeartSync") as demo: gr.Markdown(title_html) # 생성된 이미지를 저장할 갤러리 컴포넌트 (이 부분이 새로 추가됨) generated_images = gr.Gallery( label="생성된 이미지", show_label=True, visible=False, elem_id="generated_images", columns=2, height="auto", object_fit="contain" ) with gr.Row(): web_search_checkbox = gr.Checkbox(label="심도 있는 연구", value=False) image_gen_checkbox = gr.Checkbox(label="이미지 생성", value=False) base_system_prompt_box = gr.Textbox( lines=3, value="당신은 깊이 사고하는 AI입니다. 항상 논리적이고 창의적으로 문제를 해결합니다.\n페르소나: 당신은 다정하고 사랑이 넘치는 여자친구입니다.", label="기본 시스템 프롬프트", visible=False ) with gr.Row(): age_group_dropdown = gr.Dropdown( label="연령대 선택 (기본 20대)", choices=["10대", "20대", "30~40대", "50~60대", "70대 이상"], value="20대", interactive=True ) mbti_choices = [ "INTJ (용의주도한 전략가)", "INTP (논리적인 사색가)", "ENTJ (대담한 통솔자)", "ENTP (뜨거운 논쟁가)", "INFJ (선의의 옹호자)", "INFP (열정적인 중재자)", "ENFJ (정의로운 사회운동가)", "ENFP (재기발랄한 활동가)", "ISTJ (청렴결백한 논리주의자)", "ISFJ (용감한 수호자)", "ESTJ (엄격한 관리자)", "ESFJ (사교적인 외교관)", "ISTP (만능 재주꾼)", "ISFP (호기심 많은 예술가)", "ESTP (모험을 즐기는 사업가)", "ESFP (자유로운 영혼의 연예인)" ] mbti_dropdown = gr.Dropdown( label="AI 페르소나 MBTI (기본 INTP)", choices=mbti_choices, value="INTP (논리적인 사색가)", interactive=True ) sexual_openness_slider = gr.Slider( minimum=1, maximum=5, step=1, value=2, label="섹슈얼 관심도/개방성 (1~5, 기본=2)", interactive=True ) max_tokens_slider = gr.Slider( label="최대 생성 토큰 수", minimum=100, maximum=8000, step=50, value=1000, visible=False ) web_search_text = gr.Textbox( lines=1, label="웹 검색 쿼리 (미사용)", placeholder="직접 입력할 필요 없음", visible=False ) # 채팅 인터페이스 생성 - 수정된 run 함수 사용 chat = gr.ChatInterface( fn=modified_run, # 여기서 수정된 함수 사용 type="messages", chatbot=gr.Chatbot(type="messages", scale=1, allow_tags=["image"]), textbox=gr.MultimodalTextbox( file_types=[".webp", ".png", ".jpg", ".jpeg", ".gif", ".mp4", ".csv", ".txt", ".pdf"], file_count="multiple", autofocus=True ), multimodal=True, additional_inputs=[ base_system_prompt_box, max_tokens_slider, web_search_checkbox, web_search_text, age_group_dropdown, mbti_dropdown, sexual_openness_slider, image_gen_checkbox, ], additional_outputs=[ generated_images, # 갤러리 컴포넌트를 출력으로 추가 ], stop_btn=False, title='https://discord.gg/openfreeai', examples=examples, run_examples_on_click=False, cache_examples=False, css_paths=None, delete_cache=(1800, 1800), ) with gr.Row(elem_id="examples_row"): with gr.Column(scale=12, elem_id="examples_container"): gr.Markdown("### 예제 입력 (클릭하여 불러오기)") if __name__ == "__main__": demo.launch(share=True)