Spaces:
Running
Running
import json | |
import logging | |
import asyncio | |
from functools import wraps, partial | |
from os import listdir | |
from os.path import exists, normpath | |
from re import sub | |
from typing import Dict | |
from deep_translator import GoogleTranslator as Translator | |
try: | |
import extensions.telegram_bot.source.text_process as tp | |
import extensions.telegram_bot.source.const as const | |
from extensions.telegram_bot.source.conf import cfg | |
from extensions.telegram_bot.source.user import User as User | |
except ImportError: | |
import source.text_process as tp | |
import source.const as const | |
from source.conf import cfg | |
from source.user import User as User | |
def async_wrap(func): | |
async def run(*args, loop=None, executor=None, **kwargs): | |
if loop is None: | |
loop = asyncio.get_event_loop() | |
target_func = partial(func, *args, **kwargs) | |
return await loop.run_in_executor(executor, target_func) | |
return run | |
def translate_text(text: str, source="en", target="en"): | |
return Translator(source=source, target=target).translate(text) | |
async def prepare_text(original_text: str, user: User, direction="to_user"): | |
text = original_text | |
# translate | |
if cfg.llm_lang != user.language: | |
try: | |
if direction == "to_model": | |
text = await translate_text(text=text, source=user.language, target=cfg.llm_lang) | |
elif direction == "to_user": | |
text = await translate_text(text=text, source=cfg.llm_lang, target=user.language) | |
except Exception as exception: | |
text = "can't translate text:" + str(text) | |
logging.error("translator_error:\n" + str(exception) + "\n" + str(exception.args)) | |
# Add HTML tags and other... | |
if direction not in ["to_model", "no_html"]: | |
text = text.replace("#", "#").replace("<", "<").replace(">", ">") | |
original_text = original_text.replace("#", "#").replace("<", "<").replace(">", ">") | |
if len(original_text) > 2000: | |
original_text = original_text[:2000] | |
if len(text) > 2000: | |
text = text[:2000] | |
if cfg.llm_lang != user.language and direction == "to_user" and cfg.translation_as_hidden_text == "on": | |
text = ( | |
cfg.html_tag[0] | |
+ original_text | |
+ cfg.html_tag[1] | |
+ "\n" | |
+ cfg.translate_html_tag[0] | |
+ text | |
+ cfg.translate_html_tag[1] | |
) | |
else: | |
if len(text) > 4000: | |
text = text[:4000] | |
text = cfg.html_tag[0] + text + cfg.html_tag[1] | |
return text | |
def parse_characters_dir() -> list: | |
char_list = [] | |
for f in listdir(cfg.characters_dir_path): | |
if f.endswith((".json", ".yaml", ".yml")): | |
char_list.append(f) | |
return char_list | |
def parse_presets_dir() -> list: | |
preset_list = [] | |
for f in listdir(cfg.presets_dir_path): | |
if f.endswith(".txt") or f.endswith(".yaml"): | |
preset_list.append(f) | |
return preset_list | |
# User checking rules | |
def check_user_permission(chat_id): | |
# Read admins list | |
if exists(cfg.users_file_path): | |
with open(normpath(cfg.users_file_path), "r") as users_file: | |
users_list = users_file.read().split() | |
else: | |
users_list = [] | |
# check | |
if str(chat_id) in users_list or len(users_list) == 0: | |
return True | |
else: | |
return False | |
def check_user_rule(chat_id, option): | |
if exists(cfg.user_rules_file_path): | |
with open(normpath(cfg.user_rules_file_path), "r") as user_rules_file: | |
user_rules = json.loads(user_rules_file.read()) | |
# if checked button with numeral postfix - delete numerals | |
option = sub(r"[0123456789-]", "", option) | |
if option.endswith(const.BTN_OPTION): | |
option = const.BTN_OPTION | |
# Read admins list | |
if exists(cfg.admins_file_path): | |
with open(normpath(cfg.admins_file_path), "r") as admins_file: | |
admins_list = admins_file.read().split() | |
else: | |
admins_list = [] | |
# check admin rules | |
if str(chat_id) in admins_list or cfg.bot_mode == const.MODE_ADMIN: | |
return bool(user_rules[option][const.MODE_ADMIN]) | |
else: | |
return bool(user_rules[option][cfg.bot_mode]) | |
def init_check_user(users: Dict[int, User], chat_id): | |
if chat_id not in users: | |
# Load default | |
users.update({chat_id: User()}) | |
users[chat_id].user_id = chat_id | |
users[chat_id].load_character_file( | |
characters_dir_path=cfg.characters_dir_path, | |
char_file=cfg.character_file, | |
) | |
users[chat_id].load_user_history(f"{cfg.history_dir_path}/{str(chat_id)}.json") | |
users[chat_id].find_and_load_user_char_history(chat_id, cfg.history_dir_path) | |
def get_conversation_info(user: User): | |
history_tokens = -1 | |
context_tokens = -1 | |
greeting_tokens = -1 | |
conversation_tokens = -1 | |
try: | |
history_tokens = tp.generator.get_tokens_count(user.history_as_str()) | |
context_tokens = tp.generator.get_tokens_count(user.context) | |
greeting_tokens = tp.generator.get_tokens_count(user.greeting) | |
conversation_tokens = history_tokens + context_tokens + greeting_tokens | |
except Exception as e: | |
logging.error("options_button tokens_count" + str(e)) | |
max_token_param = "truncation_length" | |
max_tokens = cfg.generation_params[max_token_param] if max_token_param in cfg.generation_params else "???" | |
return ( | |
f"{user.name2}\n" | |
f"Conversation length {str(conversation_tokens)}/{max_tokens} tokens.\n" | |
f"(context {(str(context_tokens))}, " | |
f"greeting {(str(greeting_tokens))}, " | |
f"messages {(str(history_tokens))})\n" | |
f"Voice: {user.silero_speaker}\n" | |
f"Language: {user.language}" | |
) | |