Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
from appStore.prep_data import process_giz_worldwide | |
from appStore.prep_utils import create_documents, get_client | |
from appStore.embed import hybrid_embed_chunks | |
from appStore.search import hybrid_search | |
from appStore.region_utils import load_region_data, get_country_name | |
from appStore.tfidf_extraction import extract_top_keywords | |
from torch import cuda | |
import json | |
from datetime import datetime | |
# get the device to be used eithe gpu or cpu | |
device = 'cuda' if cuda.is_available() else 'cpu' | |
st.set_page_config(page_title="SEARCH IATI",layout='wide') | |
st.title("GIZ Project Database") | |
var = st.text_input("Enter Search Query") | |
# Load the region lookup CSV | |
region_lookup_path = "docStore/regions_lookup.csv" | |
region_df = load_region_data(region_lookup_path) | |
#################### Create the embeddings collection and save ###################### | |
# the steps below need to be performed only once and then commented out any unnecssary compute over-run | |
##### First we process and create the chunks for relvant data source | |
#chunks = process_giz_worldwide() | |
##### Convert to langchain documents | |
#temp_doc = create_documents(chunks,'chunks') | |
##### Embed and store docs, check if collection exist then you need to update the collection | |
collection_name = "giz_worldwide" | |
#hybrid_embed_chunks(docs= temp_doc, collection_name = collection_name) | |
################### Hybrid Search ###################################################### | |
client = get_client() | |
print(client.get_collections()) | |
# Fetch unique country codes and map to country names | |
def get_country_name_mapping(_client, collection_name, region_df): | |
results = hybrid_search(_client, "", collection_name) | |
country_set = set() | |
for res in results[0] + results[1]: | |
countries = res.payload.get('metadata', {}).get('countries', "[]") | |
try: | |
country_list = json.loads(countries.replace("'", '"')) | |
# ADD: only add codes of length 2 | |
two_digit_codes = [code.upper() for code in country_list if len(code) == 2] | |
country_set.update(two_digit_codes) | |
except json.JSONDecodeError: | |
pass | |
# Create a mapping of {CountryName -> ISO2Code} | |
# so you can display the name in the selectbox but store the 2-digit code | |
country_name_to_code = {} | |
for code in country_set: | |
name = get_country_name(code, region_df) | |
country_name_to_code[name] = code | |
return country_name_to_code | |
# Get country name mapping | |
client = get_client() | |
country_name_mapping = get_country_name_mapping(client, collection_name, region_df) | |
unique_country_names = sorted(country_name_mapping.keys()) # List of country names | |
# Layout filters in columns | |
col1, col2, col3 = st.columns([1, 1, 4]) | |
with col1: | |
country_filter = st.selectbox("Country", ["All/Not allocated"] + unique_country_names) # Display country names | |
with col2: | |
current_year = datetime.now().year | |
default_start_year = current_year - 5 # Default to 5 years ago | |
end_year_range = st.slider( | |
"Project End Year", | |
min_value=2010, | |
max_value=2030, | |
value=(default_start_year, current_year) | |
) | |
# Checkbox to control whether to show only exact matches | |
show_exact_matches = st.checkbox("Show only exact matches", value=False) | |
button = st.button("Search") | |
if button: | |
results = hybrid_search(client, var, collection_name) | |
def filter_results(results, country_filter, end_year_range): | |
filtered = [] | |
for res in results: | |
metadata = res.payload.get('metadata', {}) | |
countries = metadata.get('countries', "[]") | |
end_year = float(metadata.get('end_year', 0)) | |
# Process countries string to a list | |
try: | |
country_list = json.loads(countries.replace("'", '"')) | |
# Normalize to uppercase and filter only 2-digit ISO codes | |
country_list = [code.upper() for code in country_list if len(code) == 2] | |
except json.JSONDecodeError: | |
country_list = [] | |
# Translate selected country name back to 2-digit ISO code | |
selected_iso_code = country_name_mapping.get(country_filter, None) | |
# Apply country and year filters | |
if (country_filter == "All/Not allocated" or selected_iso_code in country_list) and (end_year_range[0] <= end_year <= end_year_range[1]): | |
filtered.append(res) | |
return filtered | |
# Check user preference for exact matches | |
if show_exact_matches: | |
st.write(f"Showing **Top 10 Lexical Search results** for query: {var}") | |
lexical_results = results[1] # Lexical results are in index 1 | |
filtered_lexical_results = filter_results(lexical_results, country_filter, end_year_range) | |
for res in filtered_lexical_results[:10]: | |
project_name = res.payload['metadata'].get('project_name', 'Project Link') | |
url = res.payload['metadata'].get('url', '#') | |
st.markdown(f"#### [{project_name}]({url})") | |
# ------- Display first 4 lines + expander ------- | |
full_text = res.payload['page_content'] | |
# Split the text by whitespace | |
words = full_text.split() | |
# For instance, show only the first 40 words | |
preview_word_count = 120 | |
# Create the short preview and the remainder | |
preview_text = " ".join(words[:preview_word_count]) | |
remainder_text = " ".join(words[preview_word_count:]) | |
# Always display the preview_text | |
st.write(preview_text + ("..." if remainder_text else "")) | |
# ------ Extract top 5 keywords and display ------ | |
top_keywords = extract_top_keywords(full_text, top_n=5) | |
# Join them with " 路 " and make them italic | |
if top_keywords: | |
st.write("") | |
st.markdown(f"_{' 路 '.join(top_keywords)}_") # e.g. _keyword1 路 keyword2 路 keyword3_ | |
# ------- Additional info below the text ------- | |
metadata = res.payload.get('metadata', {}) | |
countries = metadata.get('countries', "[]") | |
client = metadata.get('client', 'Unknown Client') | |
start_year = metadata.get('start_year', None) | |
end_year = metadata.get('end_year', None) | |
# Process countries | |
try: | |
country_list = json.loads(countries.replace("'", '"')) | |
# Normalize to uppercase and map to country names | |
country_names = [get_country_name(code.upper(), region_df) for code in country_list if len(code) == 2] | |
country_names = country_names if country_names else country_list # Fallback if no names found | |
except json.JSONDecodeError: | |
country_names = countries | |
# Format start and end year | |
start_year = f"{int(round(float(start_year)))}" if start_year else "Unknown" | |
end_year = f"{int(round(float(end_year)))}" if end_year else "Unknown" | |
# Generate additional text with Markdown for bold formatting | |
additional_text = f"**{', '.join(country_names)}**, commissioned by **{client}**, **{start_year}-{end_year}**" | |
st.markdown(additional_text) | |
st.divider() | |
else: | |
st.write(f"Showing **Top 10 Semantic Search results** for query: {var}") | |
semantic_results = results[0] # Semantic results are in index 0 | |
filtered_semantic_results = filter_results(semantic_results, country_filter, end_year_range) | |
for res in filtered_semantic_results[:10]: | |
project_name = res.payload['metadata'].get('project_name', 'Project Link') | |
url = res.payload['metadata'].get('url', '#') | |
st.markdown(f"#### [{project_name}]({url})") | |
# ------- Display first 4 lines + expander ------- | |
full_text = res.payload['page_content'] | |
# Split the text by whitespace | |
words = full_text.split() | |
# For instance, show only the first 40 words | |
preview_word_count = 40 | |
# Create the short preview and the remainder | |
preview_text = " ".join(words[:preview_word_count]) | |
remainder_text = " ".join(words[preview_word_count:]) | |
# Always display the preview_text | |
st.write(preview_text + ("..." if remainder_text else "")) | |
# ------ Extract top 5 keywords and display ------ | |
top_keywords = extract_top_keywords(full_text, top_n=5) | |
# Join them with " 路 " and make them italic | |
if top_keywords: | |
st.write("") # line break | |
st.markdown(f"_{' 路 '.join(top_keywords)}_") | |
# Additional text below the content | |
metadata = res.payload.get('metadata', {}) | |
countries = metadata.get('countries', "[]") | |
client = metadata.get('client', 'Unknown Client') | |
start_year = metadata.get('start_year', None) | |
end_year = metadata.get('end_year', None) | |
# Process countries | |
try: | |
country_list = json.loads(countries.replace("'", '"')) | |
country_names = [get_country_name(code.upper(), region_df) for code in country_list if len(code) == 2] | |
country_names = country_names if country_names else country_list | |
except json.JSONDecodeError: | |
country_names = countries | |
# Format start and end year | |
start_year = f"{int(round(float(start_year)))}" if start_year else "Unknown" | |
end_year = f"{int(round(float(end_year)))}" if end_year else "Unknown" | |
# Generate additional text with Markdown for bold formatting | |
additional_text = f"**{', '.join(country_names)}**, commissioned by **{client}**, **{start_year}-{end_year}**" | |
st.markdown(additional_text) | |
st.divider() | |
# for i in results: | |
# st.subheader(str(i.metadata['id'])+":"+str(i.metadata['title_main'])) | |
# st.caption(f"Status:{str(i.metadata['status'])}, Country:{str(i.metadata['country_name'])}") | |
# st.write(i.page_content) | |
# st.divider() | |