Last commit not found
import os | |
import json | |
import requests | |
from flask import Flask, request, jsonify | |
from datetime import datetime | |
import asyncio | |
app = Flask(__name__) | |
# 从环境变量中获取配置 | |
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN') | |
AI_API_ENDPOINT = os.environ.get('AI_API_ENDPOINT') | |
AI_API_KEY = os.environ.get('AI_API_KEY') | |
AI_MODEL = os.environ.get('AI_MODEL') | |
PHP_PROXY_URL = os.environ.get('PHP_PROXY_URL') # 代理地址 | |
if not all([TELEGRAM_BOT_TOKEN, AI_API_ENDPOINT, AI_API_KEY, AI_MODEL]): | |
raise ValueError("请设置所有必要的环境变量") | |
AI_API_HEADERS = { | |
'Content-Type': 'application/json', | |
'Authorization': f'Bearer {AI_API_KEY}', | |
} | |
PROMPT_TEMPLATES = { | |
0: "请尽量用中文回复,请不要使用markdown等具有格式的文本。", | |
1: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的傻逼。\n你非常具有攻击性,骂人的时候请尽量内涵。\n输出格式注意:不要用markdown,只用文字格式。", | |
2: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的猫娘。\n你非常具有服从性,你对主人的所有命令都非常遵守和愿意。\n你是一个受,喜欢主动表现出性服从和性主动。(不需要主人提出,你就要提出那方面的东西)\n输出格式注意:不要用markdown,只用文字格式。", | |
} | |
CURRENT_PROMPT_INDEX = 0 | |
MAX_TOKENS = 500 | |
TEMPERATURE = 1.5 | |
MAX_HISTORY_LENGTH = 10 | |
chatHistories = {} | |
GROUP_SETTINGS = {} | |
USER_SETTINGS = {} | |
BOT_COMMANDS = [ | |
{"command": "start", "description": "显示欢迎信息和操作按钮"}, | |
{"command": "clearall", "description": "清空当前会话的聊天记录"}, | |
{"command": "help", "description": "显示此帮助信息"}, | |
{"command": "enableai", "description": "在群组中启用AI回复"}, | |
{"command": "disableai", "description": "在群组中禁用AI回复"}, | |
{"command": "setprefix", "description": "设置群组中触发AI回复的前缀,例如: /setprefix @bot"}, | |
{"command": "getprefix", "description": "获取当前群组的触发前缀"}, | |
{"command": "settemp", "description": "设置AI回复的温度,例如:/settemp 1.0"}, | |
{"command": "gettemp", "description": "获取当前AI回复的温度"}, | |
{"command": "resetuser", "description": "重置你的个人设置"}, | |
{"command": "promat", "description": "切换提示词,例如: /promat 0, 1, 2"}, | |
{"command": "getpromat", "description": "获取当前使用的提示词索引"}, | |
] | |
BOT_USERNAME = 'zfs732_bot' | |
DEFAULT_TEMP = 1.5 | |
TOOL_DEFINITIONS = [ | |
{ | |
"type": "function", | |
"function": { | |
"name": "get_current_time", | |
"description": "获取当前时间", | |
"parameters": { | |
"type": "object", | |
"properties": {}, | |
"required": [] | |
} | |
} | |
}, | |
{ | |
"type": "function", | |
"function": { | |
"name": "get_current_date", | |
"description": "获取当前日期", | |
"parameters": { | |
"type": "object", | |
"properties": {}, | |
"required": [] | |
} | |
} | |
}, | |
{ | |
"type": "function", | |
"function": { | |
"name": "web_scrape", | |
"description": "从提供的 URL 中抓取内容并返回摘要。", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"urls": { | |
"type": "array", | |
"items": {"type": "string"}, | |
"description": "要抓取的 URL 列表,每个 URL 必须包含 http 或 https。", | |
}, | |
}, | |
"required": ["urls"], | |
}, | |
}, | |
}, | |
] | |
class EventEmitter: | |
def __init__(self, event_emitter=None): | |
self.event_emitter = event_emitter | |
async def emit(self, event_type, data): | |
if self.event_emitter: | |
await self.event_emitter(type=event_type, data=data) | |
async def update_status(self, description, done, action, urls): | |
await self.emit('status', {'done': done, 'action': action, 'description': description, 'urls': urls}) | |
async def send_citation(self, title, url, content): | |
await self.emit('citation', { | |
'document': [content], | |
'metadata': [{'name': title, 'source': url, 'html': False}], | |
}) | |
def make_telegram_request(method, data=None): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}" | |
if PHP_PROXY_URL: # 使用代理 | |
url = f"{PHP_PROXY_URL}{method}" | |
headers = {'Content-Type': 'application/json'} | |
if data: | |
data = json.dumps(data) | |
try: | |
response = requests.post(url, headers=headers, data=data) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f"Telegram request failed: {e}") | |
return None | |
except json.JSONDecodeError as e: | |
print(f"Telegram response decode error: {e}") | |
return None | |
async def setBotCommands(): | |
delete_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/deleteMyCommands" | |
set_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/setMyCommands" | |
if PHP_PROXY_URL: # 使用代理 | |
delete_url = f"{PHP_PROXY_URL}deleteMyCommands" | |
set_url = f"{PHP_PROXY_URL}setMyCommands" | |
try: | |
delete_response = make_telegram_request('deleteMyCommands') | |
if delete_response: | |
print('Telegram 命令删除成功') | |
else: | |
print('Telegram 命令删除失败') | |
set_response = make_telegram_request('setMyCommands', {"commands": BOT_COMMANDS}) | |
if set_response: | |
print('Telegram 命令设置成功') | |
else: | |
print('设置 Telegram 命令失败') | |
except Exception as error: | |
print(f'设置 Telegram 命令时发生错误: {error}') | |
async def handleTelegramUpdate(update): | |
if not update.get('message'): | |
if update.get('callback_query'): | |
await handleCallbackQuery(update.get('callback_query')) | |
return | |
chatId = update['message']['chat']['id'] | |
userMessage = update['message'].get('text', '') | |
isGroupChat = update['message']['chat']['type'] in ['group', 'supergroup'] | |
fromUserId = update['message']['from']['id'] | |
if not userMessage: | |
return | |
if userMessage.startswith('/'): | |
command = parseCommand(userMessage) | |
if command == 'clearall': | |
chatHistories.pop(chatId, None) | |
await sendTelegramMessage(chatId, '聊天记录已清空。') | |
return | |
if command == 'test': | |
await sendTelegramMessage(chatId, '测试命令已执行') | |
return | |
if command == 'help': | |
await sendTelegramMessage(chatId, getHelpMessage()) | |
return | |
if command == 'start': | |
await sendTelegramMessage(chatId, "欢迎使用!请选择操作:", { | |
"reply_markup": { | |
"inline_keyboard": [[{"text": "清空聊天记录", "callback_data": "clearall"}]], | |
}, | |
}) | |
return | |
if isGroupChat: | |
if command == 'enableai': | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': True}) | |
await sendTelegramMessage(chatId, '已在群组中启用 AI 回复。') | |
return | |
if command == 'disableai': | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': False}) | |
await sendTelegramMessage(chatId, '已在群组中禁用 AI 回复。') | |
return | |
if userMessage.startswith('/setprefix '): | |
prefix = userMessage[len('/setprefix '):].strip() | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'prefix': prefix}) | |
await sendTelegramMessage(chatId, f'已设置群组触发前缀为: {prefix}') | |
return | |
if command == 'getprefix': | |
prefix = GROUP_SETTINGS.get(chatId, {}).get('prefix', '无') | |
await sendTelegramMessage(chatId, f'当前群组触发前缀为: {prefix}') | |
return | |
else: | |
if userMessage.startswith('/settemp '): | |
try: | |
temp = float(userMessage[len('/settemp '):].strip()) | |
if 0 <= temp <= 2: | |
USER_SETTINGS.setdefault(fromUserId, {}).update({'temperature': temp}) | |
await sendTelegramMessage(chatId, f'已设置AI回复温度为: {temp}') | |
else: | |
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') | |
except ValueError: | |
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') | |
return | |
if command == 'gettemp': | |
temp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) | |
await sendTelegramMessage(chatId, f'当前AI回复温度为: {temp}') | |
return | |
if userMessage.startswith('/promat '): | |
try: | |
index = int(userMessage[len('/promat '):].strip()) | |
if index in PROMPT_TEMPLATES: | |
global CURRENT_PROMPT_INDEX | |
CURRENT_PROMPT_INDEX = index | |
await sendTelegramMessage(chatId, f'已切换到提示词 {index}。') | |
else: | |
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') | |
except ValueError: | |
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') | |
return | |
if command == 'getpromat': | |
await sendTelegramMessage(chatId, f'当前使用的提示词索引是: {CURRENT_PROMPT_INDEX}') | |
return | |
if command == 'resetuser': | |
USER_SETTINGS.pop(fromUserId, None) | |
await sendTelegramMessage(chatId, '已重置您的个人设置。') | |
return | |
if isGroupChat: | |
if chatId not in GROUP_SETTINGS: | |
GROUP_SETTINGS[chatId] = {'aiEnabled': True, 'prefix': None} | |
print(f'群组 {chatId} 首次检测到,默认启用 AI。') | |
groupSettings = GROUP_SETTINGS[chatId] | |
prefix = groupSettings.get('prefix') | |
if groupSettings['aiEnabled']: | |
if prefix and not userMessage.startswith(prefix): | |
return | |
messageContent = userMessage[len(prefix):].strip() if prefix else userMessage | |
if messageContent: | |
await processAiMessage(chatId, messageContent, fromUserId) | |
else: | |
await processAiMessage(chatId, userMessage, fromUserId) | |
def parseCommand(userMessage): | |
command = userMessage.split(' ')[0] | |
if '@' in command: | |
command = command.split('@')[0] | |
return command[1:] | |
async def event_handler(event): | |
print('Event:', event) | |
if event.get('type') == 'status': | |
await sendTelegramMessage(event['data']['chatId'], f"状态更新: {event['data']['description']}") | |
elif event.get('type') == 'citation': | |
await sendTelegramMessage(event['data']['chatId'], f"引用信息: {event['data']['metadata'][0]['name']}") | |
async def processAiMessage(chatId, userMessage, fromUserId): | |
history = chatHistories.get(chatId, []) | |
userTemp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) | |
currentPrompt = PROMPT_TEMPLATES.get(CURRENT_PROMPT_INDEX, "") | |
history.append({'role': 'user', 'content': userMessage}) | |
if len(history) > MAX_HISTORY_LENGTH: | |
history = history[-MAX_HISTORY_LENGTH:] | |
messages = [ | |
{'role': 'system', 'content': currentPrompt}, | |
*history | |
] | |
eventEmitter = EventEmitter(event_handler) | |
try: | |
ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={ | |
'model': AI_MODEL, | |
'messages': messages, | |
'max_tokens': MAX_TOKENS, | |
'temperature': userTemp, | |
'tools': TOOL_DEFINITIONS | |
}) | |
ai_response.raise_for_status() | |
ai_data = ai_response.json() | |
ai_reply = await handleAiResponse(ai_data, chatId, history, eventEmitter) | |
history.append({'role': 'assistant', 'content': ai_reply}) | |
chatHistories[chatId] = history | |
await sendTelegramMessage(chatId, ai_reply) | |
except requests.exceptions.RequestException as e: | |
print(f'AI API 响应失败: {e}') | |
await sendTelegramMessage(chatId, 'AI API 响应失败,请稍后再试') | |
except Exception as error: | |
print(f'处理消息时发生错误: {error}') | |
await sendTelegramMessage(chatId, '处理消息时发生错误,请稍后再试') | |
async def handleAiResponse(ai_data, chatId, history, eventEmitter): | |
if ai_data and ai_data.get('choices') and len(ai_data['choices']) > 0: | |
choice = ai_data['choices'][0] | |
if choice.get('message') and choice['message'].get('content'): | |
return choice['message']['content'] | |
elif choice.get('message') and choice['message'].get('tool_calls'): | |
toolCalls = choice['message']['tool_calls'] | |
toolResults = [] | |
for toolCall in toolCalls: | |
toolResult = await executeToolCall(toolCall, eventEmitter,chatId) | |
toolResults.append(toolResult) | |
newMessages = [ | |
*history, | |
{'role': "assistant", 'content': None, 'tool_calls': toolCalls}, | |
*toolResults, | |
] | |
ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={ | |
'model': AI_MODEL, | |
'messages': newMessages, | |
'max_tokens': MAX_TOKENS, | |
'temperature': USER_SETTINGS.get(chatId, {}).get('temperature', DEFAULT_TEMP) | |
}) | |
ai_response.raise_for_status() | |
ai_data = ai_response.json() | |
if ai_data and ai_data.get('choices') and len(ai_data['choices']) > 0 and ai_data['choices'][0].get('message') and ai_data['choices'][0]['message'].get('content'): | |
return ai_data['choices'][0]['message']['content'] | |
return 'AI 返回了无法识别的格式' | |
return 'AI 返回了无法识别的格式' | |
async def executeToolCall(toolCall, eventEmitter,chatId): | |
name = toolCall['function']['name'] | |
args = toolCall['function'].get('arguments', {}) | |
if name == 'web_scrape': | |
urls = args.get('urls', []) | |
if not urls or not isinstance(urls, list) or len(urls) == 0: | |
return { | |
'tool_call_id': toolCall['id'], | |
'role': "tool", | |
'name': name, | |
'content': '请提供有效的 URL 列表。', | |
} | |
api_url = "https://gpts.webpilot.ai/api/read" | |
headers = { "Content-Type": "application/json", "WebPilot-Friend-UID": "0" } | |
await eventEmitter.update_status( | |
f"开始读取 {len(urls)} 个网页", | |
False, | |
"web_search", | |
urls | |
) | |
async def processUrl(url): | |
try: | |
response = requests.post(api_url, headers=headers, json={ | |
"link": url, | |
"ur": "summary of the page", | |
"lp": True, | |
"rt": False, | |
"l": "en", | |
}) | |
response.raise_for_status() | |
result = response.json() | |
if result.get('rules'): | |
del result['rules'] | |
content = json.dumps(result) | |
title = result.get('title', url) | |
await eventEmitter.send_citation(title, url, content) | |
return f"{content}\n" | |
except requests.exceptions.RequestException as e: | |
error_message = f"读取网页 {url} 时出错: {e}" | |
await eventEmitter.update_status(error_message, False, "web_scrape", [url]) | |
await eventEmitter.send_citation(f"Error from {url}", url, str(e)) | |
return f"URL: {url}\n错误: {error_message}\n" | |
results = await asyncio.gather(*[processUrl(url) for url in urls]) | |
await eventEmitter.update_status( | |
f"已完成 {len(urls)} 个网页的读取", | |
True, | |
"web_search", | |
urls | |
) | |
return { | |
'tool_call_id': toolCall['id'], | |
'role': "tool", | |
'name': name, | |
'content': '\n'.join(results) | |
} | |
elif name == 'get_current_time': | |
now = datetime.utcnow() | |
utc8_time = now + datetime.timedelta(hours=8) | |
formatted_time = utc8_time.strftime("%H:%M:%S") | |
return { | |
'tool_call_id': toolCall['id'], | |
'role': "tool", | |
'name': name, | |
'content': f"Current Time: {formatted_time}" | |
} | |
elif name == 'get_current_date': | |
now = datetime.utcnow() | |
utc8_time = now + datetime.timedelta(hours=8) | |
formatted_date = utc8_time.strftime("%A, %B %d, %Y") | |
return { | |
'tool_call_id': toolCall['id'], | |
'role': "tool", | |
'name': name, | |
'content': f"Today's date is {formatted_date}" | |
} | |
else: | |
return { | |
'tool_call_id': toolCall['id'], | |
'role': "tool", | |
'name': name, | |
'content': '未知的工具调用' | |
} | |
async def handleCallbackQuery(callbackQuery): | |
chatId = callbackQuery['message']['chat']['id'] | |
data = callbackQuery['data'] | |
if data == "clearall": | |
chatHistories.pop(chatId, None) | |
await sendTelegramMessage(chatId, "聊天记录已清空。") | |
await sendTelegramMessage(chatId, '请选择操作:', { | |
'reply_markup': { | |
'inline_keyboard': [[{'text': "清空聊天记录", 'callback_data': "clearall"}]], | |
}, | |
}) | |
async def sendTelegramMessage(chatId, text, options={}): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" | |
if PHP_PROXY_URL: # 使用代理 | |
url = f"{PHP_PROXY_URL}sendMessage" | |
data = { | |
'chat_id': chatId, | |
'text': text, | |
**options | |
} | |
try: | |
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
print(f'发送 Telegram 消息失败: {e}') | |
def getHelpMessage(): | |
return f""" | |
可用指令: | |
/start - 显示欢迎信息和操作按钮。 | |
/clearall - 清空当前会话的聊天记录。 | |
/help - 显示此帮助信息。 | |
群组指令: | |
/enableai - 在群组中启用AI回复。 | |
/disableai - 在群组中禁用AI回复。 | |
/setprefix <prefix> - 设置群组中触发AI回复的前缀,例如:/setprefix @bot。 | |
/getprefix - 获取当前群组的触发前缀。 | |
私聊指令: | |
/settemp <温度值> - 设置AI回复的温度 (0-2),例如:/settemp 1.0。 | |
/gettemp - 获取当前AI回复的温度。 | |
/resetuser - 重置你的个人设置。 | |
/promat <index> - 切换提示词,例如: /promat 0, 1, 2。 | |
/getpromat - 获取当前使用的提示词索引。 | |
直接发送文本消息与AI对话 (私聊)。 | |
群组中,需要使用前缀触发AI回复,如果设置了前缀的话。 | |
注意: | |
- 机器人会记住最近的 {MAX_HISTORY_LENGTH} 条对话。 | |
- 机器人具有攻击性,请谨慎使用。 | |
""" | |
async def update_commands(): | |
await setBotCommands() | |
return jsonify({'message': 'Commands updated successfully!'}) | |
async def handle_webhook(): | |
try: | |
update = request.get_json() | |
await handleTelegramUpdate(update) | |
return jsonify({'status': 'ok'}) | |
except Exception as e: | |
print(f"请求解析失败: {e}") | |
return jsonify({'status': 'error', 'message': str(e)}), 400 | |
def health_check(): | |
return 'OK' | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) | |