Spaces:
Running
Running
| import logging | |
| from PIL import Image | |
| from io import BytesIO | |
| import requests, os, json, time | |
| from google import genai | |
| prompt_base_path = "" | |
| client = genai.Client(api_key=os.getenv("GEMINI_API_KEY")) | |
| def encode_image(image_source): | |
| """ | |
| ์ด๋ฏธ์ง ๊ฒฝ๋ก๊ฐ URL์ด๋ ๋ก์ปฌ ํ์ผ์ด๋ Pillow Image ๊ฐ์ฒด์ด๋ ๋์ผํ๊ฒ ์ฒ๋ฆฌํ๋ ํจ์. | |
| ์ด๋ฏธ์ง๋ฅผ ์ด์ด google.genai.types.Part ๊ฐ์ฒด๋ก ๋ณํํฉ๋๋ค. | |
| Pillow์์ ์ง์๋์ง ์๋ ํฌ๋งท์ ๋ํด์๋ ์์ธ๋ฅผ ๋ฐ์์ํต๋๋ค. | |
| """ | |
| try: | |
| # ์ด๋ฏธ Pillow ์ด๋ฏธ์ง ๊ฐ์ฒด์ธ ๊ฒฝ์ฐ ๊ทธ๋๋ก ์ฌ์ฉ | |
| if isinstance(image_source, Image.Image): | |
| image = image_source | |
| else: | |
| # URL์์ ์ด๋ฏธ์ง ๋ค์ด๋ก๋ | |
| if isinstance(image_source, str) and ( | |
| image_source.startswith("http://") | |
| or image_source.startswith("https://") | |
| ): | |
| response = requests.get(image_source) | |
| image = Image.open(BytesIO(response.content)) | |
| # ๋ก์ปฌ ํ์ผ์์ ์ด๋ฏธ์ง ์ด๊ธฐ | |
| else: | |
| image = Image.open(image_source) | |
| # ์ด๋ฏธ์ง ํฌ๋งท์ด None์ธ ๊ฒฝ์ฐ (๋ฉ๋ชจ๋ฆฌ์์ ์์ฑ๋ ์ด๋ฏธ์ง ๋ฑ) | |
| if image.format is None: | |
| image_format = "JPEG" | |
| else: | |
| image_format = image.format | |
| # ์ด๋ฏธ์ง ํฌ๋งท์ด ์ง์๋์ง ์๋ ๊ฒฝ์ฐ ์์ธ ๋ฐ์ | |
| if image_format not in Image.registered_extensions().values(): | |
| raise ValueError(f"Unsupported image format: {image_format}.") | |
| buffered = BytesIO() | |
| # PIL์์ ์ง์๋์ง ์๋ ํฌ๋งท์ด๋ ๋ค์ํ ์ฑ๋์ RGB๋ก ๋ณํ ํ ์ ์ฅ | |
| if image.mode in ("RGBA", "P", "CMYK"): # RGBA, ํ๋ ํธ, CMYK ๋ฑ์ RGB๋ก ๋ณํ | |
| image = image.convert("RGB") | |
| image.save(buffered, format="JPEG") | |
| return genai.types.Part.from_bytes(data=buffered.getvalue(), mime_type="image/jpeg") | |
| except requests.exceptions.RequestException as e: | |
| raise ValueError(f"Failed to download the image from URL: {e}") | |
| except IOError as e: | |
| raise ValueError(f"Failed to process the image file: {e}") | |
| except ValueError as e: | |
| raise ValueError(e) | |
| def run_gemini( | |
| target_prompt: str, | |
| prompt_in_path: str, | |
| img_in_data: str = None, | |
| model: str = "gemini-2.0-flash", | |
| ) -> str: | |
| """ | |
| GEMINI API๋ฅผ ๋๊ธฐ ๋ฐฉ์์ผ๋ก ํธ์ถํ์ฌ ๋ฌธ์์ด ์๋ต์ ๋ฐ์ต๋๋ค. | |
| retry ๋ ผ๋ฆฌ๋ ์ ๊ฑฐ๋์์ต๋๋ค. | |
| """ | |
| with open(os.path.join(prompt_base_path, prompt_in_path), "r", encoding="utf-8") as file: | |
| prompt_dict = json.load(file) | |
| system_prompt = prompt_dict["system_prompt"] | |
| user_prompt_head = prompt_dict["user_prompt"]["head"] | |
| user_prompt_tail = prompt_dict["user_prompt"]["tail"] | |
| user_prompt_text = "\n".join([user_prompt_head, target_prompt, user_prompt_tail]) | |
| input_content = [user_prompt_text] | |
| if img_in_data is not None: | |
| encoded_image = encode_image(img_in_data) | |
| input_content.append(encoded_image) | |
| logging.info("Requested API for chat completion response (sync call)...") | |
| start_time = time.time() | |
| # ๋๊ธฐ ๋ฐฉ์: client.models.generate_content(...) | |
| chat_completion = client.models.generate_content( | |
| model=model, | |
| contents=input_content, | |
| config={ | |
| "system_instruction": system_prompt, | |
| } | |
| ) | |
| print(f"Chat Completion: {chat_completion}") | |
| chat_output = chat_completion.candidates[0].content.parts[0].text | |
| input_token = chat_completion.usage_metadata.prompt_token_count | |
| output_token = chat_completion.usage_metadata.candidates_token_count | |
| pricing = input_token / 1000000 * 0.1 * 1500 + output_token / 1000000 * 0.7 * 1500 | |
| logging.info( | |
| f"[GEMINI] Request completed (sync). Time taken: {time.time()-start_time:.2f}s / Pricing(KRW): {pricing:.2f}" | |
| ) | |
| return chat_output | |