# a-v-bely's picture
# updates
# dc801b3
import pandas as pd
import streamlit as st
from json import loads
from re import search, compile
from courier.client import Courier
from secrets import token_urlsafe
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
# Shared argon2 PasswordHasher instance; the functions below call ph.hash()
# to store passwords and ph.verify() to check them.
ph = PasswordHasher()
def check_usr_pass(user_log_in_database, user_name: str, password: str) -> bool:
    """
    Verify login credentials against the stored password hash.

    Selects the rows whose ``user_name`` equals ``user_name`` and verifies
    ``password`` against the ``password`` field of the first row with the
    module-level hasher ``ph``.

    Returns True only when a matching row exists and verification succeeds.
    An unknown user, a mismatching password (``VerifyMismatchError``) or an
    ``IndexError`` during the lookup all yield False.
    """
    lookup = user_log_in_database.select('*').eq('user_name', user_name).execute()
    if not lookup.data:
        # No such user registered.
        return False
    try:
        return bool(ph.verify(lookup.data[0]['password'], password))
    except (VerifyMismatchError, IndexError):
        return False
def check_valid_name(name_sign_up: str) -> bool:
    """
    Check whether the name entered at sign-up starts like a valid name.

    A name is accepted when it begins with a Latin letter or underscore
    followed by a word character, or with a Cyrillic letter (А-Я, а-я) or
    underscore followed by a Cyrillic letter, digit or underscore.

    NOTE(review): the patterns are anchored only at the start, so just the
    first two characters are actually checked; single-character names are
    rejected and anything may follow a valid prefix. Confirm this is intended.
    """
    accepted_prefixes = (
        r'^[A-Za-z_]\w *',
        r'^[А-Яа-я_][А-Яа-я0-9_] *',
    )
    return any(search(pattern, name_sign_up) for pattern in accepted_prefixes)
def check_valid_email(email_sign_up: str) -> bool:
    """
    Check whether the email entered at sign-up has a plausible shape.

    Accepted form: alphanumeric runs separated by single ``.``, ``_``, ``+``
    or ``-`` characters, then ``@``, then one or more dot-terminated domain
    labels (letters, digits, hyphens) and a top-level domain of at least two
    letters. The whole string must match.

    Returns False for non-string input instead of raising.
    """
    if not isinstance(email_sign_up, str):
        return False
    # The separator class lists '-' last so it is a literal hyphen, not a
    # character range, and the TLD class no longer contains a stray '|'.
    regex = compile(
        r'([A-Za-z0-9]+[._+-])*[A-Za-z0-9]+'
        r'@([A-Za-z0-9-]+\.)+[A-Za-z]{2,}'
    )
    return regex.fullmatch(email_sign_up) is not None
def check_unique_email(user_log_in_database, email_sign_up: str) -> bool:
    """
    Tell whether ``email_sign_up`` is still unused.

    Selects the rows whose ``email`` equals ``email_sign_up`` and returns
    True when the query yields no rows, False otherwise.
    """
    matches = user_log_in_database.select('*').eq('email', email_sign_up).execute().data
    return len(matches) == 0
def non_empty_str_check(user_name_sign_up: str) -> bool:
    """
    Check that the string holds something other than space characters.

    Returns False for the empty string and for strings made up solely of
    ' ' (U+0020); returns True as soon as any other character is present.
    Other whitespace, such as tabs, counts as content.
    """
    return any(char != ' ' for char in user_name_sign_up)
def check_unique_usr(user_log_in_database, user_name_sign_up: str):
    """
    Check that ``user_name_sign_up`` is both unused and non-blank.

    Returns:
        False - at least one row already has this ``user_name``;
        None  - no such row exists, but the name fails ``non_empty_str_check``;
        True  - the name is free and non-blank.
    """
    existing = user_log_in_database.select('*').eq('user_name', user_name_sign_up).execute().data
    if len(existing) != 0:
        return False
    return True if non_empty_str_check(user_name_sign_up) else None
def register_new_usr(user_log_in_database, name_sign_up: str, email_sign_up: str, user_name_sign_up: str,
                     password_sign_up: str, professional_level: str, created_at: str) -> None:
    """
    Insert a new user record into ``user_log_in_database``.

    The password is stored as the hash produced by the module-level hasher
    ``ph``; every other field is stored as given.

    NOTE(review): despite the ``-> None`` annotation, the result of the
    insert's ``execute()`` call is returned to the caller.
    """
    record = dict(
        user_name=user_name_sign_up,
        name=name_sign_up,
        email=email_sign_up,
        password=ph.hash(password_sign_up),
        professional_level=professional_level,
        created_at=created_at,
    )
    insert_query = user_log_in_database.insert(record)
    return insert_query.execute()
def check_user_name_exists(user_log_in_database, user_name: str) -> bool:
    """
    Tell whether exactly one stored row has the given ``user_name``.

    Returns True only when the query yields a single row; zero rows or
    several rows both give False.
    """
    matches = user_log_in_database.select('*').eq('user_name', user_name).execute().data
    return len(matches) == 1
def check_email_exists(user_log_in_database, email_forgot_passwd: str):
    """
    Look up the account registered under ``email_forgot_passwd``.

    Returns a ``(found, user_name)`` tuple: ``(True, <user_name of the row>)``
    when exactly one row has this email, ``(False, None)`` otherwise.
    """
    matches = user_log_in_database.select('*').eq('email', email_forgot_passwd).execute().data
    if len(matches) != 1:
        return False, None
    return True, matches[0]['user_name']
def generate_random_passwd() -> str:
    """
    Produce a random temporary password to be e-mailed to the user.

    The value is a URL-safe text token built from 10 random bytes by
    ``secrets.token_urlsafe``.
    """
    random_byte_count = 10
    return token_urlsafe(random_byte_count)
def send_passwd_in_email(auth_token: str, user_name_forgot_passwd: str, email_forgot_passwd: str, company_name: str,
                         random_password: str) -> None:
    """
    E-mail the freshly generated temporary password to the user.

    Creates a Courier client with ``auth_token`` and sends a single message
    addressed to ``email_forgot_passwd``; the title carries ``company_name``
    and the body greets ``user_name_forgot_passwd`` and contains
    ``random_password``.
    """
    client = Courier(auth_token=auth_token)

    greeting = f'Hi! {user_name_forgot_passwd},\n\nYour temporary login password is: {random_password}\n\n'
    # '{{info}}' is a plain (non-f) string, so the double braces are sent
    # literally; presumably Courier substitutes data['info'] for it - confirm.
    message = {
        "to": {"email": email_forgot_passwd},
        "content": {
            "title": f'{company_name}: Login Password!',
            "body": greeting + '{{info}}',
        },
        "data": {
            "info": "Please reset your password at the earliest for security reasons.",
        },
    }
    client.send_message(message=message)
def change_passwd(user_log_in_database, email_forgot_passwd: str, random_password: str) -> None:
    """
    Overwrite the stored password of the account with the given email.

    ``random_password`` is hashed with the module-level hasher ``ph`` and
    written to the ``password`` field of every row whose ``email`` equals
    ``email_forgot_passwd``.

    NOTE(review): despite the ``-> None`` annotation, the result of the
    update's ``execute()`` call is returned to the caller.
    """
    hashed_password = ph.hash(random_password)
    update_query = user_log_in_database.update({'password': hashed_password})
    return update_query.eq('email', email_forgot_passwd).execute()
def check_current_passwd(user_log_in_database, email_reset_passwd: str, current_passwd: str = None) -> bool:
    """
    Verify the current password of the account identified by its email
    before a password reset.

    Args:
        user_log_in_database: query client for the users table.
        email_reset_passwd: email of the account being checked.
        current_passwd: the password the user typed; ``None`` means nothing
            was supplied.

    Returns:
        True only when a row with this email exists and ``current_passwd``
        verifies against its stored hash. A missing password, an unknown
        email or a mismatching password all yield False.
    """
    # A missing password can never authenticate. It must not be replaced by
    # a placeholder value, which would match an account that really uses
    # that placeholder as its password.
    if current_passwd is None:
        return False

    matches = user_log_in_database.select('*').eq('email', email_reset_passwd).execute().data
    if not matches:
        # Unknown email: report a failed check instead of raising IndexError.
        return False

    try:
        return bool(ph.verify(matches[0]['password'], current_passwd))
    except VerifyMismatchError:
        return False
def save_data_in_database(user_task_database, save_type, save_name, cefr_level, created_at, creator_name=None,
                          generated_result=None, test_taker_name=None, test_taker_answers=None,
                          test_taker_result=None, comments=None, distractor_model=None, allow=False):
    """
    Store generated tasks or a test attempt, refusing duplicates.

    Args:
        user_task_database: query client for the tasks table.
        save_type: 'download' stores generated tasks; any other value stores
            a test-attempt record (duplicate detection and the confirmation
            message apply to 'online_test' only).
        save_name: name of the save; for 'download' an empty string falls
            back to ``generated_result['name']``.
        cefr_level, created_at, creator_name, generated_result: stored as-is.
        test_taker_name, test_taker_answers, test_taker_result, comments:
            stored for non-'download' saves only.
        distractor_model: stored for 'download' saves only.
        allow: for 'download', whether to show the success message.

    Returns:
        The value of the Streamlit call that reported the outcome
        (``st.error`` / ``st.success``), or None when nothing is shown.
    """
    is_download = save_type == 'download'

    # Resolve the default name BEFORE the duplicate checks, so that a second
    # save under the same default name is detected as a duplicate.
    if is_download and save_name == '':
        save_name = generated_result['name']

    already_saved = user_task_database.select('*').execute().data

    if is_download:
        name_taken = any(task['creator_name'] == creator_name
                         and task['save_name'] == save_name
                         and task['cefr_level'] == cefr_level
                         for task in already_saved)
        if name_taken:
            # Reported as an error (not a success) like the other refusals.
            return st.error('Файл с таким названием уже существует! Введите другое название и повторите попытку.')

        same_tasks = [task for task in already_saved
                      if (task['creator_name'] == creator_name
                          and task['generated_result'] == generated_result
                          and task['cefr_level'] == cefr_level)]
        if same_tasks:
            return st.error(f'Вы уже сохраняли эти задания под именем {same_tasks[0]["save_name"]}. ')
    elif save_type == 'online_test':
        test_already_taken = any(task['test_taker_name'] == test_taker_name
                                 and task['save_name'] == save_name
                                 and task['cefr_level'] == cefr_level
                                 for task in already_saved)
        if test_already_taken:
            return st.error('Вы уже решали данный тест!')

    if is_download:
        new_save_data = {
            'save_type': save_type,
            'save_name': save_name,
            'cefr_level': cefr_level,
            'created_at': created_at,
            'creator_name': creator_name,
            'generated_result': generated_result,
            'distractor_model': distractor_model,
        }
    else:
        new_save_data = {
            'save_type': save_type,
            'save_name': save_name,
            'cefr_level': cefr_level,
            'created_at': created_at,
            'creator_name': creator_name,
            'test_taker_name': test_taker_name,
            'test_taker_answers': test_taker_answers,
            'generated_result': generated_result,
            'test_taker_result': test_taker_result,
            'comments': comments,
        }
    user_task_database.insert(new_save_data).execute()

    if is_download and allow:
        return st.success('Задания успешно сохранены! Можете переходить на следующие вкладки')
    if save_type == 'online_test':
        return st.success('Ответы успешно сохранены!')
    return None
def load_user_tasks_data(user_task_database, save_type, creator_name=None, test_taker_name=None):
    """
    Build an overview table of a user's saved records.

    For ``save_type == 'download'`` the rows created by ``creator_name`` are
    listed with name, level and creation time. For any other ``save_type``
    the rows taken by ``test_taker_name`` are listed with name, level, result,
    date and task author. In both cases only rows with the given
    ``save_type`` are selected.

    Returns a ``pandas.DataFrame`` with one row per record and Russian
    column headers.
    """
    if save_type == 'download':
        owner_field, owner = 'creator_name', creator_name
        fields = ['save_name', 'cefr_level', 'created_at']
        headers = ['Название', 'Уровень', 'Время создания']
    else:
        owner_field, owner = 'test_taker_name', test_taker_name
        fields = ['save_name', 'cefr_level', 'test_taker_result', 'created_at', 'creator_name']
        headers = ['Название', 'Уровень', 'Оценка', 'Дата прохождения', 'Автор заданий']

    records = user_task_database.select('*').eq(owner_field, owner).eq('save_type', save_type).execute().data

    # One list per field, then transpose so that each record becomes a row.
    per_field_values = [[record[field] for record in records] for field in fields]
    overview = pd.DataFrame(per_field_values).transpose()
    overview.columns = headers
    return overview
def load_users_particular_task(user_task_database, load_mode, creator_name, save_name, cefr_level):
    """
    Load and decode the ``generated_result`` of one saved record.

    The record is selected by ``creator_name``, ``save_name``,
    ``save_type == load_mode`` and ``cefr_level``; the first matching row is
    used.

    The stored value is decoded as follows:
      1. a non-string value is returned unchanged;
      2. valid JSON is parsed with ``json.loads`` (``strict=False``);
      3. otherwise a Python-literal string (single quotes, True/False/None,
         apostrophes inside double-quoted text) is parsed with
         ``ast.literal_eval``;
      4. as a last resort single quotes are replaced by double quotes and
         the result is parsed as JSON.

    Raises:
        IndexError: no row matches the filters.
        ValueError: the stored string cannot be decoded by any of the
            strategies above (``json.JSONDecodeError`` is a ``ValueError``).
    """
    from ast import literal_eval

    matches = (user_task_database.select('*')
               .eq('creator_name', creator_name)
               .eq('save_name', save_name)
               .eq('save_type', load_mode)
               .eq('cefr_level', cefr_level)
               .execute().data)
    stored = matches[0]['generated_result']

    if not isinstance(stored, str):
        return stored

    try:
        return loads(stored, strict=False)
    except ValueError:
        pass

    # literal_eval only evaluates literals, so it is safe on stored text and
    # copes with apostrophes and Python constants that quote-swapping breaks.
    try:
        return literal_eval(stored)
    except (ValueError, SyntaxError, TypeError):
        pass

    return loads(stored.replace("'", '"'), strict=False)