from flask import jsonify import logging import os formatter = logging.Formatter('%(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) request_counts = {} password = os.environ['password'] def authenticate_request(request): auth_header = request.headers.get('Authorization') if not auth_header: return False, jsonify({'error': '缺少Authorization请求头'}), 401 try: auth_type, pass_word = auth_header.split(' ', 1) except ValueError: return False, jsonify({'error': 'Authorization请求头格式错误'}), 401 if auth_type.lower() != 'bearer': return False, jsonify({'error': 'Authorization类型必须为Bearer'}), 401 if pass_word != password: return False, jsonify({'error': '未授权'}), 401 return True, None, None def process_messages_for_gemini(messages, use_system_prompt=False): gemini_history = [] errors = [] system_instruction_text = "" is_system_phase = use_system_prompt #logger.info(f"Starting message processing. use_system_prompt: {use_system_prompt}") for i, message in enumerate(messages): ##logger.info(f"Processing message {i}: {message}") role = message.get('role') content = message.get('content') if isinstance(content, str): #logger.info(f"Message content is string.") if is_system_phase and role == 'system': #logger.info(f"System phase and system role detected.") if system_instruction_text: system_instruction_text += "\n" + content else: system_instruction_text = content #logger.info(f"Current system_instruction_text: {system_instruction_text}") else: is_system_phase = False #logger.info(f"System phase ended.") if role in ['user', 'system']: role_to_use = 'user' elif role == 'assistant': role_to_use = 'model' else: errors.append(f"Invalid role: {role}") logger.error(f"Invalid role: {role}") continue if gemini_history and gemini_history[-1]['role'] == role_to_use: gemini_history[-1]['parts'].append({"text": content}) #logger.info(f"Appended text to existing role '{role_to_use}'.") else: gemini_history.append({"role": role_to_use, "parts": [{"text": content}]}) #logger.info(f"Created new entry for role '{role_to_use}' with text.") elif isinstance(content, list): #logger.info(f"Message content is list (likely includes media).") parts = [] for item in content: #logger.info(f"Processing content item: {item}") if item.get('type') == 'text': parts.append({"text": item.get('text')}) #logger.info(f"Added text part: {item.get('text')}") elif item.get('type') == 'image_url': image_data = item.get('image_url', {}).get('url', '') #logger.info(f"Processing image_url: {image_data}") if image_data.startswith('data:image/'): try: mime_type, base64_data = image_data.split(';')[0].split(':')[1], image_data.split(',')[1] parts.append({ "inline_data": { "mime_type": mime_type, "data": base64_data } }) #logger.info(f"Added inline image data with mime_type: {mime_type}") except (IndexError, ValueError) as e: error_message = f"Invalid data URI for image: {image_data}. Error: {e}" errors.append(error_message) logger.error(error_message) else: error_message = f"Invalid image URL format for item: {item}. URL should start with 'data:image/' for inline images." errors.append(error_message) logger.error(error_message) elif item.get('type') == 'file_url': file_data = item.get('file_url', {}).get('url', '') #logger.info(f"Processing file_url: {file_data}") if file_data.startswith('data:'): try: mime_type, base64_data = file_data.split(';')[0].split(':')[1], file_data.split(',')[1] parts.append({ "inline_data": { "mime_type": mime_type, "data": base64_data } }) #logger.info(f"Added inline file data with mime_type: {mime_type}") except (IndexError, ValueError) as e: error_message = f"Invalid data URI for file: {file_data}. Error: {e}" errors.append(error_message) logger.error(error_message) else: error_message = f"Invalid file URL format for item: {item}. URL should start with 'data:' for inline files." errors.append(error_message) logger.error(error_message) if parts: if role in ['user', 'system']: role_to_use = 'user' elif role == 'assistant': role_to_use = 'model' else: errors.append(f"Invalid role: {role}") logger.error(f"Invalid role: {role}") continue if gemini_history and gemini_history[-1]['role'] == role_to_use: gemini_history[-1]['parts'].extend(parts) #logger.info(f"Extended parts to existing role '{role_to_use}'.") else: gemini_history.append({"role": role_to_use, "parts": parts}) #logger.info(f"Created new entry for role '{role_to_use}' with parts.") if errors: logger.warning(f"Errors encountered during message processing: {errors}") return gemini_history, {"parts": [{"text": system_instruction_text}]}, (jsonify({'error': errors}), 400) else: #logger.info(f"Message processing successful. Returning gemini_history and system_instruction_text.") return gemini_history, {"parts": [{"text": system_instruction_text}]}, None