"""
Convert PubTator-format NER annotations into CoNLL token/label format.

Created on Wed Sep 7 08:58:22 2022

@author: luol2
"""

import io
import re

import stanza
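
# The input is expected in PubTator format: documents separated by a blank line,
# each consisting of a "pmid|t|title" line, a "pmid|a|abstract" line, and one
# tab-separated annotation line per entity with the fields pmid, start offset,
# end offset, mention text, entity type, and (optionally) an identifier.
# Illustrative example (not taken from the training set):
#
#   10021369|t|Identification of APC2, a homologue of the APC tumour suppressor.
#   10021369|a|The APC tumour suppressor ...
#   10021369    18    22    APC2    GeneOrGeneProduct    10297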


def pubtator_entitysort(infile):
    """Sort the annotation lines of each PubTator document by start offset."""

    fin = open(infile, 'r', encoding='utf-8')
    fout = io.StringIO()
    all_in = fin.read().strip().split('\n\n')
    fin.close()

    error_dict = {}  # duplicated annotation lines, keyed by pmid (diagnostic only)
    for doc in all_in:
        entity_dict = {}
        lines = doc.split('\n')
        # Copy the title and abstract lines through unchanged.
        fout.write(lines[0] + '\n' + lines[1] + '\n')
        for i in range(2, len(lines)):
            segs = lines[i].split('\t')
            if len(segs) >= 5:
                if lines[i] not in entity_dict:
                    entity_dict[lines[i]] = int(segs[1])
                else:
                    # Exact duplicate annotation: report it and record it once.
                    print('duplicate entity:', lines[i])
                    if segs[0] not in error_dict:
                        error_dict[segs[0]] = [lines[i]]
                    elif lines[i] not in error_dict[segs[0]]:
                        error_dict[segs[0]].append(lines[i])

        # Re-emit the annotations ordered by start offset.
        entity_sort = sorted(entity_dict.items(), key=lambda kv: kv[1])
        for ele in entity_sort:
            fout.write(ele[0] + '\n')
        fout.write('\n')
    return fout


def filter_overlap(infile):
    """Remove overlapping/nested entities, keeping only the longest span of each group."""

    fin = io.StringIO(infile.getvalue())
    fout = io.StringIO()
    documents = fin.read().strip().split('\n\n')
    fin.close()

    # Diagnostic counters (currently not reported).
    total_entity = 0
    nest_entity = 0
    for doc in documents:
        lines = doc.split('\n')
        entity_list = []
        if len(lines) > 2:
            first_entity = lines[2].split('\t')
            nest_list = [first_entity]      # current group of mutually overlapping entities
            max_eid = int(first_entity[2])  # rightmost end offset seen in the group
            total_entity += len(lines) - 2
            for i in range(3, len(lines)):
                segs = lines[i].split('\t')
                if int(segs[1]) > max_eid:
                    # This entity starts after the current group ends: flush the group.
                    if len(nest_list) == 1:
                        entity_list.append(nest_list[0])
                    else:
                        nest_entity += len(nest_list) - 1
                        entity_list.extend(find_max_entity(nest_list))
                    nest_list = [segs]
                    if int(segs[2]) > max_eid:
                        max_eid = int(segs[2])
                else:
                    # Overlaps the current group: add it and extend the group boundary.
                    nest_list.append(segs)
                    if int(segs[2]) > max_eid:
                        max_eid = int(segs[2])
            # Flush the final group.
            if nest_list:
                if len(nest_list) == 1:
                    entity_list.append(nest_list[0])
                else:
                    entity_list.extend(find_max_entity(nest_list))
        fout.write(lines[0] + '\n' + lines[1] + '\n')
        for ele in entity_list:
            fout.write('\t'.join(ele) + '\n')
        fout.write('\n')
    return fout


def find_max_entity(nest_list):
    """Return (as a one-element list) the longest entity in an overlapping group."""
    max_len = 0
    max_index = 0
    for i in range(len(nest_list)):
        cur_len = int(nest_list[i][2]) - int(nest_list[i][1])
        if cur_len > max_len:
            max_len = cur_len
            max_index = i
    return [nest_list[max_index]]
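
# For example (hypothetical offsets): given overlapping annotations for
# "APC tumour suppressor" (offsets 4-25) and "APC" (offsets 4-7),
# find_max_entity keeps only the longer 4-25 span, so filter_overlap
# emits no nested or overlapping mentions.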


def pubtator_to_labeltext(infile):
    """Wrap each mention in the raw text with ssss<type> / eeee<type> markers."""

    fin = io.StringIO(infile.getvalue())
    all_context = fin.read().strip().split('\n\n')
    fin.close()
    fout = io.StringIO()
    label_dic = {}  # lowercased entity type -> original entity type

    for doc in all_context:
        lines = doc.split('\n')
        ori_text = lines[0].split('|t|')[1] + ' ' + lines[1].split('|a|')[1]
        pmid = lines[0].split('|t|')[0]
        s_index = 0
        new_text = ''
        for i in range(2, len(lines)):
            segs = lines[i].split('\t')
            label_dic[segs[4].lower()] = segs[4]
            # Only complete six-column annotations (with an identifier) are marked.
            if len(segs) == 6:
                e_index = int(segs[1])
                # Copy the text before the mention, then the marked-up mention.
                new_text += (ori_text[s_index:e_index]
                             + ' ssss' + segs[4].lower() + ' '
                             + ori_text[int(segs[1]):int(segs[2])]
                             + ' eeee' + segs[4].lower() + ' ')
                s_index = int(segs[2])
        new_text += ori_text[s_index:]
        fout.write(pmid + '\t' + ' '.join(new_text.strip().split()) + '\n')
    return fout, label_dic
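
# Illustrative output line: the pmid, a tab, then the text with every mention
# wrapped in its (lowercased) type markers, e.g.
#   10021369	Identification of ssssgeneorgeneproduct APC2 eeeegeneorgeneproduct , a homologue ...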


def pre_token(sentence):
    """Put spaces around symbols so the tokenizer splits on them."""
    sentence = re.sub(r"([=/()<>+\-_])", r" \1 ", sentence)
    sentence = re.sub(r"[ ]+", " ", sentence)
    return sentence
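
# For example, pre_token("p53(-/-)") returns "p53 ( - / - ) " (note the
# trailing space), so each symbol becomes a separate token downstream.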


def labeltext_to_conll_fasttoken(infile, label_dic):
    """Tokenize the marked-up text and write token<TAB>BIO-label lines."""

    fin = io.StringIO(infile.getvalue())
    all_context = fin.read().strip().split('\n')
    fin.close()
    fout = io.StringIO()

    # Load only a tokenizer, backed by the spaCy tokenizer (requires spacy);
    # no other default processors are loaded.
    nlp = stanza.Pipeline(lang='en', processors={'tokenize': 'spacy'}, package=None)

    for doc in all_context:
        doc_text = doc.split('\t')[1]
        doc_text = pre_token(doc_text)
        doc_stanza = nlp(doc_text)

        inentity_flag = 0  # 1 while between a ssss and an eeee marker
        last_label = 'O'
        for sent in doc_stanza.sentences:
            for word in sent.words:
                if word.text.strip() == '':
                    continue
                if word.text.startswith('ssss'):
                    # Opening marker: remember it, emit nothing.
                    last_label = word.text
                    inentity_flag = 1
                elif word.text.startswith('eeee'):
                    # Closing marker: emit nothing.
                    last_label = word.text
                    inentity_flag = 0
                else:
                    if last_label == 'O':
                        now_label = 'O'
                    elif last_label.startswith('ssss'):
                        # First token after an opening marker begins the entity.
                        now_label = 'B-' + label_dic[last_label[4:]]
                    elif last_label.startswith('B-'):
                        now_label = 'I-' + last_label[2:]
                    elif last_label.startswith('I-'):
                        now_label = 'I-' + last_label[2:]
                    elif last_label.startswith('eeee'):
                        now_label = 'O'
                    fout.write(word.text + '\t' + now_label + '\n')
                    last_label = now_label
            # End the sentence only if we are not inside an entity span.
            if inentity_flag != 1:
                fout.write('\n')
    return fout
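
# Illustrative CoNLL output: one token and its tab-separated BIO tag per line,
# with a blank line between sentences, e.g.
#   Identification	O
#   of	O
#   APC2	B-GeneOrGeneProduct
#   ,	O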


def pubtator_to_conll(infile):
    """Full pipeline: sort entities, resolve overlaps, insert markers, tokenize to CoNLL."""

    # 1. Sort each document's annotations by start offset.
    input_sort = pubtator_entitysort(infile)
    # 2. Keep only the longest span among overlapping/nested annotations.
    input_nonest = filter_overlap(input_sort)
    # 3. Wrap each remaining mention in ssss<type>/eeee<type> markers.
    input_labtext, label_dic = pubtator_to_labeltext(input_nonest)
    # 4. Tokenize and convert the markers into BIO labels.
    output = labeltext_to_conll_fasttoken(input_labtext, label_dic)
    return output


if __name__ == '__main__':

    infile = '../../TrainingSet/No100/NER.Train.txt'
    output = pubtator_to_conll(infile)
    fout = open('../../TrainingSet/No100/NER.Train.conll', 'w', encoding='utf-8')
    fout.write(output.getvalue())
    fout.close()
    output.close()