import os import lxml.etree as ET import pandas as pd import numpy as np from tensorflow.keras.preprocessing.text import Tokenizer import spacy from tqdm import tqdm import configparser import pickle import re from transformers import pipeline import torch from tqdm import tqdm import Stemmer global stemmer import json #exclude_tags=['graphic', 'figure'] #include_tags=['note', 'notePara', 'para'] exclude_tags=['graphic'] include_tags=['note', 'notePara', 'para', 'title', 'warningAndCautionPara', 'techName', 'infoName'] add_colon_tags=['title', 'techName'] make_lower_parent_tags=['listItemDefinition'] PARSE_PATHS=['//dmodule/content[last()]/procedure[last()]/preliminaryRqmts[last()]', '//dmodule/content[last()]/procedure[last()]/mainProcedure[last()]', '//dmodule/content[last()]/description[last()]', '//dmodule/content[last()]/crew[last()]/crewRefCard[last()]/crewDrill[last()]', '//dmodule/identAndStatusSection[last()]/dmAddress[last()]/dmAddressItems[last()]/dmTitle[last()]'] PERSCENTAGE_IN_RATIO=0.5 THRESHOLD=0.1 global nlp, tokenizer_search, tokenizer_qa, device global search_df, qa_df, SEARCH_DATA global index_data_loaded, qa_index_data_loaded, qa_model_initialized global qa_model, qa_model_num PUBLICATION_DEMO_RU_PATH="publications/Demo publication in Russian" PUBLICATION_DEMO_EN_PATH="publications/Bike Data Set for Release number 5.0" PUBLICATION_PATH=PUBLICATION_DEMO_RU_PATH TOKENIZER_SEARCH_FILENAME='tokenizer_search.pickle' TOKENIZER_QA_FILENAME='tokenizer_qa.pickle' INDEX_FOLDER= PUBLICATION_PATH+ os.sep+ "index" #print('INDEX_FOLDER:', INDEX_FOLDER) TOKENIZER_SEARCH_PATH= INDEX_FOLDER+ os.sep+ TOKENIZER_SEARCH_FILENAME TOKENIZER_QA_PATH= INDEX_FOLDER+ os.sep+ TOKENIZER_QA_FILENAME #print('TOKENIZER_SEARCH_PATH:', TOKENIZER_SEARCH_PATH) PUBLICATION_LANGUAGE="ru" nlp=None search_df=None qa_df=None index_data_loaded=False qa_index_data_loaded=False SEARCH_DATA= None device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') qa_model_initialized=False def get_xpath_one(tree, xpath): res = tree.xpath(xpath) if res: return res[0] def get_dmc(doc): dmc="" node= get_xpath_one(doc, '//dmCode') dmc='DMC-'+'-'.join([node.get('modelIdentCode'), \ node.get('itemLocationCode'), \ node.get('systemCode'), \ node.get('subSystemCode')+node.get('subSubSystemCode'), \ node.get('assyCode'),\ node.get('disassyCode')+node.get('disassyCodeVariant'),\ node.get('infoCode')+node.get('infoCodeVariant'),\ node.get('systemDiffCode')]) #print('dmc: ', dmc) return dmc def is_float(string): if string.replace(".", "").replace(",", "").replace("+", "").replace("-", "").isnumeric(): return True else: return False def stringify_children(node, texts, pis, excludeDigits=True): s = node.text if (s != None) and (s.isspace()==False): if excludeDigits: if is_float(s)==False: texts.add(s) else: texts.add(s) for child in node: if child.tag not in exclude_tags: if child not in pis: stringify_children(child, texts, pis) return def stringify_children_incl(node, texts, pis, make_lower=False): ET.strip_tags(node, 'internalRef') ET.strip_tags(node, 'emphasis') s = node.text if s and make_lower==True: s= s.lower() if s and node.tag in add_colon_tags: s=s+':' #print('s', s) clear_s= clear_text(s) if (s != None) and (s.isspace()==False) and (clear_s!='') and (clear_s): print('s:', s) print('clear_text(s):', clear_text(s)) texts.append(s) for child in node: #print('child.tag:', child.tag) if (len(child.getchildren())>0) or (child.tag in include_tags): if (child not in pis) and (child.tag not in exclude_tags): make_lower=False if node.tag in make_lower_parent_tags: make_lower=True stringify_children_incl(child, texts, pis, make_lower) return def clear_text(text): #print('clear_text!') clean_text = re.sub(r'(?:(?!\u0301)[\W\d_])+', ' ', str(text).lower()) return clean_text def lemmatize_and_stemm(df_r): global nlp, stemmer #print('lemmatize_and_stemm!') disabled_pipes = [ "parser", "ner"] if PUBLICATION_LANGUAGE=="ru": nlp = spacy.load('ru_core_news_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('ru')#russian else: nlp = spacy.load('en_core_web_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('en')#english lemm_texts = [] stem_texts=[] for doc in tqdm(nlp.pipe(df_r['lemm_text'].values, disable = disabled_pipes), total=df_r.shape[0]): lemm_text = " ".join([i.lemma_ for i in doc]) lemm_texts.append(lemm_text) stem_text = " ".join([stemmer.stemWord(i.text) for i in doc]) stem_texts.append(stem_text) df_r['lemm_text']= lemm_texts df_r['stem_text']= stem_texts df_r=df_r.drop_duplicates() #print('lemmatization and stemming success!') return def tokenize_text(df_r, save_filename): #global tokenizer_search #print('tokenize_text!') #try: #with open('tokenizer.pickle', 'rb') as handle: #tokenizer = pickle.load(handle) #print('tokenizer loaded from file') #except Exception as e: tokenizer = Tokenizer(oov_token='') print('tokenizer created') texts= pd.concat([df_r['lemm_text'],df_r['stem_text']]) tokenizer.fit_on_texts(texts) total_words = len(tokenizer.word_index) + 1 print("Total number of words: ", total_words) with open(save_filename, 'wb') as handle: pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL) return tokenizer def make_final_index(df_r, tokenizer, index_filename='search_index.csv', includePlainText=True): print('make_final_index!') tokens=[] labels=[] dmcs=[] texts=[] for index, row in tqdm(df_r.iterrows()): #print('row:', row) text= row['text'] lemm_token= tokenizer.texts_to_sequences([row['lemm_text']])[0] stem_token= tokenizer.texts_to_sequences([row['stem_text']])[0] dmc= row['DMC'] #print(str(row['label_enc'])+':'+dmc) tokens.append(lemm_token) labels.append(row['label_enc']) dmcs.append(dmc) texts.append(text) tokens.append(stem_token) labels.append(row['label_enc']) dmcs.append(dmc) texts.append(text) columns= ['tokens', 'labels', 'DMC'] data= {'tokens': tokens, 'labels': labels, 'DMC': dmcs} if includePlainText==True: columns= ['tokens', 'labels', 'text', 'DMC'] data= {'tokens': tokens, 'labels': labels, 'text': texts, 'DMC': dmcs} s_df= pd.DataFrame(columns=columns, data= data) s_df= s_df.loc[s_df.astype(str).drop_duplicates().index] print('final index info:') print(s_df.info()) s_df.to_csv(index_filename, sep=';', index=False) #print(f'results saved to {index_filename}') return s_df def make_search_index(path): global nlp, tokenizer_search, search_df, index_data_loaded #print('make_search_index!') directory= path.replace('"', '') #print(f'path: {directory}') df_r= pd.DataFrame(columns=['text']) for file in os.listdir(directory): filename = file#os.fsdecode(file) if 'PMC' in filename: continue #print('filename: ', filename) if filename.lower().endswith(".xml")==False: continue filepath= directory+ os.sep+ filename print('filepath:', filepath) doc= ET.parse(filepath) dmc= get_dmc(doc) file_texts=set() pis = doc.xpath("//processing-instruction()") for node in doc.xpath('//dmodule'): stringify_children(node, file_texts, pis) #print('file_texts:', file_texts) df= pd.DataFrame(columns=['text'], data= file_texts) df['DMC']= dmc df_r= pd.concat([df_r, df], ignore_index=True) print('parsing results:') print(df_r.info()) #PARSING_INDEX_FILENAME='strings_with_DMC.csv' #print(f'parsing results saved to: {PARSING_INDEX_FILENAME}') #df_r.to_csv(PARSING_INDEX_FILENAME, index=False, sep = ';') df_r['lemm_text']=df_r['text'].apply(clear_text) lemmatize_and_stemm(df_r) df_r= df_r.reset_index(drop=True) df_r['label_enc']= df_r.index tokenizer_search= tokenize_text(df_r, TOKENIZER_SEARCH_PATH) #print('tokenizer before make_final_index:', tokenizer_search) search_df= make_final_index(df_r, tokenizer_search) index_data_loaded= True return len(search_df) def make_search_index_qa(path): global nlp, tokenizer_qa, qa_df, qa_index_data_loaded #print('make_search_index_qa!') directory= path.replace('"', '') #print(f'path: {directory}') df_r= pd.DataFrame(columns=['text']) for file in os.listdir(directory): filename = file#os.fsdecode(file) if 'PMC' in filename: continue #print('filename: ', filename) if filename.lower().endswith(".xml")==False: continue filepath= directory+ os.sep+ filename #print('filepath:', filepath) doc= ET.parse(filepath) dmc= get_dmc(doc) paths= PARSE_PATHS pis = doc.xpath("//processing-instruction()") for pi in pis: if pi.getparent()!=None: ET.strip_tags(pi.getparent(), pi.tag) cntr=1 for expr in paths: try: x_path_result = doc.xpath(expr) except ET.XPathEvalError: continue if not x_path_result: continue file_texts=[] dmc_with_chapter= f'{dmc}({cntr})' for node in x_path_result:#doc.xpath(expr): stringify_children_incl(node, file_texts, pis) cntr=cntr+1 #print('file_texts:',file_texts) #print('file_texts len:',len(file_texts)) if len(file_texts)==0: continue concat_texts=[' \n '.join(file_texts)] #print('file_texts:', file_texts) #df= pd.DataFrame(columns=['text'], data= file_texts) df= pd.DataFrame(columns=['text'], data= concat_texts) df['DMC']= dmc_with_chapter df_r= pd.concat([df_r, df], ignore_index=True) #print('parsing results:') #print(df_r.info()) #PARSING_INDEX_FILENAME='strings_with_DMC.csv' #print('parsing results saved to: {PARSING_INDEX_FILENAME}') #df_r.to_csv(PARSING_INDEX_FILENAME, index=False, sep = ';') df_r['lemm_text']=df_r['text'].apply(clear_text) lemmatize_and_stemm(df_r) df_r= df_r.reset_index(drop=True) df_r['label_enc']= df_r.index tokenizer_qa= tokenize_text(df_r, TOKENIZER_QA_PATH) qa_df= make_final_index(df_r, tokenizer_qa, index_filename='qa_index.csv') qa_index_data_loaded= True return len(qa_df) def convert2list(string): x = json.loads(string) lst=[] for n in x: #print(x) lst.append(int(n)) return lst def load_index_data(): global nlp, tokenizer_search, search_df, index_data_loaded print('load_index_data!') #spacy disabled_pipes = [ "parser", "ner"] if PUBLICATION_LANGUAGE=="ru": nlp = spacy.load('ru_core_news_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('ru')#russian else: nlp = spacy.load('en_core_web_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('en')#english #print('spacy loaded:', nlp) #tokenizer with open(TOKENIZER_SEARCH_PATH, 'rb') as handle: tokenizer_search = pickle.load(handle) #print('tokenizer loaded:', tokenizer) #index search_index_path= INDEX_FOLDER+os.sep+'search_index.csv' search_df= pd.read_csv(search_index_path, sep=';') print('index file loaded:', search_df.info()) search_df['tokens']= search_df['tokens'].apply(convert2list) index_data_loaded= True return nlp, tokenizer_search, search_df def load_index_data_qa(): global nlp, tokenizer_qa, qa_df, qa_index_data_loaded, stemmer #print('load_index_data_qa!') #spacy disabled_pipes = [ "parser", "ner"] if PUBLICATION_LANGUAGE=="ru": nlp = spacy.load('ru_core_news_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('ru')#russian else: nlp = spacy.load('en_core_web_sm', disable=disabled_pipes) stemmer= Stemmer.Stemmer('en')#english print('spacy loaded:', nlp) #tokenizer with open(TOKENIZER_QA_PATH, 'rb') as handle: tokenizer_qa = pickle.load(handle) #print('tokenizer loaded:', tokenizer_qa) #index qa_index_path= INDEX_FOLDER+os.sep+'qa_index.csv' qa_df= pd.read_csv(qa_index_path, sep=';') #print('index qa file loaded:', qa_df.info()) qa_df['tokens']= qa_df['tokens'].apply(convert2list) qa_index_data_loaded= True return nlp, tokenizer_qa, qa_df def customIsIn(x , tokens): result= False cnt_in=0 for val in x: if val in tokens: cnt_in+=1 PERSCENTAGE_IN= cnt_in/len(tokens) if PERSCENTAGE_IN>=PERSCENTAGE_IN_RATIO: return True return result def get_lemmed_stemmed_text(text): global nlp, stemmer #print('nlp loaded or not:', nlp) if PUBLICATION_LANGUAGE=="ru": spacy_stopwords = spacy.lang.ru.stop_words.STOP_WORDS #russian stemmer= Stemmer.Stemmer('ru')#russian else: spacy_stopwords = nlp.Defaults.stop_words #english stemmer= Stemmer.Stemmer('en')#english #print('spacy_stopwords:', spacy_stopwords) doc = nlp(clear_text(text)) # Remove stop words doc_cleared = [token for token in doc if not token.is_stop] #print('doc_cleared:', doc_cleared) lemm_text = " ".join([i.lemma_ for i in doc_cleared if not i.lemma_ in spacy_stopwords]) print(f'lemm_text: {lemm_text}') stem_text = " ".join([stemmer.stemWord(i.text) for i in doc_cleared if not stemmer.stemWord(i.text) in spacy_stopwords]) print(f'stem_text: {stem_text}') return lemm_text, stem_text def search_query_any(query, df=None, tokenizer=None): global SEARCH_DATA, search_df, index_data_loaded, stemmer print('search_query_any!') print(f'query: {query}') if index_data_loaded==False: load_index_data() SEARCH_DATA= df if df is None: if index_data_loaded==False: load_index_data() SEARCH_DATA=search_df lemm_text, stem_text= get_lemmed_stemmed_text(query) if tokenizer==None: tokenizer= tokenizer_search token_list = tokenizer.texts_to_sequences([lemm_text])[0] #print(f'token_list: {token_list}') token_list_stem = tokenizer.texts_to_sequences([stem_text])[0] #print(f'token_list stem: {token_list_stem}') mask1 = SEARCH_DATA.tokens.apply(lambda x: customIsIn(x, token_list)) indexes1= SEARCH_DATA[mask1]['labels'].unique() mask2= SEARCH_DATA.tokens.apply(lambda x: customIsIn(x, token_list_stem)) indexes2= SEARCH_DATA[mask2]['labels'].unique() indexes= np.concatenate((indexes1, indexes2), axis=None) results_df= SEARCH_DATA[SEARCH_DATA['labels'].isin(indexes)].drop(['tokens', 'labels'], axis=1) results_df= results_df.drop_duplicates() result=[] regex = re.compile(r'\([^)]*\)') for index, row in results_df.iterrows(): text= row['text'] dmc= row['DMC'] dmc= re.sub(regex, '', dmc) result.append({'text': text, 'DMC':dmc}) return result def search_query_all(query, df=None, tokenizer=None): global SEARCH_DATA, search_df, index_data_loaded print('search_query_all!') print(f'query: {query}') SEARCH_DATA= df if df is None: if index_data_loaded==False: load_index_data() SEARCH_DATA=search_df print('SEARCH_DATA:', SEARCH_DATA.head()) print('nlp loaded or not:', nlp) doc = nlp(clear_text(query)) lemm_text, stem_text= get_lemmed_stemmed_text(query) if tokenizer==None: tokenizer= tokenizer_search token_list = tokenizer.texts_to_sequences([lemm_text])[0] print(f'token_list: {token_list}') token_list_stem = tokenizer.texts_to_sequences([stem_text])[0] print(f'token_list stem: {token_list_stem}') mask1= SEARCH_DATA['tokens'].map(set(token_list).issubset) mask2= SEARCH_DATA['tokens'].map(set(token_list_stem).issubset) indexes1= SEARCH_DATA[mask1]['labels'].unique() indexes2= SEARCH_DATA[mask2]['labels'].unique() indexes= np.concatenate((indexes1, indexes2), axis=None) results_df= SEARCH_DATA[SEARCH_DATA['labels'].isin(indexes)].drop(['tokens', 'labels'], axis=1) results_df= results_df.drop_duplicates() result=[] regex = re.compile(r'\([^)]*\)') for index, row in results_df.iterrows(): text= row['text'] dmc= row['DMC'] dmc= re.sub(regex, '', dmc) result.append({'text': text, 'DMC':dmc}) return result def concat_by_DMC(s_df): #print('concat_by_DMC!') #print(s_df.head()) #объединяем лемматизированную и стеммизированную часть датасета concat_tokens=[] for label in s_df['labels'].unique(): tokens_lists= s_df[s_df['labels']==label]['tokens'].to_list() joined_lst=[] for lst in tokens_lists: joined_lst+= lst concat_tokens.append(joined_lst) #print(concat_tokens[:5]) df= s_df.drop('tokens', axis=1) df= df.drop_duplicates() df['tokens']=concat_tokens #объединяем тексты и токены по DMC concat_tokens=[] DMCs=[] texts=[] for dmc_code in df['DMC'].unique(): DMCs.append(dmc_code) #объединяем списки токенов для одного модуля данных (DMC) tokens_lists= df[df['DMC']==dmc_code]['tokens'].to_list() joined_token_lst=[] for lst in tokens_lists: joined_token_lst+= lst concat_tokens.append(joined_token_lst) #объединяем тексты text_list= df[df['DMC']==dmc_code]['text'].to_list() concat_text=' \n '.join(str(txt) for txt in text_list) texts.append(concat_text) #print('concat_tokens',len(concat_tokens)) #print('DMCs',len(DMCs)) #print('texts',len(texts)) df= pd.DataFrame(columns=['DMC'], data=DMCs) df['text']= texts df['tokens']= concat_tokens df['labels']= df.index #print(df.head()) return df def initialize_qa_model(model): global qa_df, qa_model, qa_model_num qa_model_num= model print('initialize_qa_model!') if model==1 or str(model)=="1": qa_model= pipeline("question-answering", "dmibor/ietm_search_and_qa", device=device) print('initialized model number 1!') else:#model==2 (базовая) qa_model= pipeline("question-answering", "timpal0l/mdeberta-v3-base-squad2", device=device) print('initialized model number 2!') if qa_index_data_loaded==False: load_index_data_qa() #print('len(qa_df)', len(qa_df)) qa_df= concat_by_DMC(qa_df) #qa_df.to_csv('concat_index.csv', sep=';', index=False) #print('concat_by_DMC len(qa_df)', len(qa_df)) qa_model_initialized=True def get_best_and_longest_result(model_results, threshold, mode): print('get_best_and_longest_result!') print('mode:', mode) best_result=None longest_result=None if(type(model_results)!= list): return best_result, longest_result best_result= model_results[0] best_result_answer= best_result['answer'] print('best_result_answer: ',best_result_answer) best_answer_cleaned= (re.sub(r"[\W\d_]+$", "", best_result_answer)).strip() print('best_answer_cleaned: ',best_answer_cleaned) longest_answer='' longest_answer_len= len(best_answer_cleaned) longest_result= best_result print("type(mode)", type(mode)) print("mode=='strict'", mode=='strict') print("mode==\"strict\"", mode=="strict") if mode=='strict': return best_result, longest_result if best_result['score']>=threshold: print('best_result_answer: ',best_answer_cleaned) print('best_result score:', best_result['score']) for result in model_results: answer= result['answer'] answer_cleaned= re.sub(r"[\W\d_]+$", "", answer).strip() #print('answer_cleaned: ',answer_cleaned) if best_answer_cleaned in answer_cleaned: if len(answer_cleaned)>longest_answer_len: print('new longest answer: ',answer_cleaned) print('longest score:', result['score']) print() longest_answer= answer_cleaned longest_answer_len= len(answer_cleaned) longest_result= result #print('longest_answer:' , longest_answer) return best_result, longest_result def find_answer(question, context, threshold, max_answer_len=1000, top_k=20, verbose=True, mode='strict'): print('find_answer!') print('mode:', mode) found_answer=False #print('qa_model', qa_model) model_results= qa_model(question = question, context = context, max_answer_len=max_answer_len, top_k=top_k) #print(model_result) best_result, longest_result= get_best_and_longest_result(model_results, threshold, mode) #print('longest_result', longest_result) if best_result['score']>=threshold: longest_answer= longest_result['answer'] answer_cleaned= re.sub(r"[\W\d_]+$", '', longest_answer).strip() if verbose==True: prob_value= round(model_result['score'], 2) print(f'Ответ (уверенность= {prob_value}): {answer_cleaned}') longest_result['answer']= answer_cleaned found_answer=True if found_answer==False and verbose==True: print('Ответ не найден!') model_result= best_result model_result['answer']= longest_result['answer'] return model_result def answer_question(question, mode, model=1): global qa_model_initialized, qa_model_num, tokenizer_qa print('answer_question!') if qa_model_initialized==False or model!= qa_model_num: initialize_qa_model(model) print(f'question: {question}') print(f'mode: {mode}') print(f'model: {qa_model}') filtered_index= search_query_all(question, qa_df, tokenizer_qa) threshold= THRESHOLD #print('filtered_index все слова:', len(filtered_index)) if len(filtered_index)<1: filtered_index= search_query_any(question, qa_df, tokenizer_qa) threshold= THRESHOLD #print('filtered_index любое слово:', len(filtered_index)) found_answer=False best_answer="" best_score=0 best_DMC="" regex = re.compile(r'\([^)]*\)') for indx in filtered_index: result= find_answer(question, indx['text'], threshold=threshold, max_answer_len=1000, top_k=20, verbose=False, mode=mode) if result['score']>best_score: best_answer= result['answer'] best_score= result['score'] best_DMC= indx['DMC'] best_DMC= re.sub(regex, '', best_DMC) result= [{'score': best_score, 'answer': best_answer, 'DMC': best_DMC}] return result