# app.py import gradio as gr from bs4 import BeautifulSoup from sentence_transformers import SentenceTransformer import faiss import numpy as np import requests import time import re import logging import os import sys import concurrent.futures from concurrent.futures import ThreadPoolExecutor import threading from queue import Queue, Empty import json # Import OpenAI library import openai # Suppress only the single warning from urllib3 needed. import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Set up logging to output to the console logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # Create a console handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) # Create a formatter and set it for the handler formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s') console_handler.setFormatter(formatter) # Add the handler to the logger logger.addHandler(console_handler) # Initialize variables and models logger.info("Initializing variables and models") embedding_model = SentenceTransformer('all-MiniLM-L6-v2') faiss_index = None bookmarks = [] fetch_cache = {} # Lock for thread-safe operations lock = threading.Lock() # Define the categories CATEGORIES = [ "Social Media", "News and Media", "Education and Learning", "Entertainment", "Shopping and E-commerce", "Finance and Banking", "Technology", "Health and Fitness", "Travel and Tourism", "Food and Recipes", "Sports", "Arts and Culture", "Government and Politics", "Business and Economy", "Science and Research", "Personal Blogs and Journals", "Job Search and Careers", "Music and Audio", "Videos and Movies", "Reference and Knowledge Bases", "Dead Link", "Uncategorized", ] # Set up Groq Cloud API key and base URL GROQ_API_KEY = os.getenv('GROQ_API_KEY') if not GROQ_API_KEY: logger.error("GROQ_API_KEY environment variable not set.") openai.api_key = GROQ_API_KEY openai.api_base = "https://api.groq.com/openai/v1" # Initialize global variables for rate limiting api_lock = threading.Lock() last_api_call_time = 0 # Rate Limiter Configuration RPM_LIMIT = 30 # Requests per minute TPM_LIMIT = 40000 # Tokens per minute # Implementing a Token Bucket Rate Limiter class TokenBucket: def __init__(self, rate, capacity): self.rate = rate # tokens per second self.capacity = capacity self.tokens = capacity self.timestamp = time.time() self.lock = threading.Lock() def consume(self, tokens=1): with self.lock: now = time.time() elapsed = now - self.timestamp refill = elapsed * self.rate self.tokens = min(self.capacity, self.tokens + refill) self.timestamp = now if self.tokens >= tokens: self.tokens -= tokens return True else: return False def wait_for_token(self, tokens=1): while not self.consume(tokens): time.sleep(0.1) # Initialize rate limiters rpm_rate = RPM_LIMIT / 60 # tokens per second tpm_rate = TPM_LIMIT / 60 # tokens per second rpm_bucket = TokenBucket(rate=rpm_rate, capacity=RPM_LIMIT) tpm_bucket = TokenBucket(rate=tpm_rate, capacity=TPM_LIMIT) # Queue for LLM tasks llm_queue = Queue() def llm_worker(): """ Worker thread to process LLM tasks from the queue while respecting rate limits. """ logger.info("LLM worker started.") while True: try: bookmark = llm_queue.get(timeout=60) # Wait for a task except Empty: continue # No task, continue waiting if bookmark is None: logger.info("LLM worker shutting down.") break # Exit signal try: # Rate Limiting rpm_bucket.wait_for_token() # Estimate tokens: prompt + max_tokens # Here, we assume max_tokens=150 tpm_bucket.wait_for_token(tokens=150) html_content = bookmark.get('html_content', '') soup = BeautifulSoup(html_content, 'html.parser') metadata = get_page_metadata(soup) main_content = extract_main_content(soup) # Prepare content for the prompt content_parts = [] if metadata['title']: content_parts.append(f"Title: {metadata['title']}") if metadata['description']: content_parts.append(f"Description: {metadata['description']}") if metadata['keywords']: content_parts.append(f"Keywords: {metadata['keywords']}") if main_content: content_parts.append(f"Main Content: {main_content}") content_text = '\n'.join(content_parts) # Detect insufficient or erroneous content error_keywords = ['Access Denied', 'Security Check', 'Cloudflare', 'captcha', 'unusual traffic'] if not content_text or len(content_text.split()) < 50: use_prior_knowledge = True logger.info(f"Content for {bookmark.get('url')} is insufficient. Instructing LLM to use prior knowledge.") elif any(keyword.lower() in content_text.lower() for keyword in error_keywords): use_prior_knowledge = True logger.info(f"Content for {bookmark.get('url')} contains error messages. Instructing LLM to use prior knowledge.") else: use_prior_knowledge = False if use_prior_knowledge: prompt = f""" You are a knowledgeable assistant with up-to-date information as of 2023. URL: {bookmark.get('url')} Provide: 1. A concise summary (max two sentences) about this website. 2. Assign the most appropriate category from the list below. Categories: {', '.join([f'"{cat}"' for cat in CATEGORIES])} Format: Please provide your response in the following JSON format: {{ "summary": "Your summary here.", "category": "One category from the list." }} """ else: prompt = f""" You are an assistant that creates concise webpage summaries and assigns categories. Content: {content_text} Provide: 1. A concise summary (max two sentences) focusing on the main topic. 2. Assign the most appropriate category from the list below. Categories: {', '.join([f'"{cat}"' for cat in CATEGORIES])} Format: Please provide your response in the following JSON format: {{ "summary": "Your summary here.", "category": "One category from the list." }} """ response = openai.ChatCompletion.create( model='llama-3.1-70b-versatile', messages=[ {"role": "user", "content": prompt} ], max_tokens=150, temperature=0.5, ) content = response['choices'][0]['message']['content'].strip() if not content: raise ValueError("Empty response received from the model.") # Parse JSON response try: json_response = json.loads(content) summary = json_response.get('summary', '').strip() category = json_response.get('category', '').strip() # Validate and assign if not summary: summary = metadata.get('description') or metadata.get('title') or 'No summary available.' bookmark['summary'] = summary if category in CATEGORIES: bookmark['category'] = category else: # Fallback to keyword-based categorization bookmark['category'] = categorize_based_on_summary(summary, bookmark['url']) except json.JSONDecodeError: logger.error(f"Failed to parse JSON response for {bookmark.get('url')}. Using fallback methods.") # Fallback methods bookmark['summary'] = metadata.get('description') or metadata.get('title') or 'No summary available.' bookmark['category'] = categorize_based_on_summary(bookmark['summary'], bookmark['url']) # Additional keyword-based validation bookmark['category'] = validate_category(bookmark) logger.info("Successfully generated summary and assigned category") except openai.error.RateLimitError as e: logger.warning(f"LLM Rate limit reached while processing {bookmark.get('url')}. Retrying later...") # Re-enqueue the bookmark for retry llm_queue.put(bookmark) time.sleep(60) # Wait before retrying except Exception as e: logger.error(f"Error generating summary and assigning category for {bookmark.get('url')}: {e}", exc_info=True) # Assign default values on failure bookmark['summary'] = 'No summary available.' bookmark['category'] = 'Uncategorized' finally: llm_queue.task_done() def categorize_based_on_summary(summary, url): """ Assign category based on keywords in the summary or URL. """ summary_lower = summary.lower() url_lower = url.lower() if 'social media' in summary_lower or 'twitter' in summary_lower or 'x.com' in url_lower: return 'Social Media' elif 'wikipedia' in url_lower: return 'Reference and Knowledge Bases' elif 'cloud computing' in summary_lower or 'aws' in summary_lower: return 'Technology' elif 'news' in summary_lower or 'media' in summary_lower: return 'News and Media' elif 'education' in summary_lower or 'learning' in summary_lower: return 'Education and Learning' # Add more conditions as needed else: return 'Uncategorized' def validate_category(bookmark): """ Further validate and adjust the category if needed. """ # Example: Specific cases based on URL url_lower = bookmark['url'].lower() if 'facebook' in url_lower or 'x.com' in url_lower: return 'Social Media' elif 'wikipedia' in url_lower: return 'Reference and Knowledge Bases' elif 'aws.amazon.com' in url_lower: return 'Technology' # Add more specific cases as needed else: return bookmark['category'] def extract_main_content(soup): """ Extract the main content from a webpage while filtering out boilerplate content. """ if not soup: return "" # Remove unwanted elements for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'form', 'noscript']): element.decompose() # Extract text from
tags p_tags = soup.find_all('p') if p_tags: content = ' '.join([p.get_text(strip=True, separator=' ') for p in p_tags]) else: # Fallback to body content content = soup.get_text(separator=' ', strip=True) # Clean up the text content = re.sub(r'\s+', ' ', content) # Truncate content to a reasonable length (e.g., 1500 words) words = content.split() if len(words) > 1500: content = ' '.join(words[:1500]) return content def get_page_metadata(soup): """ Extract metadata from the webpage including title, description, and keywords. """ metadata = { 'title': '', 'description': '', 'keywords': '' } if not soup: return metadata # Get title title_tag = soup.find('title') if title_tag and title_tag.string: metadata['title'] = title_tag.string.strip() # Get meta description meta_desc = ( soup.find('meta', attrs={'name': 'description'}) or soup.find('meta', attrs={'property': 'og:description'}) or soup.find('meta', attrs={'name': 'twitter:description'}) ) if meta_desc: metadata['description'] = meta_desc.get('content', '').strip() # Get meta keywords meta_keywords = soup.find('meta', attrs={'name': 'keywords'}) if meta_keywords: metadata['keywords'] = meta_keywords.get('content', '').strip() # Get OG title if main title is empty if not metadata['title']: og_title = soup.find('meta', attrs={'property': 'og:title'}) if og_title: metadata['title'] = og_title.get('content', '').strip() return metadata def generate_summary_and_assign_category(bookmark): """ Generate a concise summary and assign a category using a single LLM call. This function decides whether to use metadata or enqueue an LLM call. """ # Check if metadata can provide a summary description = bookmark.get('description', '').strip() title = bookmark.get('title', '').strip() if description: # Use description as summary bookmark['summary'] = description # Assign category based on description or title bookmark['category'] = categorize_based_on_summary(description, bookmark['url']) logger.info(f"Summary derived from metadata for {bookmark.get('url')}") elif title: # Use title as summary bookmark['summary'] = title # Assign category based on title bookmark['category'] = categorize_based_on_summary(title, bookmark['url']) logger.info(f"Summary derived from title for {bookmark.get('url')}") else: # Enqueue for LLM processing logger.info(f"No sufficient metadata for {bookmark.get('url')}. Enqueuing for LLM summary generation.") llm_queue.put(bookmark) def parse_bookmarks(file_content): """ Parse bookmarks from HTML file. """ logger.info("Parsing bookmarks") try: soup = BeautifulSoup(file_content, 'html.parser') extracted_bookmarks = [] for link in soup.find_all('a'): url = link.get('href') title = link.text.strip() if url and title: if url.startswith('http://') or url.startswith('https://'): extracted_bookmarks.append({'url': url, 'title': title}) else: logger.info(f"Skipping non-http/https URL: {url}") logger.info(f"Extracted {len(extracted_bookmarks)} bookmarks") return extracted_bookmarks except Exception as e: logger.error("Error parsing bookmarks: %s", e, exc_info=True) raise def fetch_url_info(bookmark): """ Fetch information about a URL. """ url = bookmark['url'] if url in fetch_cache: with lock: bookmark.update(fetch_cache[url]) return try: logger.info(f"Fetching URL info for: {url}") headers = { 'User-Agent': 'Mozilla/5.0', 'Accept-Language': 'en-US,en;q=0.9', } response = requests.get(url, headers=headers, timeout=5, verify=False, allow_redirects=True) bookmark['etag'] = response.headers.get('ETag', 'N/A') bookmark['status_code'] = response.status_code content = response.text logger.info(f"Fetched content length for {url}: {len(content)} characters") if response.status_code >= 500: bookmark['dead_link'] = True bookmark['description'] = '' bookmark['html_content'] = '' logger.warning(f"Dead link detected: {url} with status {response.status_code}") else: bookmark['dead_link'] = False bookmark['html_content'] = content bookmark['description'] = '' logger.info(f"Fetched information for {url}") except requests.exceptions.Timeout: bookmark['dead_link'] = False bookmark['etag'] = 'N/A' bookmark['status_code'] = 'Timeout' bookmark['description'] = '' bookmark['html_content'] = '' bookmark['slow_link'] = True logger.warning(f"Timeout while fetching {url}. Marking as 'Slow'.") except Exception as e: bookmark['dead_link'] = True bookmark['etag'] = 'N/A' bookmark['status_code'] = 'Error' bookmark['description'] = '' bookmark['html_content'] = '' logger.error(f"Error fetching URL info for {url}: {e}", exc_info=True) finally: with lock: fetch_cache[url] = { 'etag': bookmark.get('etag'), 'status_code': bookmark.get('status_code'), 'dead_link': bookmark.get('dead_link'), 'description': bookmark.get('description'), 'html_content': bookmark.get('html_content', ''), 'slow_link': bookmark.get('slow_link', False), } def vectorize_and_index(bookmarks_list): """ Create vector embeddings for bookmarks and build FAISS index with ID mapping. """ global faiss_index logger.info("Vectorizing summaries and building FAISS index") try: summaries = [bookmark['summary'] for bookmark in bookmarks_list] embeddings = embedding_model.encode(summaries) dimension = embeddings.shape[1] index = faiss.IndexIDMap(faiss.IndexFlatL2(dimension)) ids = np.array([bookmark['id'] for bookmark in bookmarks_list], dtype=np.int64) index.add_with_ids(np.array(embeddings).astype('float32'), ids) faiss_index = index logger.info("FAISS index built successfully with IDs") return index except Exception as e: logger.error(f"Error in vectorizing and indexing: {e}", exc_info=True) raise def display_bookmarks(): """ Generate HTML display for bookmarks. """ logger.info("Generating HTML display for bookmarks") cards = '' for i, bookmark in enumerate(bookmarks): index = i + 1 if bookmark.get('dead_link'): status = "❌ Dead Link" card_style = "border: 2px solid red;" text_style = "color: white;" elif bookmark.get('slow_link'): status = "⏳ Slow Response" card_style = "border: 2px solid orange;" text_style = "color: white;" else: status = "✅ Active" card_style = "border: 2px solid green;" text_style = "color: white;" title = bookmark['title'] url = bookmark['url'] etag = bookmark.get('etag', 'N/A') summary = bookmark.get('summary', '') category = bookmark.get('category', 'Uncategorized') # Escape HTML content to prevent XSS attacks from html import escape title = escape(title) url = escape(url) summary = escape(summary) category = escape(category) card_html = f'''