# app.py
import gradio as gr
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import requests
import time
import re
import base64
import logging
import os
import sys
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading
from collections import deque

# Import OpenAI library (this file uses the legacy openai<1.0 interface:
# module-level api_key/api_base and openai.ChatCompletion)
import openai

# Suppress only the single warning from urllib3 needed.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Set up logging to output to the console
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Create a console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)

# Create a formatter and set it for the handler
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
console_handler.setFormatter(formatter)

# Add the handler to the logger
logger.addHandler(console_handler)

# Initialize variables and models
logger.info("Initializing variables and models")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
faiss_index = None
bookmarks = []
fetch_cache = {}

# Lock for thread-safe operations
lock = threading.Lock()

# Define the categories
CATEGORIES = [
    "Social Media",
    "News and Media",
    "Education and Learning",
    "Entertainment",
    "Shopping and E-commerce",
    "Finance and Banking",
    "Technology",
    "Health and Fitness",
    "Travel and Tourism",
    "Food and Recipes",
    "Sports",
    "Arts and Culture",
    "Government and Politics",
    "Business and Economy",
    "Science and Research",
    "Personal Blogs and Journals",
    "Job Search and Careers",
    "Music and Audio",
    "Videos and Movies",
    "Reference and Knowledge Bases",
    "Dead Link",
    "Uncategorized",
]

# Set up Groq Cloud API key and base URL
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
if not GROQ_API_KEY:
    logger.error("GROQ_API_KEY environment variable not set.")

openai.api_key = GROQ_API_KEY
openai.api_base = "https://api.groq.com/openai/v1"

# Initialize global variables for rate limiting
api_lock = threading.Lock()
last_api_call_time = 0

# Rate Limiter Configuration
RPM_LIMIT = 30      # Requests per minute
TPM_LIMIT = 40000   # Tokens per minute

# Implementing a Token Bucket Rate Limiter
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.timestamp = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        with self.lock:
            now = time.time()
            elapsed = now - self.timestamp
            # Refill tokens proportionally to the elapsed time
            refill = elapsed * self.rate
            self.tokens = min(self.capacity, self.tokens + refill)
            self.timestamp = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_token(self, tokens=1):
        while not self.consume(tokens):
            time.sleep(0.1)

# Initialize rate limiters
rpm_rate = RPM_LIMIT / 60  # requests per second
tpm_rate = TPM_LIMIT / 60  # tokens per second
rpm_bucket = TokenBucket(rate=rpm_rate, capacity=RPM_LIMIT)
tpm_bucket = TokenBucket(rate=tpm_rate, capacity=TPM_LIMIT)
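# A minimal usage sketch for the two buckets (the helper below is
# illustrative and is not called elsewhere in this file): reserve one
# request slot plus an estimated completion-token budget before
# dispatching an API call.
def _throttled_call(fn, estimated_tokens=150):
    rpm_bucket.wait_for_token()  # one request slot against the RPM bucket
    tpm_bucket.wait_for_token(tokens=estimated_tokens)  # token budget against the TPM bucket
    return fn()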
def extract_main_content(soup):
    """
    Extract the main content from a webpage while filtering out boilerplate content.
    """
    if not soup:
        return ""

    # Remove unwanted elements
    for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'form', 'noscript']):
        element.decompose()

    # Extract text from <p> tags
    p_tags = soup.find_all('p')
    if p_tags:
        content = ' '.join([p.get_text(strip=True, separator=' ') for p in p_tags])
    else:
        # Fall back to the full body text
        content = soup.get_text(separator=' ', strip=True)

    # Collapse runs of whitespace
    content = re.sub(r'\s+', ' ', content)

    # Truncate content to a reasonable length (e.g., 1500 words)
    words = content.split()
    if len(words) > 1500:
        content = ' '.join(words[:1500])

    return content

def get_page_metadata(soup):
    """
    Extract metadata from the webpage, including title, description, and keywords.
    """
    metadata = {
        'title': '',
        'description': '',
        'keywords': ''
    }
    if not soup:
        return metadata

    # Get title
    title_tag = soup.find('title')
    if title_tag and title_tag.string:
        metadata['title'] = title_tag.string.strip()

    # Get meta description (falling back through OG and Twitter variants)
    meta_desc = (
        soup.find('meta', attrs={'name': 'description'}) or
        soup.find('meta', attrs={'property': 'og:description'}) or
        soup.find('meta', attrs={'name': 'twitter:description'})
    )
    if meta_desc:
        metadata['description'] = meta_desc.get('content', '').strip()

    # Get meta keywords
    meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
    if meta_keywords:
        metadata['keywords'] = meta_keywords.get('content', '').strip()

    # Fall back to the OG title if the main title is empty
    if not metadata['title']:
        og_title = soup.find('meta', attrs={'property': 'og:title'})
        if og_title:
            metadata['title'] = og_title.get('content', '').strip()

    return metadata
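# A small, self-contained sketch (hypothetical helper, not used elsewhere)
# showing how the two parsers above compose on raw HTML: metadata is read
# first, since extract_main_content destructively strips boilerplate tags.
def _inspect_page(html):
    soup = BeautifulSoup(html, 'html.parser')
    return get_page_metadata(soup), extract_main_content(soup)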
def generate_summary_and_assign_category(bookmark):
    """
    Generate a concise summary and assign a category using a single LLM call.
    For slow links, always provide a summary.
    For dead links, provide a summary if possible; otherwise, leave it unset.
    """
    logger.info(f"Generating summary and assigning category for bookmark: {bookmark.get('url')}")
    max_retries = 3
    retry_count = 0
    # Initialized up front so the exception handlers below can safely fall
    # back to metadata even if parsing fails before it is populated.
    metadata = {'title': '', 'description': '', 'keywords': ''}

    while retry_count < max_retries:
        try:
            # Rate limiting: reserve one request slot and an estimated
            # completion-token budget (max_tokens=150); prompt tokens are
            # paced separately via sleep_time below.
            rpm_bucket.wait_for_token()
            tpm_bucket.wait_for_token(tokens=150)

            html_content = bookmark.get('html_content', '')
            soup = BeautifulSoup(html_content, 'html.parser')
            metadata = get_page_metadata(soup)
            main_content = extract_main_content(soup)

            # Prepare content for the prompt
            content_parts = []
            if metadata['title']:
                content_parts.append(f"Title: {metadata['title']}")
            if metadata['description']:
                content_parts.append(f"Description: {metadata['description']}")
            if metadata['keywords']:
                content_parts.append(f"Keywords: {metadata['keywords']}")
            if main_content:
                content_parts.append(f"Main Content: {main_content}")

            content_text = '\n'.join(content_parts)

            # Detect insufficient or erroneous content
            error_keywords = ['Access Denied', 'Security Check', 'Cloudflare', 'captcha', 'unusual traffic']
            if not content_text or len(content_text.split()) < 50:
                use_prior_knowledge = True
                logger.info(f"Content for {bookmark.get('url')} is insufficient. Instructing LLM to use prior knowledge.")
            elif any(keyword.lower() in content_text.lower() for keyword in error_keywords):
                use_prior_knowledge = True
                logger.info(f"Content for {bookmark.get('url')} contains error messages. Instructing LLM to use prior knowledge.")
            else:
                use_prior_knowledge = False

            if use_prior_knowledge:
                prompt = f"""
You are a knowledgeable assistant with up-to-date information as of 2023.
URL: {bookmark.get('url')}
Provide:
1. A concise summary (max two sentences) about this website.
2. Assign the most appropriate category from the list below.
Categories: {', '.join([f'"{cat}"' for cat in CATEGORIES])}
Format:
Summary: [Your summary]
Category: [One category]
"""
            else:
                prompt = f"""
You are an assistant that creates concise webpage summaries and assigns categories.
Content: {content_text}
Provide:
1. A concise summary (max two sentences) focusing on the main topic.
2. Assign the most appropriate category from the list below.
Categories: {', '.join([f'"{cat}"' for cat in CATEGORIES])}
Format:
Summary: [Your summary]
Category: [One category]
"""

            def estimate_tokens(text):
                # Rough heuristic: ~4 characters per token
                return len(text) / 4

            prompt_tokens = estimate_tokens(prompt)
            max_tokens = 150
            total_tokens = prompt_tokens + max_tokens

            # Pace calls so total token throughput stays under the TPM limit
            tokens_per_minute = TPM_LIMIT
            tokens_per_second = tokens_per_minute / 60
            required_delay = total_tokens / tokens_per_second
            sleep_time = max(required_delay, 2)

            response = openai.ChatCompletion.create(
                model='llama-3.1-70b-versatile',
                messages=[
                    {"role": "user", "content": prompt}
                ],
                max_tokens=int(max_tokens),
                temperature=0.5,
            )

            content = response['choices'][0]['message']['content'].strip()
            if not content:
                raise ValueError("Empty response received from the model.")

            summary_match = re.search(r"Summary:\s*(.*)", content, re.IGNORECASE)
            category_match = re.search(r"Category:\s*(.*)", content, re.IGNORECASE)

            # Extract summary
            if summary_match and summary_match.group(1).strip():
                bookmark['summary'] = summary_match.group(1).strip()
            elif bookmark.get('slow_link', False):
                # Slow links always get a summary, falling back to metadata
                bookmark['summary'] = metadata.get('description') or metadata.get('title') or 'No summary available.'
            else:
                # Dead links without a usable summary are left blank
                bookmark['summary'] = ''

            # Extract category
            if category_match:
                category = category_match.group(1).strip().strip('"')
                bookmark['category'] = category if category in CATEGORIES else 'Uncategorized'
            else:
                bookmark['category'] = 'Uncategorized'

            # Simple keyword-based validation to correct obvious miscategorizations
            summary_lower = bookmark.get('summary', '').lower()
            url_lower = bookmark['url'].lower()
            if 'social media' in summary_lower or 'twitter' in summary_lower or 'x.com' in url_lower:
                bookmark['category'] = 'Social Media'
            elif 'wikipedia' in url_lower:
                bookmark['category'] = 'Reference and Knowledge Bases'

            logger.info("Successfully generated summary and assigned category")
            time.sleep(sleep_time)
            break

        except openai.error.RateLimitError as e:
            retry_count += 1
            wait_time = int(e.headers.get("Retry-After", 5))
            logger.warning(f"Rate limit reached. Waiting for {wait_time} seconds before retrying... (Attempt {retry_count}/{max_retries})")
            time.sleep(wait_time)
        except Exception as e:
            logger.error(f"Error generating summary and assigning category: {e}", exc_info=True)
            # For slow links, provide a summary from metadata or title
            if bookmark.get('slow_link', False):
                bookmark['summary'] = metadata.get('description') or metadata.get('title') or 'No summary available.'
            # For dead links, attempt to set a summary; fall back to an empty string
            elif bookmark.get('dead_link', False):
                bookmark['summary'] = metadata.get('description') or metadata.get('title') or ''
            else:
                bookmark['summary'] = 'No summary available.'
            bookmark['category'] = 'Uncategorized'
            break
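# For reference, a well-formed model reply is expected to look like the
# following (illustrative example, not real output):
#
#   Summary: Community-maintained encyclopedia covering general knowledge.
#   Category: "Reference and Knowledge Bases"
#
# The regexes above capture everything after "Summary:" / "Category:",
# strip surrounding quotes from the category, and fall back to
# 'Uncategorized' when the returned category is not in CATEGORIES.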
""" logger.info("Parsing bookmarks") try: soup = BeautifulSoup(file_content, 'html.parser') extracted_bookmarks = [] for link in soup.find_all('a'): url = link.get('href') title = link.text.strip() if url and title: if url.startswith('http://') or url.startswith('https://'): extracted_bookmarks.append({'url': url, 'title': title}) else: logger.info(f"Skipping non-http/https URL: {url}") logger.info(f"Extracted {len(extracted_bookmarks)} bookmarks") return extracted_bookmarks except Exception as e: logger.error("Error parsing bookmarks: %s", e, exc_info=True) raise def fetch_url_info(bookmark): """ Fetch information about a URL. """ url = bookmark['url'] if url in fetch_cache: with lock: bookmark.update(fetch_cache[url]) return try: logger.info(f"Fetching URL info for: {url}") headers = { 'User-Agent': 'Mozilla/5.0', 'Accept-Language': 'en-US,en;q=0.9', } response = requests.get(url, headers=headers, timeout=5, verify=False, allow_redirects=True) bookmark['etag'] = response.headers.get('ETag', 'N/A') bookmark['status_code'] = response.status_code content = response.text logger.info(f"Fetched content length for {url}: {len(content)} characters") if response.status_code >= 500: bookmark['dead_link'] = True bookmark['html_content'] = content # Keep content to extract metadata if possible logger.warning(f"Dead link detected: {url} with status {response.status_code}") else: bookmark['dead_link'] = False bookmark['html_content'] = content logger.info(f"Fetched information for {url}") except requests.exceptions.Timeout: bookmark['dead_link'] = False bookmark['etag'] = 'N/A' bookmark['status_code'] = 'Timeout' bookmark['html_content'] = '' bookmark['slow_link'] = True logger.warning(f"Timeout while fetching {url}. Marking as 'Slow'.") except Exception as e: bookmark['dead_link'] = True bookmark['etag'] = 'N/A' bookmark['status_code'] = 'Error' bookmark['html_content'] = '' logger.error(f"Error fetching URL info for {url}: {e}", exc_info=True) finally: # Extract meta description for dead links if content is available if bookmark.get('dead_link', False) and bookmark.get('html_content'): soup = BeautifulSoup(bookmark['html_content'], 'html.parser') metadata = get_page_metadata(soup) bookmark['description'] = metadata.get('description', '') elif not bookmark.get('dead_link', False): # For active and slow links, attempt to extract description soup = BeautifulSoup(bookmark['html_content'], 'html.parser') metadata = get_page_metadata(soup) bookmark['description'] = metadata.get('description', '') else: bookmark['description'] = '' with lock: fetch_cache[url] = { 'etag': bookmark.get('etag'), 'status_code': bookmark.get('status_code'), 'dead_link': bookmark.get('dead_link'), 'description': bookmark.get('description'), 'html_content': bookmark.get('html_content', ''), 'slow_link': bookmark.get('slow_link', False), } def vectorize_and_index(bookmarks_list): """ Create vector embeddings for bookmarks and build FAISS index with ID mapping. 
""" global faiss_index logger.info("Vectorizing summaries and building FAISS index") try: # Safely access 'summary' using .get() to avoid KeyError summaries = [bookmark.get('summary', '') for bookmark in bookmarks_list] embeddings = embedding_model.encode(summaries) dimension = embeddings.shape[1] index = faiss.IndexIDMap(faiss.IndexFlatL2(dimension)) ids = np.array([bookmark['id'] for bookmark in bookmarks_list], dtype=np.int64) index.add_with_ids(np.array(embeddings).astype('float32'), ids) faiss_index = index logger.info("FAISS index built successfully with IDs") return index except Exception as e: logger.error(f"Error in vectorizing and indexing: {e}", exc_info=True) raise def display_bookmarks(): """ Generate HTML display for bookmarks. """ logger.info("Generating HTML display for bookmarks") cards = '' for i, bookmark in enumerate(bookmarks): index = i + 1 if bookmark.get('dead_link'): status = "❌ Dead Link" card_style = "border: 2px solid red;" text_style = "color: white;" # For dead links, use 'summary' if available summary = bookmark.get('summary', '') if not summary: # Provide a default message or leave it empty summary = 'No summary available.' elif bookmark.get('slow_link'): status = "⏳ Slow Response" card_style = "border: 2px solid orange;" text_style = "color: white;" # For slow links, always provide a summary summary = bookmark.get('summary', 'No summary available.') else: status = "✅ Active" card_style = "border: 2px solid green;" text_style = "color: white;" summary = bookmark.get('summary', 'No summary available.') title = bookmark['title'] url = bookmark['url'] etag = bookmark.get('etag', 'N/A') category = bookmark.get('category', 'Uncategorized') # Escape HTML content to prevent XSS attacks from html import escape title = escape(title) url = escape(url) summary = escape(summary) category = escape(category) card_html = f'''