File size: 5,897 Bytes
0669d1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59aa543
0669d1e
 
 
 
 
 
 
 
 
59aa543
0669d1e
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import gradio as gr
import hnswlib
from cryptography.fernet import Fernet
from sentence_transformers import SentenceTransformer, CrossEncoder
from dotenv import load_dotenv
import os
import gzip
import io
import pandas as pd
from bs4 import BeautifulSoup

load_dotenv()


def _load_fernet(env_var = "KEY"):
    """Build the Fernet cipher from the key stored in environment variable *env_var*.

    Raises:
        RuntimeError: if the variable is missing or empty, so a misconfigured
            deployment fails with a clear message instead of an
            ``AttributeError`` on ``None``.
    """
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"Environment variable {env_var} with the Fernet key is not set")
    return Fernet(key.encode("utf-8"))


fernet = _load_fernet()


def _read_encrypted_parquet(path):
    """Return the DataFrame from a gzip-compressed, Fernet-encrypted parquet file."""
    with gzip.open(path, 'rb') as f:
        pq_bytes = fernet.decrypt(f.read())
    #pandas needs a file-like object, so wrap the decrypted bytes in memory
    return pd.read_parquet(io.BytesIO(pq_bytes))


def _load_index(path, dim, ef = 40):
    """Load a saved hnswlib cosine index from *path* and set its query-time ``ef``."""
    index = hnswlib.Index(space = 'cosine', dim = dim)
    index.load_index(path)
    index.set_ef(ef)
    return index


#read data
meta_title = _read_encrypted_parquet("title_metadata.gz")
meta_corpus = _read_encrypted_parquet("corpus_metadata.gz")

#load models
#bi-encoder: embeds the query for nearest neighbour lookup
model = SentenceTransformer("KennethTM/MiniLM-L6-danish-encoder", device="cpu")
embedding_size = model.get_sentence_embedding_dimension()

#cross-encoder: scores (query, text) pairs when reranking candidates
crossencoder = CrossEncoder("KennethTM/MiniLM-L6-danish-reranker", device="cpu")

#set up indexes
title_index_path = "title.index"
title_index = _load_index(title_index_path, embedding_size)

corpus_index_path = "corpus.index"
corpus_index = _load_index(corpus_index_path, embedding_size)

#create dict with metadata and index
#"column" names the metadata column whose text is passed to the reranker
data = {"title": {"index": title_index, "meta": meta_title, "column": "title"},
        "corpus": {"index": corpus_index, "meta": meta_corpus, "column": "text_chunks"}}

#init state dict
#holds the most recent search query; it is module-level, so it is shared by every caller
state = {"query": None}

#function find most similar candidates
def get_hits(query, index_name, top_k, top_k_multiplier = 2):
    """Return the best ``top_k`` metadata rows for ``query`` from one index.

    Candidates are fetched from the nearest neighbour index and then reranked
    with the cross-encoder; the result is sorted by descending score.

    Args:
        query: Search text.
        index_name: Key into ``data`` ("title" or "corpus").
        top_k: Number of rows to return. ``None``, zero and negative values
            give an empty result instead of an exception.
        top_k_multiplier: How many times ``top_k`` candidates are fetched
            before reranking.

    Returns:
        A copy of the matching metadata rows with an added "scores" column.
    """
    index = data[index_name]["index"]
    meta = data[index_name]["meta"]
    column_name = data[index_name]["column"]

    #top_k may arrive as None or a float when it comes from a UI number field
    limit = int(top_k) if top_k else 0
    if limit < 1:
        n_candidates = 0
    else:
        #asking for more neighbours than the index holds makes knn_query raise
        n_candidates = min(int(top_k*top_k_multiplier), index.get_current_count())

    if n_candidates < 1:
        #nothing to search for: same columns as a normal result, but no rows
        results = meta.iloc[:0].copy()
        results["scores"] = []
        return results

    #get nearest neighbor ids
    query_embedding = model.encode(query)
    ids, _ = index.knn_query(query_embedding, k = n_candidates)
    ids = ids[0]

    #rerank candidates
    results = meta.iloc[ids].copy()
    rerank_list = [(query, i) for i in results[column_name]]
    results["scores"] = crossencoder.predict(rerank_list)
    results = results.sort_values("scores", ascending=False)

    return results[:limit]

#functions for formatting hits
def format_hits(hits, index_name):
    """Render search hits as one markdown string.

    Each hit becomes a numbered heading with title, link and case ID; for the
    "corpus" index the matched text chunk is appended below it.

    The running number is written directly into each entry. Hit text is never
    passed through ``str.format``, so curly braces in titles or text are kept
    verbatim instead of raising.

    Args:
        hits: DataFrame with the columns "title", "type_label", "url" and
            "id" (plus the text column when ``index_name`` is "corpus").
        index_name: "title" or "corpus".

    Returns:
        Markdown starting with a "## Resultater" heading.

    Raises:
        ValueError: if ``index_name`` is neither "title" nor "corpus".
    """
    if index_name == "title":
        include_text = False
        texts = [None] * len(hits)
    elif index_name == "corpus":
        include_text = True
        texts = hits[data[index_name]["column"]]
    else:
        raise ValueError(f"Unknown index name: {index_name!r}")

    rows = zip(hits['title'], hits['type_label'], hits['url'], hits['id'], texts)

    entries = []
    for number, (title, type_label, url, case_id, text) in enumerate(rows, start=1):
        entry = f"### {number}. {title} ([Link til {type_label}]({url}))\nSag ID: {case_id}"
        if include_text:
            entry += f"\n\n{text}"
        entries.append(entry)

    merged = "\n\n".join(entries)

    return f"## Resultater\n{merged}"

#main entry function for search
def search(query, index_name, top_k):
    """Run a search and return the hits as markdown.

    The query is stored in ``state`` only after retrieval and formatting have
    both completed.
    """
    ranked = get_hits(query, index_name, top_k)
    markdown = format_hits(ranked, index_name)

    #remember the query for later use outside this function
    state.update(query=query)

    return markdown

def update_description():
    """Return a one-line description of the query currently held in ``state``."""
    current_query = state["query"]
    return "Nuværende søgning: {}".format(current_query)

def analyse_doc(id):
    """Render one decision as HTML with paragraphs shaded by relevance.

    The document whose "id" equals ``id`` is looked up in ``meta_title``. Its
    top-level paragraphs longer than 100 characters are scored against the
    most recent search query with the cross-encoder, and each gets a light
    blue background whose opacity is the min-max normalised score.

    Args:
        id: Case ID as entered by the user; surrounding whitespace is ignored.

    Returns:
        An ``<iframe>`` snippet holding the document, or a short "nothing
        found" message when the ID is unknown or no search has been made yet.
    """
    #local import; the name `html` is avoided so the module is not shadowed
    from html import escape

    query = state["query"]

    #plain equality instead of a query string built from user input, so quotes
    #in the input cannot break or alter the lookup
    case_id = str(id).strip()
    meta_doc = meta_title[meta_title["id"] == case_id]

    if meta_doc.empty or query is None:
        return "<b>Ingen sager fundet...</b>"

    html_body = meta_doc["text_html"].iloc[0]
    html_title = meta_doc["title"].iloc[0]
    document = f"<html>\n<body>\n<h1>{html_title}</h1>\n{html_body}\n</body>\n</html>"

    soup = BeautifulSoup(document, 'lxml')

    min_characters = 100
    candidates = [(p, p.get_text(strip=True)) for p in soup.body.find_all('p', recursive=False)]
    candidates = [(p, text) for p, text in candidates if len(text) > min_characters]

    #without qualifying paragraphs there is nothing to score or shade
    if candidates:
        rerank_scores = crossencoder.predict([(query, text) for _, text in candidates])
        shifted = rerank_scores - rerank_scores.min()
        score_range = rerank_scores.max() - rerank_scores.min()
        #a single paragraph or identical scores gives a zero range; keep the
        #all-zero opacities rather than dividing by zero
        rerank_scores_norm = shifted / score_range if score_range > 0 else shifted

        for (element, _), score in zip(candidates, rerank_scores_norm):
            element['style'] = f'background-color:rgba(173, 216, 230, {score});'

    #the document sits inside an attribute value, so quotes and ampersands
    #must be escaped or the first apostrophe would end the attribute
    srcdoc = escape(str(soup), quote=True)
    html_doc = f"""<iframe style="width: 100%; height: 480px" srcdoc='{srcdoc}'></iframe>"""

    return html_doc

#define interface
with gr.Blocks() as demo:
    gr.Markdown("# DEMO Semantisk søgning i Miljø- og Fødevareklagenævnets afgørelser")

    #tab 1: free-text search in titles or text chunks
    with gr.Tab("Søgning"):
        for intro_line in (
            "## Søgning i afgørelser",
            "Anvend søgefelt til at søge i titel eller tekst for afgørelser.",
            "Brug spørgsmål, kort beskrivelser eller stikord til søgningen.",
        ):
            gr.Markdown(intro_line)

        with gr.Row():
            textbox = gr.Textbox(placeholder="Skriv her...", lines=1, label="Søgning", scale=6)
            name = gr.Radio(
                [("Titel", "title"), ("Tekst", "corpus")],
                value="corpus",
                label="Søgning i titel eller tekst?",
                scale=2,
            )
            num = gr.Number(5, label="Antal hits", scale=1)
            btn = gr.Button("Søg!", size="sm", scale=1)

        with gr.Row():
            output = gr.Markdown()

        #the button and pressing enter in the text field trigger the same search
        search_inputs = [textbox, name, num]
        btn.click(fn=search, inputs=search_inputs, outputs=output)
        textbox.submit(fn=search, inputs=search_inputs, outputs=output)

    #tab 2: relevance shading of a whole document, looked up by case ID
    with gr.Tab("Analyse"):
        gr.Markdown("## Analyse af hele dokumenter")
        #NOTE: a description line showing the current query (fed by
        #update_description whenever `output` changes) is currently disabled
        gr.Markdown("Relevans for tekst angives ved farveintensitet - mere blå er mere relevant.")

        with gr.Row():
            id_textbox = gr.Textbox(placeholder="Indsæt sags ID her...", lines=1, label="Søgning", scale=8)
            id_btn = gr.Button("Søg!", size="sm", scale=2)

        with gr.Row():
            html_output = gr.HTML()

        id_btn.click(fn=analyse_doc, inputs=id_textbox, outputs=html_output)
        id_textbox.submit(fn=analyse_doc, inputs=id_textbox, outputs=html_output)

demo.launch()