Spaces:
Running
Running
# app.py | |
import gradio as gr | |
from bs4 import BeautifulSoup | |
import requests | |
from sentence_transformers import SentenceTransformer | |
import faiss | |
import numpy as np | |
import pandas as pd | |
import asyncio | |
import aiohttp | |
# Initialize models and variables | |
embedding_model = SentenceTransformer('all-MiniLM-L6-v2') | |
faiss_index = None | |
bookmarks = [] | |
fetch_cache = {} | |
def parse_bookmarks(file_content): | |
soup = BeautifulSoup(file_content, 'html.parser') | |
extracted_bookmarks = [] | |
for link in soup.find_all('a'): | |
url = link.get('href') | |
title = link.text.strip() | |
if url and title: | |
extracted_bookmarks.append({'url': url, 'title': title}) | |
return extracted_bookmarks | |
async def fetch_url_info(session, bookmark): | |
url = bookmark['url'] | |
if url in fetch_cache: | |
bookmark.update(fetch_cache[url]) | |
return bookmark | |
try: | |
async with session.get(url, timeout=5) as response: | |
bookmark['etag'] = response.headers.get('ETag', 'N/A') | |
bookmark['status_code'] = response.status | |
if response.status >= 400: | |
bookmark['dead_link'] = True | |
bookmark['description'] = '' | |
else: | |
bookmark['dead_link'] = False | |
content = await response.text() | |
soup = BeautifulSoup(content, 'html.parser') | |
# Extract meta description or Open Graph description | |
meta_description = soup.find('meta', attrs={'name': 'description'}) | |
og_description = soup.find('meta', attrs={'property': 'og:description'}) | |
if og_description and og_description.get('content'): | |
description = og_description.get('content') | |
elif meta_description and meta_description.get('content'): | |
description = meta_description.get('content') | |
else: | |
description = '' | |
bookmark['description'] = description | |
except Exception as e: | |
bookmark['dead_link'] = True | |
bookmark['etag'] = 'N/A' | |
bookmark['status_code'] = 'N/A' | |
bookmark['description'] = '' | |
finally: | |
fetch_cache[url] = { | |
'etag': bookmark.get('etag'), | |
'status_code': bookmark.get('status_code'), | |
'dead_link': bookmark.get('dead_link'), | |
'description': bookmark.get('description'), | |
} | |
return bookmark | |
async def process_bookmarks_async(bookmarks): | |
async with aiohttp.ClientSession() as session: | |
tasks = [] | |
for bookmark in bookmarks: | |
task = asyncio.ensure_future(fetch_url_info(session, bookmark)) | |
tasks.append(task) | |
await asyncio.gather(*tasks) | |
def generate_summary(bookmark): | |
description = bookmark.get('description', '') | |
if description: | |
bookmark['summary'] = description | |
else: | |
title = bookmark.get('title', '') | |
if title: | |
bookmark['summary'] = title | |
else: | |
bookmark['summary'] = 'No summary available.' | |
return bookmark | |
def vectorize_and_index(bookmarks): | |
summaries = [bookmark['summary'] for bookmark in bookmarks] | |
embeddings = embedding_model.encode(summaries) | |
dimension = embeddings.shape[1] | |
faiss_idx = faiss.IndexFlatL2(dimension) | |
faiss_idx.add(np.array(embeddings)) | |
return faiss_idx, embeddings | |
def display_bookmarks(): | |
cards = '' | |
for i, bookmark in enumerate(bookmarks): | |
index = i + 1 # Start index at 1 | |
status = "Dead Link" if bookmark.get('dead_link') else "Active" | |
card_class = "card dead-link" if bookmark.get('dead_link') else "card" | |
title = bookmark['title'] | |
url = bookmark['url'] | |
etag = bookmark.get('etag', 'N/A') | |
summary = bookmark.get('summary', '') | |
card_html = f''' | |
<div class="{card_class}"> | |
<div class="card-content"> | |
<h3>{index}. {title}</h3> | |
<p><strong>URL:</strong> <a href="{url}" target="_blank">{url}</a></p> | |
<p><strong>Status:</strong> {status}</p> | |
<p><strong>ETag:</strong> {etag}</p> | |
<p><strong>Summary:</strong> {summary}</p> | |
</div> | |
</div> | |
''' | |
cards += card_html | |
return cards | |
def process_uploaded_file(file): | |
global bookmarks, faiss_index | |
if file is None: | |
return "Please upload a bookmarks HTML file.", '' | |
try: | |
file_content = file.decode('utf-8') | |
except UnicodeDecodeError: | |
return "Error decoding the file. Please ensure it's a valid HTML file.", '' | |
bookmarks = parse_bookmarks(file_content) | |
if not bookmarks: | |
return "No bookmarks found in the uploaded file.", '' | |
# Asynchronously fetch bookmark info | |
asyncio.run(process_bookmarks_async(bookmarks)) | |
# Generate summaries using descriptions | |
for bookmark in bookmarks: | |
generate_summary(bookmark) | |
faiss_index, embeddings = vectorize_and_index(bookmarks) | |
message = f"Successfully processed {len(bookmarks)} bookmarks." | |
bookmark_html = display_bookmarks() | |
return message, bookmark_html | |
def chatbot_response(user_query): | |
if faiss_index is None or not bookmarks: | |
return "No bookmarks available. Please upload and process your bookmarks first." | |
# Vectorize user query | |
user_embedding = embedding_model.encode([user_query]) | |
D, I = faiss_index.search(np.array(user_embedding), k=5) # Retrieve top 5 matches | |
# Generate response | |
response = "" | |
for idx in I[0]: | |
if idx < len(bookmarks): | |
bookmark = bookmarks[idx] | |
index = bookmarks.index(bookmark) + 1 # Start index at 1 | |
response += f"{index}. Title: {bookmark['title']}\nURL: {bookmark['url']}\nSummary: {bookmark['summary']}\n\n" | |
return response.strip() | |
def edit_bookmark(bookmark_idx, new_title, new_url): | |
global faiss_index | |
try: | |
bookmark_idx = int(bookmark_idx) - 1 # Adjust index to match list (starting at 0) | |
if bookmark_idx < 0 or bookmark_idx >= len(bookmarks): | |
return "Invalid bookmark index.", display_bookmarks() | |
bookmarks[bookmark_idx]['title'] = new_title | |
bookmarks[bookmark_idx]['url'] = new_url | |
# Re-fetch bookmark info | |
asyncio.run(process_bookmarks_async([bookmarks[bookmark_idx]])) | |
generate_summary(bookmarks[bookmark_idx]) | |
# Rebuild the FAISS index | |
faiss_index, embeddings = vectorize_and_index(bookmarks) | |
message = "Bookmark updated successfully." | |
updated_html = display_bookmarks() | |
return message, updated_html | |
except Exception as e: | |
return f"Error: {str(e)}", display_bookmarks() | |
def delete_bookmark(bookmark_idx): | |
global faiss_index | |
try: | |
bookmark_idx = int(bookmark_idx) - 1 # Adjust index to match list (starting at 0) | |
if bookmark_idx < 0 or bookmark_idx >= len(bookmarks): | |
return "Invalid bookmark index.", display_bookmarks() | |
bookmarks.pop(bookmark_idx) | |
# Rebuild the FAISS index | |
if bookmarks: | |
faiss_index, embeddings = vectorize_and_index(bookmarks) | |
else: | |
faiss_index = None | |
message = "Bookmark deleted successfully." | |
updated_html = display_bookmarks() | |
return message, updated_html | |
except Exception as e: | |
return f"Error: {str(e)}", display_bookmarks() | |
def build_app(): | |
with gr.Blocks(css="app.css") as demo: | |
gr.Markdown("<h1>Bookmark Manager App</h1>") | |
with gr.Tab("Upload and Process Bookmarks"): | |
upload = gr.File(label="Upload Bookmarks HTML File", type='binary') | |
process_button = gr.Button("Process Bookmarks") | |
output_text = gr.Textbox(label="Output") | |
bookmark_display = gr.HTML(label="Bookmarks") | |
def update_bookmark_display(file): | |
message, html_content = process_uploaded_file(file) | |
return message, html_content | |
process_button.click( | |
update_bookmark_display, | |
inputs=upload, | |
outputs=[output_text, bookmark_display] | |
) | |
with gr.Tab("Chat with Bookmarks"): | |
user_input = gr.Textbox(label="Ask about your bookmarks") | |
chat_output = gr.Textbox(label="Chatbot Response") | |
chat_button = gr.Button("Send") | |
chat_button.click( | |
chatbot_response, | |
inputs=user_input, | |
outputs=chat_output | |
) | |
with gr.Tab("Manage Bookmarks"): | |
manage_output = gr.Textbox(label="Manage Output") | |
bookmark_display_manage = gr.HTML(label="Bookmarks") | |
refresh_button = gr.Button("Refresh Bookmark List") | |
with gr.Row(): | |
index_input = gr.Number(label="Bookmark Index (Starting from 1)", precision=0) | |
new_title_input = gr.Textbox(label="New Title") | |
new_url_input = gr.Textbox(label="New URL") | |
edit_button = gr.Button("Edit Bookmark") | |
delete_button = gr.Button("Delete Bookmark") | |
def update_manage_display(): | |
html_content = display_bookmarks() | |
return html_content | |
refresh_button.click( | |
update_manage_display, | |
inputs=None, | |
outputs=bookmark_display_manage | |
) | |
edit_button.click( | |
edit_bookmark, | |
inputs=[index_input, new_title_input, new_url_input], | |
outputs=[manage_output, bookmark_display_manage] | |
) | |
delete_button.click( | |
delete_bookmark, | |
inputs=index_input, | |
outputs=[manage_output, bookmark_display_manage] | |
) | |
# Initial load of the bookmarks display | |
bookmark_display_manage.value = update_manage_display() | |
demo.launch() | |
if __name__ == "__main__": | |
build_app() | |