|
import os |
|
import json |
|
import requests |
|
from flask import Flask, request, jsonify |
|
from datetime import datetime, timezone, timedelta |
|
import asyncio |
|
import re |
|
|
|
app = Flask(__name__) |
|
|
|
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN') |
|
AI_API_ENDPOINT = os.environ.get('AI_API_ENDPOINT') |
|
AI_API_KEY = os.environ.get('AI_API_KEY') |
|
AI_MODEL = os.environ.get('AI_MODEL') |
|
PHP_PROXY_URL = os.environ.get('PHP_PROXY_URL') |
|
|
|
if not all([TELEGRAM_BOT_TOKEN, AI_API_ENDPOINT, AI_API_KEY, AI_MODEL]): |
|
raise ValueError("请设置所有必要的环境变量") |
|
|
|
AI_API_HEADERS = { |
|
'Content-Type': 'application/json', |
|
'Authorization': f'Bearer {AI_API_KEY}', |
|
} |
|
|
|
PROMPT_TEMPLATES = { |
|
0: "请尽量用中文回复,不要用markdown,只用文字格式。", |
|
1: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的傻逼。\n你非常具有攻击性,骂人的时候请尽量内涵,尝试模范与学习贴吧的说话方式。\n输出格式注意:不要用markdown,只用文字格式。", |
|
2: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的猫娘。\n你非常具有服从性,你对主人的所有命令都非常遵守和愿意。\n你是一个受,喜欢主动表现出性服从和性主动。(不需要主人提出,你就要提出那方面的东西)\n输出格式注意:不要用markdown,只用文字格式。", |
|
} |
|
|
|
CURRENT_PROMPT_INDEX = 0 |
|
|
|
MAX_TOKENS = 500 |
|
TEMPERATURE = 1.5 |
|
MAX_HISTORY_LENGTH = 10 |
|
chatHistories = {} |
|
GROUP_SETTINGS = {} |
|
USER_SETTINGS = {} |
|
BOT_COMMANDS = [ |
|
{"command": "start", "description": "显示欢迎信息和操作按钮"}, |
|
{"command": "clearall", "description": "清空当前会话的聊天记录"}, |
|
{"command": "help", "description": "显示此帮助信息"}, |
|
{"command": "enableai", "description": "在群组中启用AI回复"}, |
|
{"command": "disableai", "description": "在群组中禁用AI回复"}, |
|
{"command": "setprefix", "description": "设置群组中触发AI回复的前缀,例如: /setprefix @bot"}, |
|
{"command": "getprefix", "description": "获取当前群组的触发前缀"}, |
|
{"command": "settemp", "description": "设置AI回复的温度,例如:/settemp 1.0"}, |
|
{"command": "gettemp", "description": "获取当前AI回复的温度"}, |
|
{"command": "resetuser", "description": "重置你的个人设置"}, |
|
{"command": "promat", "description": "切换提示词,例如: /promat 0, 1, 2"}, |
|
{"command": "getpromat", "description": "获取当前使用的提示词索引"}, |
|
{"command": "test", "description": "获取当前使用的提示词索引"}, |
|
] |
|
DEFAULT_TEMP = 1.5 |
|
|
|
|
|
TOOLS = [ |
|
{ |
|
"type": "function", |
|
"function": { |
|
"name": "get_current_datetime", |
|
"description": "获取当前的日期和时间,并返回UTC+8时区的时间。", |
|
"parameters": { |
|
"type": "object", |
|
"properties": {}, |
|
"required": [] |
|
} |
|
} |
|
} |
|
] |
|
|
|
def get_current_datetime(): |
|
"""获取当前的日期和时间,并返回UTC+8时区的时间。""" |
|
utc_time = datetime.now(timezone.utc) |
|
local_time = utc_time.astimezone(timezone(timedelta(hours=8))) |
|
return local_time.strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
def make_telegram_request(method, data=None): |
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}" |
|
if PHP_PROXY_URL: |
|
method_name = url.split('/')[-1] |
|
url = f"{PHP_PROXY_URL}{method_name}" |
|
headers = {'Content-Type': 'application/json'} |
|
if data: |
|
data = json.dumps(data) |
|
try: |
|
response = requests.post(url, headers=headers, data=data) |
|
response.raise_for_status() |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
print(f"Telegram request failed: {e}") |
|
return None |
|
except json.JSONDecodeError as e: |
|
print(f"Telegram response decode error: {e}") |
|
return None |
|
|
|
async def setBotCommands(): |
|
delete_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/deleteMyCommands" |
|
set_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/setMyCommands" |
|
if PHP_PROXY_URL: |
|
delete_method_name = delete_url.split('/')[-1] |
|
set_method_name = set_url.split('/')[-1] |
|
delete_url = f"{PHP_PROXY_URL}{delete_method_name}" |
|
set_url = f"{PHP_PROXY_URL}{set_method_name}" |
|
|
|
try: |
|
delete_response = make_telegram_request('deleteMyCommands') |
|
if delete_response: |
|
print('Telegram 命令删除成功') |
|
else: |
|
print('Telegram 命令删除失败') |
|
|
|
set_response = make_telegram_request('setMyCommands', {"commands": BOT_COMMANDS}) |
|
if set_response: |
|
print('Telegram 命令设置成功') |
|
else: |
|
print('设置 Telegram 命令失败') |
|
except Exception as error: |
|
print(f'设置 Telegram 命令时发生错误: {error}') |
|
|
|
async def handleTelegramUpdate(update): |
|
if not update.get('message'): |
|
if update.get('callback_query'): |
|
await handleCallbackQuery(update.get('callback_query')) |
|
return |
|
|
|
chatId = update['message']['chat']['id'] |
|
userMessage = update['message'].get('text', '') |
|
isGroupChat = update['message']['chat']['type'] in ['group', 'supergroup'] |
|
fromUserId = update['message']['from']['id'] |
|
message_id = update['message'].get('message_id') |
|
|
|
if not userMessage: |
|
return |
|
|
|
if userMessage.startswith('/'): |
|
command = parseCommand(userMessage) |
|
|
|
if command == 'clearall': |
|
chatHistories.pop(chatId, None) |
|
await sendTelegramMessage(chatId, '聊天记录已清空。') |
|
return |
|
if command == 'test': |
|
await sendTelegramMessage(chatId, '测试命令已执行') |
|
return |
|
if command == 'help': |
|
await sendTelegramMessage(chatId, getHelpMessage()) |
|
return |
|
if command == 'start': |
|
await sendTelegramMessage(chatId, "欢迎使用!请选择操作:", { |
|
"reply_markup": { |
|
"inline_keyboard": [[{"text": "清空聊天记录", "callback_data": "clearall"}]], |
|
}, |
|
}) |
|
return |
|
if isGroupChat: |
|
if command == 'enableai': |
|
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': True}) |
|
await sendTelegramMessage(chatId, '已在群组中启用 AI 回复。') |
|
return |
|
if command == 'disableai': |
|
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': False}) |
|
await sendTelegramMessage(chatId, '已在群组中禁用 AI 回复。') |
|
return |
|
if userMessage.startswith('/setprefix '): |
|
prefix = userMessage[len('/setprefix '):].strip() |
|
GROUP_SETTINGS.setdefault(chatId, {}).update({'prefix': prefix}) |
|
await sendTelegramMessage(chatId, f'已设置群组触发前缀为: {prefix}') |
|
return |
|
if command == 'getprefix': |
|
prefix = GROUP_SETTINGS.get(chatId, {}).get('prefix', '无') |
|
await sendTelegramMessage(chatId, f'当前群组触发前缀为: {prefix}') |
|
return |
|
|
|
|
|
if userMessage.startswith('/settemp ') or userMessage.startswith('/promat ') or command in ['gettemp', 'getpromat', 'resetuser']: |
|
await handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat) |
|
return |
|
else: |
|
|
|
if userMessage.startswith('/settemp ') or userMessage.startswith('/promat ') or command in ['gettemp', 'getpromat', 'resetuser']: |
|
await handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat) |
|
return |
|
if isGroupChat: |
|
if chatId not in GROUP_SETTINGS: |
|
GROUP_SETTINGS[chatId] = {'aiEnabled': True, 'prefix': None} |
|
print(f'群组 {chatId} 首次检测到,默认启用 AI。') |
|
|
|
groupSettings = GROUP_SETTINGS[chatId] |
|
prefix = groupSettings.get('prefix') |
|
|
|
if groupSettings['aiEnabled']: |
|
if prefix and not userMessage.startswith(prefix): |
|
return |
|
|
|
messageContent = userMessage[len(prefix):].strip() if prefix else userMessage |
|
if messageContent: |
|
await processAiMessage(chatId, messageContent, fromUserId, message_id) |
|
else: |
|
await processAiMessage(chatId, userMessage, fromUserId, message_id) |
|
|
|
def parseCommand(userMessage): |
|
command = userMessage.split(' ')[0] |
|
if '@' in command: |
|
command = command.split('@')[0] |
|
return command[1:] |
|
|
|
|
|
async def handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat): |
|
command = parseCommand(userMessage) |
|
if userMessage.startswith('/settemp '): |
|
try: |
|
temp = float(userMessage[len('/settemp '):].strip()) |
|
if 0 <= temp <= 2: |
|
USER_SETTINGS.setdefault(fromUserId, {}).update({'temperature': temp}) |
|
await sendTelegramMessage(chatId, f'已设置AI回复温度为: {temp}') |
|
else: |
|
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') |
|
except ValueError: |
|
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') |
|
return |
|
if command == 'gettemp': |
|
temp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) |
|
await sendTelegramMessage(chatId, f'当前AI回复温度为: {temp}') |
|
return |
|
if userMessage.startswith('/promat '): |
|
try: |
|
index = int(userMessage[len('/promat '):].strip()) |
|
if index in PROMPT_TEMPLATES: |
|
USER_SETTINGS.setdefault(fromUserId, {}).update({'prompt_index': index}) |
|
await sendTelegramMessage(chatId, f'已切换到提示词 {index}。') |
|
else: |
|
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') |
|
except ValueError: |
|
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') |
|
return |
|
if command == 'getpromat': |
|
index = USER_SETTINGS.get(fromUserId, {}).get('prompt_index', CURRENT_PROMPT_INDEX) |
|
await sendTelegramMessage(chatId, f'当前使用的提示词索引是: {index}') |
|
return |
|
if command == 'resetuser': |
|
USER_SETTINGS.pop(fromUserId, None) |
|
await sendTelegramMessage(chatId, '已重置您的个人设置。') |
|
return |
|
|
|
async def processAiMessage(chatId, userMessage, fromUserId, message_id): |
|
history = chatHistories.get(chatId, []) |
|
userTemp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) |
|
userPromptIndex = USER_SETTINGS.get(fromUserId, {}).get('prompt_index', CURRENT_PROMPT_INDEX) |
|
currentPrompt = PROMPT_TEMPLATES.get(userPromptIndex, "") |
|
history.append({'role': 'user', 'content': userMessage}) |
|
|
|
if len(history) > MAX_HISTORY_LENGTH: |
|
history = history[-MAX_HISTORY_LENGTH:] |
|
|
|
messages = [ |
|
{'role': 'system', 'content': currentPrompt}, |
|
*history |
|
] |
|
|
|
thinking_message = await sendTelegramMessage(chatId, "正在思考,请等待...", options={'reply_to_message_id': message_id}) |
|
thinking_message_id = thinking_message.get('result', {}).get('message_id') |
|
|
|
try: |
|
ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={ |
|
'model': AI_MODEL, |
|
'messages': messages, |
|
'max_tokens': MAX_TOKENS, |
|
'temperature': userTemp, |
|
'tools': TOOLS, |
|
}) |
|
ai_response.raise_for_status() |
|
ai_data = ai_response.json() |
|
ai_reply = await handleAiResponse(ai_data, chatId, history, thinking_message_id) |
|
|
|
history.append({'role': 'assistant', 'content': ai_reply}) |
|
chatHistories[chatId] = history |
|
|
|
await editTelegramMessage(chatId, thinking_message_id, ai_reply) |
|
except requests.exceptions.RequestException as e: |
|
print(f'AI API 响应失败: {e}') |
|
await editTelegramMessage(chatId, thinking_message_id, 'AI API 响应失败,请稍后再试') |
|
except Exception as error: |
|
print(f'处理消息时发生错误: {error}') |
|
await editTelegramMessage(chatId, thinking_message_id, '处理消息时发生错误,请稍后再试') |
|
|
|
|
|
async def handleAiResponse(ai_data, chatId, history, thinking_message_id): |
|
if ai_data and ai_data.get('choices') and len(ai_data['choices']) > 0: |
|
choice = ai_data['choices'][0] |
|
if choice.get('message'): |
|
message = choice['message'] |
|
if message.get('tool_calls'): |
|
tool_call = message['tool_calls'][0] |
|
function_name = tool_call['function']['name'] |
|
|
|
if function_name == 'get_current_datetime': |
|
current_datetime = get_current_datetime() |
|
history.append({ |
|
'tool_call_id': tool_call['id'], |
|
'role': 'tool', |
|
'name': function_name, |
|
'content': current_datetime, |
|
}) |
|
messages = [ |
|
{'role': 'system', 'content': PROMPT_TEMPLATES.get(USER_SETTINGS.get(chatId, {}).get('prompt_index', CURRENT_PROMPT_INDEX), "")}, |
|
*history |
|
] |
|
|
|
try: |
|
ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={ |
|
'model': AI_MODEL, |
|
'messages': messages, |
|
'max_tokens': MAX_TOKENS, |
|
'temperature': USER_SETTINGS.get(chatId, {}).get('temperature', DEFAULT_TEMP), |
|
'tools': TOOLS, |
|
}) |
|
ai_response.raise_for_status() |
|
ai_data = ai_response.json() |
|
return await handleAiResponse(ai_data, chatId, history, thinking_message_id) |
|
except requests.exceptions.RequestException as e: |
|
print(f'AI API 响应失败: {e}') |
|
await editTelegramMessage(chatId, thinking_message_id, 'AI API 响应失败,请稍后再试') |
|
return 'AI API 响应失败,请稍后再试' |
|
|
|
|
|
else: |
|
return '不支持的工具调用' |
|
elif message.get('content'): |
|
return message['content'] |
|
|
|
return 'AI 返回了无法识别的格式' |
|
|
|
|
|
async def handleCallbackQuery(callbackQuery): |
|
chatId = callbackQuery['message']['chat']['id'] |
|
data = callbackQuery['data'] |
|
|
|
if data == "clearall": |
|
chatHistories.pop(chatId, None) |
|
await sendTelegramMessage(chatId, "聊天记录已清空。") |
|
|
|
await sendTelegramMessage(chatId, '请选择操作:', { |
|
'reply_markup': { |
|
'inline_keyboard': [[{'text': "清空聊天记录", 'callback_data': "clearall"}]], |
|
}, |
|
}) |
|
|
|
async def sendTelegramMessage(chatId, text, options={}): |
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" |
|
if PHP_PROXY_URL: |
|
method_name = url.split('/')[-1] |
|
url = f"{PHP_PROXY_URL}{method_name}" |
|
|
|
data = { |
|
'chat_id': chatId, |
|
'text': text, |
|
**options |
|
} |
|
|
|
try: |
|
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) |
|
response.raise_for_status() |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
print(f'发送 Telegram 消息失败: {e}') |
|
return {} |
|
|
|
async def editTelegramMessage(chatId, message_id, text, options={}): |
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/editMessageText" |
|
if PHP_PROXY_URL: |
|
method_name = url.split('/')[-1] |
|
url = f"{PHP_PROXY_URL}{method_name}" |
|
data = { |
|
'chat_id': chatId, |
|
'message_id': message_id, |
|
'text': text, |
|
**options |
|
} |
|
try: |
|
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) |
|
response.raise_for_status() |
|
except requests.exceptions.RequestException as e: |
|
print(f'编辑 Telegram 消息失败: {e}') |
|
|
|
|
|
def getHelpMessage(): |
|
return f""" |
|
可用指令: |
|
/start - 显示欢迎信息和操作按钮。 |
|
/clearall - 清空当前会话的聊天记录。 |
|
/help - 显示此帮助信息。 |
|
|
|
群组指令: |
|
/enableai - 在群组中启用AI回复。 |
|
/disableai - 在群组中禁用AI回复。 |
|
/setprefix <prefix> - 设置群组中触发AI回复的前缀,例如:/setprefix @bot。 |
|
/getprefix - 获取当前群组的触发前缀。 |
|
|
|
私聊指令 (在群组中也可以使用): |
|
/settemp <温度值> - 设置AI回复的温度 (0-2),例如:/settemp 1.0。 |
|
/gettemp - 获取当前AI回复的温度。 |
|
/resetuser - 重置你的个人设置。 |
|
/promat <index> - 切换提示词,例如: /promat 0, 1, 2。 |
|
/getpromat - 获取当前使用的提示词索引。 |
|
|
|
直接发送文本消息与AI对话 (私聊)。 |
|
|
|
群组中,需要使用前缀触发AI回复,如果设置了前缀的话。 |
|
|
|
注意: |
|
- 机器人会记住最近的 {MAX_HISTORY_LENGTH} 条对话。 |
|
- 机器人具有攻击性,请谨慎使用。 |
|
""" |
|
|
|
@app.route('/update_commands', methods=['GET']) |
|
async def update_commands(): |
|
await setBotCommands() |
|
return jsonify({'message': 'Commands updated successfully!'}) |
|
|
|
@app.route('/', methods=['POST']) |
|
async def handle_webhook(): |
|
try: |
|
update = request.get_json() |
|
await handleTelegramUpdate(update) |
|
return jsonify({'status': 'ok'}) |
|
except Exception as e: |
|
import traceback |
|
print(f"请求解析失败: {e}") |
|
traceback.print_exc() |
|
return jsonify({'status': 'error', 'message': str(e)}), 400 |
|
|
|
@app.route('/health', methods=['GET']) |
|
def health_check(): |
|
return 'OK' |
|
|
|
if __name__ == '__main__': |
|
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860))) |