|
import json |
|
import os |
|
import time |
|
import tempfile |
|
from PIL import Image |
|
import gradio as gr |
|
import logging |
|
import io |
|
|
|
from google import genai |
|
from google.genai import types |
|
|
|
|
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, |
|
format='%(asctime)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def save_binary_file(file_name, data): |
|
logger.debug(f"ํ์ผ์ ์ด์ง ๋ฐ์ดํฐ ์ ์ฅ ์ค: {file_name}") |
|
with open(file_name, "wb") as f: |
|
f.write(data) |
|
logger.debug(f"ํ์ผ ์ ์ฅ ์๋ฃ: {file_name}") |
|
|
|
|
|
def generate_image_from_prompt(prompt, model="gemini-2.0-flash-exp-image-generation"): |
|
"""ํ
์คํธ ํ๋กฌํํธ๋ง์ผ๋ก ์ด๋ฏธ์ง ์์ฑ""" |
|
logger.debug(f"generate_image_from_prompt ํจ์ ์์ - ํ๋กฌํํธ: '{prompt}'") |
|
|
|
try: |
|
|
|
effective_api_key = os.environ.get("GEMINI_API_KEY") |
|
if effective_api_key: |
|
logger.debug("ํ๊ฒฝ๋ณ์์์ API ํค ๋ถ๋ฌ์ด") |
|
else: |
|
logger.error("API ํค๊ฐ ํ๊ฒฝ๋ณ์์ ์ค์ ๋์ง ์์์ต๋๋ค.") |
|
raise ValueError("API ํค๊ฐ ํ์ํฉ๋๋ค.") |
|
|
|
client = genai.Client(api_key=effective_api_key) |
|
logger.debug("Gemini ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ์๋ฃ.") |
|
|
|
|
|
if not prompt or not prompt.strip(): |
|
prompt = "A creative and visually appealing image that captures imagination. Use vibrant colors and interesting composition." |
|
|
|
|
|
if not any(ord(c) < 128 for c in prompt): |
|
prompt += " Create a highly detailed, visually stunning image that captures the essence of the description." |
|
|
|
|
|
parts = [ |
|
types.Part.from_text(text=prompt) |
|
] |
|
|
|
logger.debug(f"์ปจํ
์ธ ๊ฐ์ฒด ์์ฑ ์๋ฃ: {len(parts)} ์์ดํ
") |
|
|
|
|
|
generate_content_config = types.GenerateContentConfig( |
|
temperature=1, |
|
top_p=0.95, |
|
top_k=40, |
|
max_output_tokens=8192, |
|
response_modalities=["image", "text"], |
|
) |
|
logger.debug(f"์์ฑ ์ค์ : {generate_content_config}") |
|
|
|
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: |
|
temp_path = tmp.name |
|
logger.debug(f"์์ ํ์ผ ์์ฑ๋จ: {temp_path}") |
|
|
|
|
|
response_stream = client.models.generate_content_stream( |
|
model=model, |
|
contents=[ |
|
types.Content( |
|
role="user", |
|
parts=parts, |
|
), |
|
], |
|
config=generate_content_config, |
|
) |
|
|
|
logger.debug("์๋ต ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์...") |
|
|
|
|
|
image_saved = False |
|
response_text = "" |
|
|
|
for chunk in response_stream: |
|
logger.debug(f"chunk ์์ : {chunk}") |
|
|
|
|
|
if not hasattr(chunk, 'candidates') or not chunk.candidates or len(chunk.candidates) == 0: |
|
logger.warning("chunk์ candidates๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
if not hasattr(chunk.candidates[0], 'content') or chunk.candidates[0].content is None: |
|
logger.warning("chunk.candidates[0]์ content๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
if not hasattr(chunk.candidates[0].content, 'parts') or not chunk.candidates[0].content.parts: |
|
logger.warning("chunk.candidates[0].content์ parts๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
for part in chunk.candidates[0].content.parts: |
|
if hasattr(part, 'text') and part.text: |
|
response_text += part.text |
|
logger.info(f"์์ ๋ ํ
์คํธ: {part.text}") |
|
elif hasattr(part, 'inline_data') and part.inline_data: |
|
save_binary_file(temp_path, part.inline_data.data) |
|
logger.info(f"MIME ํ์
{part.inline_data.mime_type}์ ํ์ผ์ด ์ ์ฅ๋จ: {temp_path}") |
|
image_saved = True |
|
|
|
if not image_saved: |
|
logger.warning("์ด๋ฏธ์ง๊ฐ ์์ฑ๋์ง ์์์ต๋๋ค.") |
|
return None, response_text or "์ด๋ฏธ์ง๊ฐ ์์ฑ๋์ง ์์์ต๋๋ค. ๋ค๋ฅธ ํ๋กฌํํธ๋ก ์๋ํด๋ณด์ธ์." |
|
|
|
logger.debug("์ด๋ฏธ์ง ์์ฑ ์๋ฃ.") |
|
return temp_path, response_text |
|
|
|
except Exception as e: |
|
logger.exception("์ด๋ฏธ์ง ์์ฑ ์ค ์ค๋ฅ ๋ฐ์:") |
|
return None, f"์ค๋ฅ ๋ฐ์: {str(e)}" |
|
|
|
|
|
def merge_images(person_img_path, product_img_path, background_img_path, prompt, model="gemini-2.0-flash-exp-image-generation"): |
|
"""๊ธฐ์กด ์ด๋ฏธ์ง ํฉ์ฑ ํจ์ (์ด์ ์ฝ๋์ ๋์ผ)""" |
|
logger.debug(f"merge_images ํจ์ ์์ - ํ๋กฌํํธ: '{prompt}'") |
|
|
|
try: |
|
|
|
effective_api_key = os.environ.get("GEMINI_API_KEY") |
|
if effective_api_key: |
|
logger.debug("ํ๊ฒฝ๋ณ์์์ API ํค ๋ถ๋ฌ์ด") |
|
else: |
|
logger.error("API ํค๊ฐ ํ๊ฒฝ๋ณ์์ ์ค์ ๋์ง ์์์ต๋๋ค.") |
|
raise ValueError("API ํค๊ฐ ํ์ํฉ๋๋ค.") |
|
|
|
client = genai.Client(api_key=effective_api_key) |
|
logger.debug("Gemini ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ์๋ฃ.") |
|
|
|
|
|
person_file = client.files.upload(file=person_img_path) |
|
logger.debug(f"์ฌ๋ ์ด๋ฏธ์ง ์
๋ก๋ ์๋ฃ: {person_file.uri}") |
|
|
|
product_file = client.files.upload(file=product_img_path) |
|
logger.debug(f"์ํ ์ด๋ฏธ์ง ์
๋ก๋ ์๋ฃ: {product_file.uri}") |
|
|
|
|
|
parts = [] |
|
|
|
|
|
parts.append( |
|
types.Part.from_uri( |
|
file_uri=person_file.uri, |
|
mime_type=person_file.mime_type |
|
) |
|
) |
|
|
|
|
|
parts.append( |
|
types.Part.from_uri( |
|
file_uri=product_file.uri, |
|
mime_type=product_file.mime_type |
|
) |
|
) |
|
|
|
|
|
if background_img_path: |
|
background_file = client.files.upload(file=background_img_path) |
|
logger.debug(f"๋ฐฐ๊ฒฝ ์ด๋ฏธ์ง ์
๋ก๋ ์๋ฃ: {background_file.uri}") |
|
parts.append( |
|
types.Part.from_uri( |
|
file_uri=background_file.uri, |
|
mime_type=background_file.mime_type |
|
) |
|
) |
|
logger.debug("๋ฐฐ๊ฒฝ ์ด๋ฏธ์ง ์ถ๊ฐ๋จ") |
|
|
|
|
|
parts.append( |
|
types.Part.from_text(text=prompt) |
|
) |
|
|
|
logger.debug(f"์ปจํ
์ธ ๊ฐ์ฒด ์์ฑ ์๋ฃ: {len(parts)} ์์ดํ
") |
|
|
|
|
|
generate_content_config = types.GenerateContentConfig( |
|
temperature=1, |
|
top_p=0.95, |
|
top_k=40, |
|
max_output_tokens=8192, |
|
response_modalities=["image", "text"], |
|
) |
|
logger.debug(f"์์ฑ ์ค์ : {generate_content_config}") |
|
|
|
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: |
|
temp_path = tmp.name |
|
logger.debug(f"์์ ํ์ผ ์์ฑ๋จ: {temp_path}") |
|
|
|
|
|
response_stream = client.models.generate_content_stream( |
|
model=model, |
|
contents=[ |
|
types.Content( |
|
role="user", |
|
parts=parts, |
|
), |
|
], |
|
config=generate_content_config, |
|
) |
|
|
|
logger.debug("์๋ต ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์...") |
|
|
|
|
|
image_saved = False |
|
response_text = "" |
|
|
|
for chunk in response_stream: |
|
logger.debug(f"chunk ์์ : {chunk}") |
|
|
|
|
|
if not hasattr(chunk, 'candidates') or not chunk.candidates or len(chunk.candidates) == 0: |
|
logger.warning("chunk์ candidates๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
if not hasattr(chunk.candidates[0], 'content') or chunk.candidates[0].content is None: |
|
logger.warning("chunk.candidates[0]์ content๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
if not hasattr(chunk.candidates[0].content, 'parts') or not chunk.candidates[0].content.parts: |
|
logger.warning("chunk.candidates[0].content์ parts๊ฐ ์์ต๋๋ค. ๊ฑด๋๋๋๋ค.") |
|
continue |
|
|
|
for part in chunk.candidates[0].content.parts: |
|
if hasattr(part, 'text') and part.text: |
|
response_text += part.text |
|
logger.info(f"์์ ๋ ํ
์คํธ: {part.text}") |
|
elif hasattr(part, 'inline_data') and part.inline_data: |
|
save_binary_file(temp_path, part.inline_data.data) |
|
logger.info(f"MIME ํ์
{part.inline_data.mime_type}์ ํ์ผ์ด ์ ์ฅ๋จ: {temp_path}") |
|
image_saved = True |
|
|
|
if not image_saved: |
|
logger.warning("์ด๋ฏธ์ง๊ฐ ์์ฑ๋์ง ์์์ต๋๋ค.") |
|
return None, response_text or "์ด๋ฏธ์ง๊ฐ ์์ฑ๋์ง ์์์ต๋๋ค. ๋ค๋ฅธ ํ๋กฌํํธ๋ ์ด๋ฏธ์ง๋ก ์๋ํด๋ณด์ธ์." |
|
|
|
logger.debug("์ด๋ฏธ์ง ์์ฑ ์๋ฃ.") |
|
return temp_path, response_text |
|
|
|
except Exception as e: |
|
logger.exception("์ด๋ฏธ์ง ์์ฑ ์ค ์ค๋ฅ ๋ฐ์:") |
|
return None, f"์ค๋ฅ ๋ฐ์: {str(e)}" |
|
|
|
|
|
def process_images_and_prompt(person_pil, product_pil, background_pil, prompt): |
|
""" |
|
์ด๋ฏธ์ง ๋ฐ ํ๋กฌํํธ๋ฅผ ์ฒ๋ฆฌํ์ฌ ์ด๋ฏธ์ง๋ฅผ ์์ฑํ๋ ํจ์ |
|
|
|
์ด๋ฏธ์ง ์
๋ ฅ์ด ์๋ ๊ฒฝ์ฐ ํ
์คํธ ๊ธฐ๋ฐ ์ด๋ฏธ์ง ์์ฑ์ผ๋ก ์ ํ |
|
์ด๋ฏธ์ง ์
๋ ฅ์ด ์๋ ๊ฒฝ์ฐ ๊ธฐ์กด์ ์ด๋ฏธ์ง ํฉ์ฑ ๋ก์ง ์ ์ง |
|
""" |
|
logger.debug(f"process_images_and_prompt ํจ์ ์์ - ํ๋กฌํํธ: '{prompt}'") |
|
|
|
try: |
|
|
|
if person_pil is None and product_pil is None and background_pil is None: |
|
logger.debug("์ด๋ฏธ์ง ์์. ํ
์คํธ ๊ธฐ๋ฐ ์ด๋ฏธ์ง ์์ฑ์ผ๋ก ์ ํ") |
|
result_path, response_text = generate_image_from_prompt(prompt) |
|
|
|
if result_path: |
|
try: |
|
result_img = Image.open(result_path) |
|
if result_img.mode == "RGBA": |
|
result_img = result_img.convert("RGB") |
|
|
|
return [result_img], response_text or "์ด๋ฏธ์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์์ฑ๋์์ต๋๋ค." |
|
except Exception as e: |
|
logger.exception(f"๊ฒฐ๊ณผ ์ด๋ฏธ์ง ๋ก๋ ์ค ์ค๋ฅ: {str(e)}") |
|
return [], f"๊ฒฐ๊ณผ ์ด๋ฏธ์ง ์ฒ๋ฆฌ ์ค ์ค๋ฅ: {str(e)}" |
|
else: |
|
return [], response_text or "์ด๋ฏธ์ง ์์ฑ์ ์คํจํ์ต๋๋ค." |
|
|
|
|
|
if person_pil is None: |
|
return [], "์ฌ๋ ์ด๋ฏธ์ง๋ฅผ ์
๋ก๋ํด์ฃผ์ธ์." |
|
if product_pil is None: |
|
return [], "์ํ ์ด๋ฏธ์ง๋ฅผ ์
๋ก๋ํด์ฃผ์ธ์." |
|
|
|
|
|
if not prompt or not prompt.strip(): |
|
if background_pil: |
|
prompt = "์ด ๋ฐฐ๊ฒฝ์ ์ด ์ฌ๋์ด ์ด ์ํ์ ์ฌ์ฉํ๋ ๋ชจ์ต์ ์์ฐ์ค๋ฝ๊ฒ ๋ณด์ฌ์ฃผ์ธ์. ์ํ์ ์ ๋ณด์ด๊ฒ ํด์ฃผ์ธ์. Create a natural composite image showing this person using this product in this background setting. Make sure the product is clearly visible." |
|
else: |
|
prompt = "์ด ์ฌ๋์ด ์ด ์ํ์ ์ฌ์ฉํ๋ ๋ชจ์ต์ ์์ฐ์ค๋ฝ๊ฒ ๋ณด์ฌ์ฃผ์ธ์. ์ํ์ ์ ๋ณด์ด๊ฒ ํด์ฃผ์ธ์. Create a natural composite image showing this person using this product. Make sure the product is clearly visible." |
|
|
|
|
|
if not any(ord(c) < 128 for c in prompt): |
|
if background_pil: |
|
prompt += " Create a realistic composite image of this person with this product in this background." |
|
else: |
|
prompt += " Create a realistic composite image of this person with this product." |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_person: |
|
person_path = tmp_person.name |
|
|
|
person_img = person_pil |
|
if person_img.mode != 'RGB': |
|
person_img = person_img.convert('RGB') |
|
person_img.save(person_path, 'JPEG') |
|
logger.debug(f"์ฌ๋ ์ด๋ฏธ์ง ์ ์ฅ ์๋ฃ: {person_path}") |
|
|
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_product: |
|
product_path = tmp_product.name |
|
|
|
product_img = product_pil |
|
if product_img.mode != 'RGB': |
|
product_img = product_img.convert('RGB') |
|
product_img.save(product_path, 'JPEG') |
|
logger.debug(f"์ํ ์ด๋ฏธ์ง ์ ์ฅ ์๋ฃ: {product_path}") |
|
|
|
|
|
background_path = None |
|
if background_pil is not None: |
|
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_bg: |
|
background_path = tmp_bg.name |
|
|
|
background_img = background_pil |
|
if background_img.mode != 'RGB': |
|
background_img = background_img.convert('RGB') |
|
background_img.save(background_path, 'JPEG') |
|
logger.debug(f"๋ฐฐ๊ฒฝ ์ด๋ฏธ์ง ์ ์ฅ ์๋ฃ: {background_path}") |
|
|
|
|
|
result_path, response_text = merge_images( |
|
person_img_path=person_path, |
|
product_img_path=product_path, |
|
background_img_path=background_path, |
|
prompt=prompt |
|
) |
|
|
|
|
|
if result_path: |
|
logger.debug(f"์ด๋ฏธ์ง ์์ฑ ์๋ฃ. ๊ฒฝ๋ก: {result_path}") |
|
try: |
|
result_img = Image.open(result_path) |
|
if result_img.mode == "RGBA": |
|
result_img = result_img.convert("RGB") |
|
|
|
|
|
try: |
|
os.unlink(person_path) |
|
os.unlink(product_path) |
|
if background_path: |
|
os.unlink(background_path) |
|
except Exception as e: |
|
logger.warning(f"์์ ํ์ผ ์ญ์ ์ค ์ค๋ฅ: {str(e)}") |
|
|
|
return [result_img], response_text or "์ด๋ฏธ์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์์ฑ๋์์ต๋๋ค." |
|
except Exception as e: |
|
logger.exception(f"๊ฒฐ๊ณผ ์ด๋ฏธ์ง ๋ก๋ ์ค ์ค๋ฅ: {str(e)}") |
|
return [], f"๊ฒฐ๊ณผ ์ด๋ฏธ์ง ์ฒ๋ฆฌ ์ค ์ค๋ฅ: {str(e)}" |
|
else: |
|
logger.error("merge_images ํจ์์์ None ๋ฐํ๋จ.") |
|
return [], response_text or "์ด๋ฏธ์ง ์์ฑ์ ์คํจํ์ต๋๋ค. ๋ค๋ฅธ ์ด๋ฏธ์ง๋ ํ๋กฌํํธ๋ก ์๋ํด๋ณด์ธ์." |
|
|
|
except Exception as e: |
|
logger.exception("process_images_and_prompt ํจ์์์ ์ค๋ฅ ๋ฐ์:") |
|
return [], f"์ค๋ฅ ๋ฐ์: {str(e)}" |