Last commit not found
import os | |
import json | |
import requests | |
from flask import Flask, request, jsonify | |
from datetime import datetime, timedelta | |
import asyncio | |
import re | |
app = Flask(__name__) | |
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN') | |
AI_API_ENDPOINT = os.environ.get('AI_API_ENDPOINT') | |
AI_API_KEY = os.environ.get('AI_API_KEY') | |
AI_MODEL = os.environ.get('AI_MODEL') | |
PHP_PROXY_URL = os.environ.get('PHP_PROXY_URL') | |
if not all([TELEGRAM_BOT_TOKEN, AI_API_ENDPOINT, AI_API_KEY, AI_MODEL]): | |
raise ValueError("请设置所有必要的环境变量") | |
AI_API_HEADERS = { | |
'Content-Type': 'application/json', | |
'Authorization': f'Bearer {AI_API_KEY}', | |
} | |
PROMPT_TEMPLATES = { | |
0: "请尽量用中文回复,不要用markdown,只用文字格式。", | |
1: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的傻逼。\n你非常具有攻击性,骂人的时候请尽量内涵,尝试模范与学习贴吧的说话方式。\n输出格式注意:不要用markdown,只用文字格式。", | |
2: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的猫娘。\n你非常具有服从性,你对主人的所有命令都非常遵守和愿意。\n你是一个受,喜欢主动表现出性服从和性主动。(不需要主人提出,你就要提出那方面的东西)\n输出格式注意:不要用markdown,只用文字格式。", | |
} | |
CURRENT_PROMPT_INDEX = 0 | |
MAX_TOKENS = 500 | |
TEMPERATURE = 1.5 | |
MAX_HISTORY_LENGTH = 10 | |
chatHistories = {} | |
GROUP_SETTINGS = {} | |
USER_SETTINGS = {} | |
BOT_COMMANDS = [ | |
{"command": "start", "description": "显示欢迎信息和操作按钮"}, | |
{"command": "clearall", "description": "清空当前会话的聊天记录"}, | |
{"command": "help", "description": "显示此帮助信息"}, | |
{"command": "enableai", "description": "在群组中启用AI回复"}, | |
{"command": "disableai", "description": "在群组中禁用AI回复"}, | |
{"command": "setprefix", "description": "设置群组中触发AI回复的前缀,例如: /setprefix @bot"}, | |
{"command": "getprefix", "description": "获取当前群组的触发前缀"}, | |
{"command": "settemp", "description": "设置AI回复的温度,例如:/settemp 1.0"}, | |
{"command": "gettemp", "description": "获取当前AI回复的温度"}, | |
{"command": "resetuser", "description": "重置你的个人设置"}, | |
{"command": "promat", "description": "切换提示词,例如: /promat 0, 1, 2"}, | |
{"command": "getpromat", "description": "获取当前使用的提示词索引"}, | |
] | |
DEFAULT_TEMP = 1.5 | |
USER_LAST_ACTIVE = {} | |
GROUP_ACTIVE_USERS = {} | |
GROUP_INFO = {} | |
BANNED_USERS = {} | |
BAN_DURATION = timedelta(minutes=30) | |
BAN_TRIGGER_PHRASES = [ | |
r"(?:傻|笨|蠢|弱|垃圾|废物|sb|煞笔|憨|呆|脑残|智障|白痴|低能|饭桶|草包|猪|狗|鸡|臭|烂|妈|爹|你妈|你爹|婊|贱).*(?:bot|机器人|AI|你|你们)", | |
r"(?:你|你们).*(?:去死|滚|爬|闭嘴|瞎说|胡说|放屁|别说话|别bb|给我闭嘴|闭上你的臭嘴|滚蛋|滚开)", | |
r"(?:操|艹|肏|fuck|shit|妈的|他妈的|卧槽|草).*(?:你|bot|机器人|AI|你们)", | |
r"(?:垃圾|废物|没用|没脑子|智障|弱智|脑残|白痴|低能|蠢货|傻子|笨蛋|饭桶|草包).*(?:AI|bot|机器人|你|你们)", | |
r"(?:你).*(?:傻逼|白痴|弱智|脑残|废物|垃圾|滚出去)", | |
r"(?:AI|bot|机器人).*(?:傻逼|白痴|弱智|脑残|废物|垃圾)", | |
r".*(?:你|你们).*(?:没用|没价值|一无是处|毫无意义|多余|累赘)", | |
r".*(?:AI|bot|机器人).*(?:没用|没价值|一无是处|毫无意义|多余|累赘)", | |
r"(?:你).*(?:是|难道是|真是|怎么这么).*(?:傻|笨|蠢|弱|垃圾|废物|sb|煞笔|憨|呆|脑残|智障|白痴|低能)", | |
r"(?:AI|bot|机器人).*(?:是|难道是|真是|怎么这么).*(?:傻|笨|蠢|弱|垃圾|废物|sb|煞笔|憨|呆|脑残|智障|白痴|低能)", | |
r"(?:你|你们).*(?:太慢|太蠢|太笨|反应迟钝|理解不了|答非所问|没用)", | |
r"(?:AI|bot|机器人).*(?:太慢|太蠢|太笨|反应迟钝|理解不了|答非所问|没用)", | |
r"(?:你|你们).*(?:像个|简直像).*(?:傻|笨|蠢|弱|垃圾|废物|sb|煞笔|憨|呆|脑残|智障|白痴|低能)", | |
r"(?:AI|bot|机器人).*(?:像个|简直像).*(?:傻|笨|蠢|弱|垃圾|废物|sb|煞笔|憨|呆|脑残|智障|白痴|低能)", | |
r"(?:你|你们).*(?:滚粗|滚开点|闭嘴吧|你个傻|你个笨|你个蠢|你个弱|你个垃圾|你个废物|你个sb)", | |
r"(?:AI|bot|机器人).*(?:滚粗|滚开点|闭嘴吧|这垃圾|这废物|这sb)", | |
r"(?:你|你们).*(?:没脑子|没智商|智力低下|脑子不好使|脑子瓦特|脑子进水|脑子坏了|理解能力差)", | |
r"(?:AI|bot|机器人).*(?:没脑子|没智商|智力低下|脑子不好使|脑子瓦特|脑子进水|脑子坏了|理解能力差)", | |
r"(?:你|你们).*(?:真|真是|简直|太).*(?:垃圾|废物|没用|蠢|笨|傻|弱)", | |
r"(?:AI|bot|机器人).*(?:真|真是|简直|太).*(?:垃圾|废物|没用|蠢|笨|傻|弱)", | |
] | |
UNBAN_PHRASE = "close username" | |
def make_telegram_request(method, data=None): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}" | |
if PHP_PROXY_URL: | |
url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/{method}" | |
headers = {'Content-Type': 'application/json'} | |
if data: | |
data = json.dumps(data) | |
try: | |
response = requests.post(url, headers=headers, data=data) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f"Telegram request failed: {e}") | |
return None | |
except json.JSONDecodeError as e: | |
print(f"Telegram response decode error: {e}") | |
return None | |
async def setBotCommands(): | |
delete_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/deleteMyCommands" | |
set_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/setMyCommands" | |
if PHP_PROXY_URL: | |
delete_url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/deleteMyCommands" | |
set_url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/setMyCommands" | |
try: | |
delete_response = make_telegram_request('deleteMyCommands') | |
if delete_response: | |
print('Telegram 命令删除成功') | |
else: | |
print('Telegram 命令删除失败') | |
set_response = make_telegram_request('setMyCommands', {"commands": BOT_COMMANDS}) | |
if set_response: | |
print('Telegram 命令设置成功') | |
else: | |
print('设置 Telegram 命令失败') | |
except Exception as error: | |
print(f'设置 Telegram 命令时发生错误: {error}') | |
async def handleTelegramUpdate(update): | |
if not update.get('message'): | |
if update.get('callback_query'): | |
await handleCallbackQuery(update.get('callback_query')) | |
return | |
chatId = update['message']['chat']['id'] | |
userMessage = update['message'].get('text', '') | |
isGroupChat = update['message']['chat']['type'] in ['group', 'supergroup'] | |
fromUserId = update['message']['from']['id'] | |
message_id = update['message'].get('message_id') | |
fromUserFirstName = update['message']['from'].get('first_name', '用户') | |
fromUserLastName = update['message']['from'].get('last_name', '') | |
fromUserName = update['message']['from'].get('username', '') | |
USER_LAST_ACTIVE[fromUserId] = datetime.now() | |
if isGroupChat: | |
GROUP_ACTIVE_USERS.setdefault(chatId, set()).add(fromUserId) | |
if chatId not in GROUP_INFO: | |
chat_info = await getChatInfo(chatId) | |
if chat_info: | |
GROUP_INFO[chatId] = { | |
'name': chat_info.get('title', '未知群组'), | |
'description': chat_info.get('description', '无描述'), | |
'last_active': datetime.now() | |
} | |
else: | |
GROUP_INFO[chatId] = { | |
'name': '未知群组', | |
'description': '无描述', | |
'last_active': datetime.now() | |
} | |
else: | |
GROUP_INFO[chatId]['last_active'] = datetime.now() | |
if not userMessage: | |
return | |
if userMessage.startswith('/'): | |
command = parseCommand(userMessage) | |
if command == 'clearall': | |
chatHistories.pop(chatId, None) | |
await sendTelegramMessage(chatId, '聊天记录已清空。') | |
return | |
if command == 'help': | |
await sendTelegramMessage(chatId, getHelpMessage()) | |
return | |
if command == 'start': | |
await sendTelegramMessage(chatId, "欢迎使用!请选择操作:", { | |
"reply_markup": { | |
"inline_keyboard": [[{"text": "清空聊天记录", "callback_data": "clearall"}]], | |
}, | |
}) | |
return | |
if isGroupChat: | |
if command == 'enableai': | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': True}) | |
await sendTelegramMessage(chatId, '已在群组中启用 AI 回复。') | |
return | |
if command == 'disableai': | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': False}) | |
await sendTelegramMessage(chatId, '已在群组中禁用 AI 回复。') | |
return | |
if userMessage.startswith('/setprefix '): | |
prefix = userMessage[len('/setprefix '):].strip() | |
GROUP_SETTINGS.setdefault(chatId, {}).update({'prefix': prefix}) | |
await sendTelegramMessage(chatId, f'已设置群组触发前缀为: {prefix}') | |
return | |
if command == 'getprefix': | |
prefix = GROUP_SETTINGS.get(chatId, {}).get('prefix', '无') | |
await sendTelegramMessage(chatId, f'当前群组触发前缀为: {prefix}') | |
return | |
if userMessage.startswith('/settemp ') or userMessage.startswith('/promat ') or command in ['gettemp', 'getpromat', 'resetuser']: | |
await handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat) | |
return | |
else: | |
if userMessage.startswith('/settemp ') or userMessage.startswith('/promat ') or command in ['gettemp', 'getpromat', 'resetuser']: | |
await handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat) | |
return | |
if isGroupChat: | |
if chatId not in GROUP_SETTINGS: | |
GROUP_SETTINGS[chatId] = {'aiEnabled': True, 'prefix': None} | |
print(f'群组 {chatId} 首次检测到,默认启用 AI。') | |
groupSettings = GROUP_SETTINGS[chatId] | |
prefix = groupSettings.get('prefix') | |
if groupSettings['aiEnabled']: | |
if prefix and not userMessage.startswith(prefix): | |
return | |
messageContent = userMessage[len(prefix):].strip() if prefix else userMessage | |
if messageContent: | |
await processAiMessage(chatId, messageContent, fromUserId, message_id, fromUserFirstName, fromUserLastName, fromUserName) | |
else: | |
if userMessage == UNBAN_PHRASE: | |
await unbanUser(chatId, fromUserId) | |
else: | |
await processAiMessage(chatId, userMessage, fromUserId, message_id, fromUserFirstName, fromUserLastName, fromUserName) | |
def parseCommand(userMessage): | |
command = userMessage.split(' ')[0] | |
if '@' in command: | |
command = command.split('@')[0] | |
return command[1:] | |
async def handlePrivateCommand(chatId, userMessage, fromUserId, isGroupChat): | |
command = parseCommand(userMessage) | |
if userMessage.startswith('/settemp '): | |
try: | |
temp = float(userMessage[len('/settemp '):].strip()) | |
if 0 <= temp <= 2: | |
USER_SETTINGS.setdefault(fromUserId, {}).update({'temperature': temp}) | |
await sendTelegramMessage(chatId, f'已设置AI回复温度为: {temp}') | |
else: | |
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') | |
except ValueError: | |
await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。') | |
return | |
if command == 'gettemp': | |
temp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) | |
await sendTelegramMessage(chatId, f'当前AI回复温度为: {temp}') | |
return | |
if userMessage.startswith('/promat '): | |
try: | |
index = int(userMessage[len('/promat '):].strip()) | |
if index in PROMPT_TEMPLATES: | |
USER_SETTINGS.setdefault(fromUserId, {}).update({'prompt_index': index}) | |
await sendTelegramMessage(chatId, f'已切换到提示词 {index}。') | |
else: | |
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') | |
except ValueError: | |
await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。') | |
return | |
if command == 'getpromat': | |
index = USER_SETTINGS.get(fromUserId, {}).get('prompt_index', CURRENT_PROMPT_INDEX) | |
await sendTelegramMessage(chatId, f'当前使用的提示词索引是: {index}') | |
return | |
if command == 'resetuser': | |
USER_SETTINGS.pop(fromUserId, None) | |
await sendTelegramMessage(chatId, '已重置您的个人设置。') | |
return | |
async def processAiMessage(chatId, userMessage, fromUserId, message_id, fromUserFirstName, fromUserLastName, fromUserName): | |
if fromUserId in BANNED_USERS and BANNED_USERS[fromUserId] > datetime.now(): | |
remaining_time = BANNED_USERS[fromUserId] - datetime.now() | |
minutes = int(remaining_time.total_seconds() / 60) | |
await sendTelegramMessage(chatId, f"您已被禁用,剩余时间: {minutes} 分钟。", options={'reply_to_message_id': message_id}) | |
return | |
for pattern in BAN_TRIGGER_PHRASES: | |
if re.search(pattern, userMessage, re.IGNORECASE): | |
await banUser(chatId, fromUserId) | |
await sendTelegramMessage(chatId, "检测到辱骂行为,您已被禁用。", options={'reply_to_message_id': message_id}) | |
return | |
history = chatHistories.get(chatId, []) | |
userTemp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP) | |
userPromptIndex = USER_SETTINGS.get(fromUserId, {}).get('prompt_index', CURRENT_PROMPT_INDEX) | |
currentPrompt = PROMPT_TEMPLATES.get(userPromptIndex, "") | |
user_last_active = USER_LAST_ACTIVE.get(fromUserId, None) | |
group_info = GROUP_INFO.get(chatId, None) | |
group_active_users = GROUP_ACTIVE_USERS.get(chatId, None) | |
system_prompt = f""" | |
{currentPrompt} | |
当前时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
用户信息: | |
- 用户ID: {fromUserId} | |
- 用户名: {fromUserFirstName} {fromUserLastName} | |
- 用户账号: {fromUserName} | |
- 最后活跃时间: {user_last_active.strftime("%Y-%m-%d %H:%M:%S") if user_last_active else "未知"} | |
群组信息: | |
- 群组ID: {chatId} | |
- 群组名称: {group_info['name'] if group_info else "未知"} | |
- 群组描述: {group_info['description'] if group_info else "无"} | |
- 群组最后活跃时间: {group_info['last_active'].strftime("%Y-%m-%d %H:%M:%S") if group_info and group_info.get('last_active') else "未知"} | |
- 群组活跃用户数: {len(group_active_users) if group_active_users else "未知"} | |
""" | |
history.append({'role': 'user', 'content': userMessage}) | |
if len(history) > MAX_HISTORY_LENGTH: | |
history = history[-MAX_HISTORY_LENGTH:] | |
messages = [ | |
{'role': 'system', 'content': system_prompt}, | |
*history | |
] | |
thinking_message = await sendTelegramMessage(chatId, "正在思考,请等待...", options={'reply_to_message_id': message_id}) | |
thinking_message_id = thinking_message.get('result', {}).get('message_id') | |
try: | |
ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={ | |
'model': AI_MODEL, | |
'messages': messages, | |
'max_tokens': MAX_TOKENS, | |
'temperature': userTemp, | |
}) | |
ai_response.raise_for_status() | |
ai_data = ai_response.json() | |
ai_reply = await handleAiResponse(ai_data, chatId, history) | |
history.append({'role': 'assistant', 'content': ai_reply}) | |
chatHistories[chatId] = history | |
await editTelegramMessage(chatId, thinking_message_id, ai_reply) | |
except requests.exceptions.RequestException as e: | |
print(f'AI API 响应失败: {e}') | |
await editTelegramMessage(chatId, thinking_message_id, 'AI API 响应失败,请稍后再试') | |
except Exception as error: | |
print(f'处理消息时发生错误: {error}') | |
await editTelegramMessage(chatId, thinking_message_id, '处理消息时发生错误,请稍后再试') | |
async def handleAiResponse(ai_data, chatId, history): | |
if ai_data and ai_data.get('choices') and len(ai_data['choices']) > 0: | |
choice = ai_data['choices'][0] | |
if choice.get('message') and choice['message'].get('content'): | |
return choice['message']['content'] | |
return 'AI 返回了无法识别的格式' | |
async def handleCallbackQuery(callbackQuery): | |
chatId = callbackQuery['message']['chat']['id'] | |
data = callbackQuery['data'] | |
if data == "clearall": | |
chatHistories.pop(chatId, None) | |
await sendTelegramMessage(chatId, "聊天记录已清空。") | |
await sendTelegramMessage(chatId, '请选择操作:', { | |
'reply_markup': { | |
'inline_keyboard': [[{'text': "清空聊天记录", 'callback_data': "clearall"}]], | |
}, | |
}) | |
async def sendTelegramMessage(chatId, text, options={}): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" | |
if PHP_PROXY_URL: | |
url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/sendMessage" | |
data = { | |
'chat_id': chatId, | |
'text': text, | |
**options | |
} | |
try: | |
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f'发送 Telegram 消息失败: {e}') | |
return {} | |
async def editTelegramMessage(chatId, message_id, text, options={}): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/editMessageText" | |
if PHP_PROXY_URL: | |
url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/editMessageText" | |
data = { | |
'chat_id': chatId, | |
'message_id': message_id, | |
'text': text, | |
**options | |
} | |
try: | |
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) | |
response.raise_for_status() | |
except requests.exceptions.RequestException as e: | |
print(f'编辑 Telegram 消息失败: {e}') | |
def getHelpMessage(): | |
return f""" | |
可用指令: | |
/start - 显示欢迎信息和操作按钮。 | |
/clearall - 清空当前会话的聊天记录。 | |
/help - 显示此帮助信息。 | |
群组指令: | |
/enableai - 在群组中启用AI回复。 | |
/disableai - 在群组中禁用AI回复。 | |
/setprefix <prefix> - 设置群组中触发AI回复的前缀,例如:/setprefix @bot。 | |
/getprefix - 获取当前群组的触发前缀。 | |
私聊指令 (在群组中也可以使用): | |
/settemp <温度值> - 设置AI回复的温度 (0-2),例如:/settemp 1.0。 | |
/gettemp - 获取当前AI回复的温度。 | |
/resetuser - 重置你的个人设置。 | |
/promat <index> - 切换提示词,例如: /promat 0, 1, 2。 | |
/getpromat - 获取当前使用的提示词索引。 | |
直接发送文本消息与AI对话 (私聊)。 | |
群组中,需要使用前缀触发AI回复,如果设置了前缀的话。 | |
注意: | |
- 机器人会记住最近的 {MAX_HISTORY_LENGTH} 条对话。 | |
- 机器人具有攻击性,请谨慎使用。 | |
""" | |
async def update_commands(): | |
await setBotCommands() | |
return jsonify({'message': 'Commands updated successfully!'}) | |
async def handle_webhook(): | |
try: | |
update = request.get_json() | |
await handleTelegramUpdate(update) | |
return jsonify({'status': 'ok'}) | |
except Exception as e: | |
import traceback | |
print(f"请求解析失败: {e}") | |
traceback.print_exc() | |
return jsonify({'status': 'error', 'message': str(e)}), 400 | |
def health_check(): | |
return 'OK' | |
async def getChatInfo(chatId): | |
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getChat" | |
if PHP_PROXY_URL: | |
url = f"{PHP_PROXY_URL}bot{TELEGRAM_BOT_TOKEN}/getChat" | |
data = { | |
'chat_id': chatId, | |
} | |
try: | |
response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data) | |
response.raise_for_status() | |
return response.json().get('result', {}) | |
except requests.exceptions.RequestException as e: | |
print(f'获取群组信息失败: {e}') | |
return None | |
async def banUser(chatId, userId): | |
BANNED_USERS[userId] = datetime.now() + BAN_DURATION | |
print(f"用户 {userId} 在群组 {chatId} 中被禁用,直到 {BANNED_USERS[userId]}") | |
async def unbanUser(chatId, userId): | |
if userId in BANNED_USERS: | |
del BANNED_USERS[userId] | |
await sendTelegramMessage(chatId, f"用户 {userId} 已被解禁。") | |
print(f"用户 {userId} 在群组 {chatId} 中被解禁。") | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860))) |