Spaces:
Running
Running
| import os | |
| import json | |
| import re | |
| import gradio as gr | |
| import pandas as pd | |
| import requests | |
| import random | |
| import feedparser | |
| import urllib.parse | |
| from tempfile import NamedTemporaryFile | |
| from typing import List | |
| from bs4 import BeautifulSoup | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.document_loaders import PyPDFLoader, PDFMinerLoader | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.llms import HuggingFaceHub | |
| from langchain_core.runnables import RunnableParallel, RunnablePassthrough | |
| from langchain_core.documents import Document | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from openpyxl import load_workbook | |
| from openpyxl.utils.dataframe import dataframe_to_rows | |
| import camelot | |
# Hugging Face API token read from the environment; None when HUGGINGFACE_TOKEN is unset.
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")
# Memory database to store question-answer pairs
memory_database = {}
# Question/answer turns of the running chat; ask_question updates it through manage_conversation_history.
conversation_history = []
# Articles produced by the most recent process_news run; read by export_news_to_excel.
news_database = []
def load_and_split_document_basic(file):
    """Load the PDF at ``file.name`` with PyPDF and return its split documents.

    NOTE(review): a later definition in this file reuses this name with an
    extra ``parser`` argument and replaces this one at import time.
    """
    return PyPDFLoader(file.name).load_and_split()
def load_and_split_document_recursive(file: NamedTemporaryFile) -> List[Document]:
    """Load the PDF at ``file.name`` with PyPDF and split it into overlapping chunks.

    Pages are cut into pieces of at most 1000 characters with a 200-character
    overlap, measured with ``len``.

    NOTE(review): a later definition in this file reuses this name with an
    extra ``parser`` argument and replaces this one at import time.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return splitter.split_documents(PyPDFLoader(file.name).load())
def load_and_split_document_basic(file: NamedTemporaryFile, parser: str) -> List[Document]:
    """Load the uploaded PDF with the chosen parser and return its split documents.

    Args:
        file: Uploaded file object; only ``file.name`` (the path) is used.
        parser: ``"PyPDF"``, ``"PDFMiner"`` or ``"Camelot"``.

    Returns:
        The documents produced by the loader's ``load_and_split`` (or the
        Camelot table documents when ``parser`` is ``"Camelot"``).

    Raises:
        ValueError: If ``parser`` is not one of the supported names.
    """
    if parser == "Camelot":
        return load_and_split_document_camelot(file)

    loader_classes = {"PyPDF": PyPDFLoader, "PDFMiner": PDFMinerLoader}
    if parser not in loader_classes:
        raise ValueError(f"Unknown parser: {parser}")
    return loader_classes[parser](file.name).load_and_split()
def load_and_split_document_recursive(file: NamedTemporaryFile, parser: str) -> List[Document]:
    """Load the uploaded PDF with the chosen parser and split it into overlapping chunks.

    Args:
        file: Uploaded file object; only ``file.name`` (the path) is used.
        parser: ``"PyPDF"``, ``"PDFMiner"`` or ``"Camelot"``.

    Returns:
        Chunks of at most 1000 characters with a 200-character overlap. For
        ``"Camelot"`` the table documents are returned unsplit.

    Raises:
        ValueError: If ``parser`` is not one of the supported names.
    """
    if parser == "Camelot":
        return load_and_split_document_camelot(file)

    loader_classes = {"PyPDF": PyPDFLoader, "PDFMiner": PDFMinerLoader}
    if parser not in loader_classes:
        raise ValueError(f"Unknown parser: {parser}")

    loaded_pages = loader_classes[parser](file.name).load()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return splitter.split_documents(loaded_pages)
def load_and_split_document_camelot(file: NamedTemporaryFile) -> List[Document]:
    """Extract every table Camelot finds in the PDF as one document per table.

    Each table's DataFrame is rendered with ``to_string(index=False)``; the
    metadata records the file path and the 1-based table number.
    """
    extracted_tables = camelot.read_pdf(file.name, pages='all')
    return [
        Document(
            page_content=tbl.df.to_string(index=False),
            metadata={"source": file.name, "table_number": position},
        )
        for position, tbl in enumerate(extracted_tables, start=1)
    ]
def load_document(file: NamedTemporaryFile, parser: str, use_recursive_splitter: bool) -> List[Document]:
    """Dispatch to the loader matching the parser and splitting choice.

    ``"Camelot"`` always goes to the table extractor; otherwise the recursive
    splitter is used when ``use_recursive_splitter`` is truthy and the basic
    page splitter when it is not.
    """
    if parser == "Camelot":
        return load_and_split_document_camelot(file)

    if use_recursive_splitter:
        split_fn = load_and_split_document_recursive
    else:
        split_fn = load_and_split_document_basic
    return split_fn(file, parser)
def update_vectors(files, use_recursive_splitter, selected_parser):
    """Parse the uploaded PDFs and add their chunks to the on-disk FAISS store.

    Args:
        files: Uploaded file objects (each exposing ``.name``); may be empty or None.
        use_recursive_splitter: Whether to use the recursive character splitter.
        selected_parser: Parser name passed through to ``load_document``.

    Returns:
        A human-readable status message for the UI.
    """
    if not files:
        return "Please upload at least one PDF file."

    all_data = []
    for file in files:
        all_data.extend(load_document(file, selected_parser, use_recursive_splitter))
    total_chunks = len(all_data)

    # Nothing extracted (e.g. Camelot found no tables): creating a FAISS index
    # from an empty document list fails, so report it instead of crashing.
    if not all_data:
        return (
            f"No content could be extracted from {len(files)} files using "
            f"{selected_parser} parser. Vector store was not updated."
        )

    # Load the embedding model only once there is something to index, and
    # reuse the shared create-or-extend helper instead of duplicating it.
    create_or_update_database(all_data, get_embeddings())

    splitting_method = "recursive splitting" if use_recursive_splitter else "page-by-page splitting"
    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files using {selected_parser} parser with {splitting_method}."
def get_embeddings():
    """Construct the sentence-transformers embedding model used for the FAISS store."""
    embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
    return embedding_model
def create_or_update_database(data, embeddings):
    """Create the ``faiss_database`` store from ``data`` or extend the existing one, then save it."""
    index_dir = "faiss_database"
    if not os.path.exists(index_dir):
        store = FAISS.from_documents(data, embeddings)
    else:
        store = FAISS.load_local(index_dir, embeddings, allow_dangerous_deserialization=True)
        store.add_documents(data)
    store.save_local(index_dir)
def clear_cache():
    """Delete the on-disk ``faiss_database`` store, if present.

    Returns:
        "Cache cleared successfully." when something was removed, otherwise
        "No cache to clear."
    """
    import shutil

    cache_path = "faiss_database"
    if not os.path.exists(cache_path):
        return "No cache to clear."

    # FAISS.save_local writes a directory, which os.remove cannot delete;
    # remove the whole tree, but still cope with a stray plain file.
    if os.path.isdir(cache_path):
        shutil.rmtree(cache_path)
    else:
        os.remove(cache_path)
    return "Cache cleared successfully."
def get_similarity(text1, text2):
    """Return the cosine similarity of the TF-IDF vectors of the two texts."""
    tfidf_matrix = TfidfVectorizer().fit_transform([text1, text2])
    similarity_matrix = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
    return similarity_matrix[0][0]
# Prompt template for document-based answers. ask_question fills in
# {history}, {context} and {question}, and later splits the model output on
# the template's final instruction sentence to isolate the answer.
prompt = """
Answer the question based on the following information:
Conversation History:
{history}
Context from documents:
{context}
Current Question: {question}
If the question is referring to the conversation history, use that information to answer.
If the question is not related to the conversation history, use the context from documents to answer.
If you don't have enough information to answer, say so.
Provide a concise and direct answer to the question:
"""
def get_model(temperature, top_p, repetition_penalty):
    """Build a HuggingFaceHub client for Mistral-7B-Instruct with the given sampling settings.

    The module-level ``huggingface_token`` is used as the API token and
    ``max_length`` is fixed at 1000.
    """
    generation_settings = {
        "temperature": temperature,
        "top_p": top_p,
        "repetition_penalty": repetition_penalty,
        "max_length": 1000,
    }
    return HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs=generation_settings,
        huggingfacehub_api_token=huggingface_token,
    )
def generate_chunked_response(model, prompt, max_tokens=1000, max_chunks=5):
    """Call ``model`` repeatedly until the text ends a sentence or ``max_chunks`` is hit.

    Each call receives ``prompt`` plus everything generated so far and is
    asked for up to ``max_tokens`` new tokens.

    Args:
        model: Callable taking ``(text, max_new_tokens=...)`` and returning a string.
        prompt: Prompt text the generation continues from.
        max_tokens: Value passed as ``max_new_tokens`` on every call.
        max_chunks: Maximum number of model calls.

    Returns:
        The concatenated generation with surrounding whitespace removed.
    """
    full_response = ""
    for _ in range(max_chunks):
        chunk = model(prompt + full_response, max_new_tokens=max_tokens)
        # Keep the chunk's own whitespace: stripping every chunk before
        # joining glued the last word of one chunk to the first of the next.
        full_response += chunk
        if chunk.strip().endswith((".", "!", "?")):
            break
    return full_response.strip()
def manage_conversation_history(question, answer, history, max_history=5):
    """Append a question/answer turn to ``history`` in place and drop the oldest turn on overflow.

    At most one turn is removed per call. The same list object is returned.
    """
    turn = {"question": question, "answer": answer}
    history.append(turn)
    overflowed = len(history) > max_history
    if overflowed:
        del history[0]
    return history
def is_related_to_history(question, history, threshold=0.3):
    """Tell whether ``question`` is similar enough to the past conversation.

    All past questions and answers are joined into one text and compared with
    the question via ``get_similarity``; the result is True when the score is
    strictly above ``threshold``. An empty history is never related.
    """
    if not history:
        return False
    flattened = " ".join(f"{turn['question']} {turn['answer']}" for turn in history)
    return get_similarity(question, flattened) > threshold
def extract_text_from_webpage(html):
    """Return the visible text of an HTML document, one phrase per line.

    ``<script>`` and ``<style>`` elements are removed, every line is trimmed,
    lines are broken into phrases at double spaces, and empty pieces are dropped.

    Args:
        html: HTML markup as a string.

    Returns:
        The cleaned text with pieces joined by newlines.
    """
    soup = BeautifulSoup(html, 'html.parser')
    for tag in soup(["script", "style"]):
        tag.extract()  # Remove scripts and styles
    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    # Break on double spaces only: splitting on a single space put every
    # word on its own line and destroyed sentence structure in the context.
    phrases = (phrase.strip() for line in lines for phrase in line.split("  "))
    return '\n'.join(phrase for phrase in phrases if phrase)
# Browser User-Agent strings; google_search picks one at random for each
# results-page request.
_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
def _fetch_page_text(session, link, headers, timeout, max_chars, ssl_verify=None):
    """Download ``link`` and return its visible text, truncated to ``max_chars`` plus "..."."""
    webpage = session.get(link, headers=headers, timeout=timeout, verify=ssl_verify)
    webpage.raise_for_status()
    visible_text = extract_text_from_webpage(webpage.text)
    if len(visible_text) > max_chars:
        visible_text = visible_text[:max_chars] + "..."
    return visible_text


def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Scrape Google results for ``term`` and fetch the visible text of each hit.

    Args:
        term: Search query (sent as the ``q`` parameter; requests handles the URL encoding).
        num_results: Result offset at which paging stops.
        lang: Value for Google's ``hl`` parameter.
        timeout: Timeout in seconds for every HTTP request.
        safe: Value for Google's ``safe`` parameter.
        ssl_verify: Passed as ``verify`` to both the search request and the
            result-page requests.

    Returns:
        A list of ``{"link": ..., "text": ...}`` dicts. ``text`` is None when
        a page could not be fetched, and both values are None when a result had
        no link. When nothing at all was collected, a single placeholder entry
        with an explanatory ``text`` is returned.
    """
    start = 0
    all_results = []
    max_chars_per_page = 8000  # Limit the number of characters from each webpage to stay under the token limit
    print(f"Starting Google search for term: '{term}'")
    with requests.Session() as session:
        while start < num_results:
            try:
                headers = {
                    'User-Agent': random.choice(_useragent_list)
                }
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
                print(f"Successfully retrieved search results page (start={start})")
            except requests.exceptions.RequestException as e:
                print(f"Error retrieving search results: {e}")
                break
            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No results found on this page")
                break
            print(f"Found {len(result_block)} results on this page")
            for result in result_block:
                anchor = result.find("a", href=True)
                if not anchor:
                    print("No link found for this result")
                    all_results.append({"link": None, "text": None})
                    continue
                link = anchor["href"]
                print(f"Processing link: {link}")
                try:
                    visible_text = _fetch_page_text(
                        session, link, headers, timeout, max_chars_per_page, ssl_verify
                    )
                except requests.exceptions.RequestException as e:
                    # Keep the link so callers can see which page failed.
                    print(f"Error retrieving webpage content: {e}")
                    all_results.append({"link": link, "text": None})
                else:
                    all_results.append({"link": link, "text": visible_text})
                    print(f"Successfully extracted text from {link}")
            # Advance by what this page yielded so the loop always progresses.
            start += len(result_block)
    print(f"Search completed. Total results: {len(all_results)}")
    print("Search results:")
    for i, result in enumerate(all_results, 1):
        print(f"Result {i}:")
        print(f" Link: {result['link']}")
        if result['text']:
            print(f" Text: {result['text'][:100]}...")  # Print first 100 characters
        else:
            print(" Text: None")
    print("End of search results")
    if not all_results:
        print("No search results found. Returning a default message.")
        return [{"link": None, "text": "No information found in the web search results."}]
    return all_results
def fetch_google_news_rss(query, num_results=10):
    """Fetch up to ``num_results`` Google News RSS entries for ``query``.

    Returns a list of dicts with ``published_date``, ``title``, ``url`` and
    ``content`` keys ("N/A" for missing fields), or an empty list when
    fetching or parsing raises.
    """
    query_string = urllib.parse.urlencode({
        "q": query,
        "hl": "en-US",
        "gl": "US",
        "ceid": "US:en"
    })
    url = f"https://news.google.com/rss/search?{query_string}"
    try:
        feed = feedparser.parse(url)
        return [
            {
                "published_date": entry.get("published", "N/A"),
                "title": entry.get("title", "N/A"),
                "url": entry.get("link", "N/A"),
                "content": entry.get("summary", "N/A"),
            }
            for entry in feed.entries[:num_results]
        ]
    except Exception as e:
        print(f"Error fetching news: {str(e)}")
        return []
def summarize_news_content(content, model):
    """Summarize ``content`` with ``model`` and return ``(summary, cleaned_summary)``.

    ``summary`` is the text after the last "Summary:" marker in the model
    output (or the whole output when the marker is absent). ``cleaned_summary``
    additionally drops lines starting with "Human:", "Assistant:" or
    "Summary:" and joins the remaining lines with spaces.
    """
    prompt_template = """
Summarize the following news article in a concise manner:
{content}
Summary:
"""
    formatted_prompt = ChatPromptTemplate.from_template(prompt_template).format(content=content)
    full_response = generate_chunked_response(model, formatted_prompt, max_tokens=200)

    # Extract only the summary part: rpartition yields the whole text when the marker is missing.
    summary = full_response.rpartition("Summary:")[2].strip()

    # Create a cleaned version of the summary
    role_prefixes = ("Human:", "Assistant:", "Summary:")
    kept_lines = []
    for line in summary.split('\n'):
        if line.strip().startswith(role_prefixes):
            continue
        kept_lines.append(line)
    cleaned_summary = ' '.join(kept_lines).strip()
    return summary, cleaned_summary
def process_news(query, temperature, top_p, repetition_penalty, news_source):
    """Scrape, summarize, score and index news from a configured website.

    Args:
        query: Accepted for interface compatibility; not used by the scraper.
        temperature: Sampling temperature for the summarization model.
        top_p: Nucleus-sampling value for the model.
        repetition_penalty: Repetition penalty for the model.
        news_source: Key into ``website_configs``.

    Returns:
        A status message describing the outcome. On success the processed
        articles also replace the module-level ``news_database``.
    """
    global news_database

    # Validate and fetch before building the model/embeddings so bad input
    # or an empty scrape does not pay for those expensive objects.
    if news_source not in website_configs:
        return f"Invalid news source selected: {news_source}"
    articles = fetch_news_from_website(news_source)
    if not articles:
        return f"No news articles found for {news_source}."

    model = get_model(temperature, top_p, repetition_penalty)
    processed_articles = []
    for article in articles:
        try:
            # Remove HTML tags from content
            clean_content = BeautifulSoup(article["content"], "html.parser").get_text()
            # If content is very short, use the title as content
            if len(clean_content) < 50:
                clean_content = article["title"]
            full_summary, cleaned_summary = summarize_news_content(clean_content, model)
            relevance_score = calculate_relevance_score(cleaned_summary, model)
            processed_articles.append({
                "published_date": article["published_date"],
                "title": article["title"],
                "url": article["url"],
                "content": clean_content,
                "summary": full_summary,
                "cleaned_summary": cleaned_summary,
                "relevance_score": relevance_score
            })
        except Exception as e:
            # Best effort: one bad article must not abort the whole batch.
            print(f"Error processing article: {str(e)}")
    if not processed_articles:
        return f"Failed to process any news articles from {news_source}. Please try again or check the summarization process."

    # Add processed articles to the database
    docs = [Document(page_content=article["cleaned_summary"], metadata={
        "source": article["url"],
        "title": article["title"],
        "published_date": article["published_date"],
        "relevance_score": article["relevance_score"]
    }) for article in processed_articles]
    embed = get_embeddings()
    try:
        create_or_update_database(docs, embed)
        # Update news_database for excel export
        news_database = processed_articles
        return f"Processed and added {len(processed_articles)} news articles from {news_source} to the database."
    except Exception as e:
        return f"Error adding articles to the database: {str(e)}"
# Per-site scraping configuration, keyed by the name listed in the news-source
# dropdown. fetch_articles_from_page and extract_articles split each
# article/title/date/content selector on "." and use the first piece as the
# tag name and the last piece as the CSS class; "url_prefix" is prepended to
# links that do not start with "http".
website_configs = {
    "Golomt Bank": {
        "base_url": "https://golomtbank.com/en/rnews",
        "article_selector": 'div.entry-post.gt-box-shadow-2',
        "title_selector": 'h2.entry-title',
        "date_selector": 'div.entry-date.gt-meta',
        "link_selector": 'a',
        "content_selector": 'div.entry-content',
        "next_page_selector": 'a.next',
        "url_prefix": "https://golomtbank.com"
    },
    # NOTE(review): these selectors contain no ".", so the same string ends up
    # as both the tag name and the class when split; this looks unlikely to
    # match any element. Confirm against the live page.
    "Bank of America": {
        "base_url": "https://newsroom.bankofamerica.com/content/newsroom/press-releases.html?page=1&year=all&category=press-release-categories/corporate-and-financial-news&categTitle=Corporate%20and%20Financial%20News",
        "article_selector": 'card bg-bank-gray-2',
        "title_selector": 'pr-list-head',
        "date_selector": 'prlist-date',
        "link_selector": 'a',
        "content_selector": 'richtext text',
        "next_page_selector": 'brand-SystemRight',
        "url_prefix": "https://newsroom.bankofamerica.com"
    },
    # Add more banks as needed
}
def fetch_articles_from_page(url, config, timeout=10):
    """Download a listing page and return its article elements and the parsed page.

    Args:
        url: Listing page URL.
        config: Site configuration; ``config['article_selector']`` is split on
            "." and its first piece is used as the tag name, its last piece as
            the CSS class.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``(articles, soup)``: the matching elements and the BeautifulSoup page.

    Raises:
        requests.exceptions.RequestException: On network errors, timeouts or
            an HTTP error status.
    """
    # Without a timeout a stalled server would hang the request indefinitely.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    selector_parts = config['article_selector'].split('.')
    articles = soup.find_all(selector_parts[0], class_=selector_parts[-1])
    return articles, soup
def _find_by_selector(node, selector):
    """Find the first descendant of ``node`` for a "tag.class" selector (first piece = tag, last piece = class)."""
    parts = selector.split('.')
    return node.find(parts[0], class_=parts[-1])


def extract_articles(articles, config, timeout=10):
    """Turn listing-page article elements into dicts, fetching each article's body.

    Args:
        articles: Elements exposing a BeautifulSoup-style ``find``.
        config: Site configuration with ``title_selector``, ``date_selector``,
            ``link_selector``, ``content_selector`` and ``url_prefix``.
        timeout: Seconds to wait for each article page.

    Returns:
        A list of dicts with ``title``, ``date``, ``link`` and ``content``.
        Missing pieces are reported as "No Title", "No Date", "No Link" and
        "No content found". A failed article download is logged and yields
        "No content found" instead of aborting the remaining articles.
    """
    import urllib.parse

    article_data = []
    for article in articles:
        title_div = _find_by_selector(article, config['title_selector'])
        title = title_div.get_text(strip=True) if title_div else "No Title"
        date_div = _find_by_selector(article, config['date_selector'])
        date = date_div.get_text(strip=True) if date_div else "No Date"

        link_tag = article.find(config['link_selector'])
        href = link_tag.get('href') if link_tag else None
        article_content = "No content found"
        if not href:
            # Nothing to download; do not build a bogus URL from the placeholder.
            link = "No Link"
        else:
            link = href
            if not link.startswith('http'):
                link = urllib.parse.urljoin(config['url_prefix'], link)
            try:
                article_response = requests.get(link, timeout=timeout)
                article_response.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"Error retrieving article {link}: {e}")
            else:
                article_soup = BeautifulSoup(article_response.content, 'html.parser')
                content_div = _find_by_selector(article_soup, config['content_selector'])
                if content_div:
                    article_content = content_div.get_text(strip=True)

        article_data.append({
            'title': title,
            'date': date,
            'link': link,
            'content': article_content
        })
    return article_data
def fetch_news_from_website(website_key, num_results=20):
    """Scrape up to ``num_results`` articles from a configured website, following pagination.

    Args:
        website_key: Key into ``website_configs``.
        num_results: Maximum number of articles to return.

    Returns:
        A list of dicts with ``published_date``, ``title``, ``url`` and
        ``content``. An empty list is returned when the key is unknown or any
        step of the scrape raises.
    """
    import urllib.parse

    config = website_configs.get(website_key)
    if not config:
        # Callers iterate the result, so report the problem and return an
        # empty list rather than an error string.
        print(f"No configuration found for website: {website_key}")
        return []
    current_page_url = config['base_url']
    all_articles = []
    visited_urls = set()
    try:
        # The visited set stops a "next" link that points back to an already
        # scraped page from looping forever.
        while len(all_articles) < num_results and current_page_url not in visited_urls:
            visited_urls.add(current_page_url)
            print(f"Fetching articles from: {current_page_url}")
            articles, soup = fetch_articles_from_page(current_page_url, config)
            if not articles:
                print("No articles found on this page.")
                break
            all_articles.extend(extract_articles(articles, config))
            print(f"Total articles fetched so far: {len(all_articles)}")
            if len(all_articles) >= num_results:
                break
            # The configured value is a CSS selector (e.g. "a.next"); find()
            # would treat it as a literal tag name and never match.
            next_page_link = soup.select_one(config['next_page_selector'])
            next_href = next_page_link.get('href') if next_page_link else None
            if not next_href:
                print("No next page link found.")
                break
            current_page_url = urllib.parse.urljoin(current_page_url, next_href)
        return [
            {
                "published_date": article['date'],
                "title": article['title'],
                "url": article['link'],
                "content": article['content']
            } for article in all_articles[:num_results]
        ]
    except Exception as e:
        print(f"Error fetching news from {website_key}: {str(e)}")
        return []
def export_news_to_excel():
    """Write the articles in ``news_database`` to a temporary .xlsx file and return its path.

    The cleaned summary replaces the raw summary, ``relevance_score`` is
    coerced to a number (0.0 when missing or unparsable) and the columns are
    put in a fixed order. When there are no articles, an explanatory message
    string is returned instead of a path.
    """
    global news_database
    if not news_database:
        return "No articles to export. Please fetch news first."

    def _print_scores():
        for item in news_database:
            print(f"Title: {item['title']}, Score: {item.get('relevance_score', 'N/A')}")

    print("Exporting the following articles:")
    _print_scores()

    frame = pd.DataFrame(news_database)
    # Ensure relevance_score is present and convert to float
    if 'relevance_score' in frame.columns:
        frame['relevance_score'] = pd.to_numeric(frame['relevance_score'], errors='coerce').fillna(0.0)
    else:
        frame['relevance_score'] = 0.0
    # Use the cleaned summary for the Excel export
    if 'cleaned_summary' in frame.columns:
        frame['summary'] = frame['cleaned_summary']
        frame = frame.drop(columns=['cleaned_summary'])
    # Reorder columns to put relevance_score after summary
    preferred_order = ['published_date', 'title', 'url', 'content', 'summary', 'relevance_score']
    frame = frame[[name for name in preferred_order if name in frame.columns]]

    print("Final DataFrame before export:")
    print(frame[['title', 'relevance_score']])

    # Reserve a path that outlives the handle, then let pandas write to it.
    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
    frame.to_excel(excel_path, index=False, engine='openpyxl')
    print(f"Excel file saved to: {excel_path}")

    print("Final relevance scores before export:")
    _print_scores()
    return excel_path
def _parse_relevance_score(response):
    """Return the score found in ``response`` clamped to [0.0, 1.0], or None when none is found."""
    # Accept "0.75", "1", "0" and ".5"; the earlier pattern required digits
    # on both sides of a decimal point and rejected whole numbers.
    match = re.search(r'Relevance Score:\s*(\d*\.?\d+)', response)
    if match is None and 'Relevance Score:' not in response:
        # The model may return only the continuation (e.g. " 0.8") without
        # repeating the label; accept a number at the very start.
        match = re.match(r'\s*(\d*\.?\d+)', response)
    if match is None:
        return None
    return min(max(float(match.group(1)), 0.00), 1.00)


def calculate_relevance_score(summary, model):
    """Ask ``model`` to rate the financial relevance of ``summary`` on a 0.00-1.00 scale.

    Args:
        summary: News summary text inserted into the scoring prompt.
        model: LLM used through an ``LLMChain``.

    Returns:
        The parsed score clamped to [0.00, 1.00], or 0.00 when no score can be
        read from the model's response.
    """
    prompt_template = PromptTemplate(
        input_variables=["summary"],
        template="""You are a financial analyst tasked with providing a relevance score to news summaries.
The score should be based on the financial significance and impact of the news.
Consider the following factors when assigning relevance:
- Earnings reports and financial performance
- Debt issuance or restructuring
- Mergers, acquisitions, or divestments
- Changes in key leadership (e.g., CEO, CFO)
- Regulatory changes or legal issues affecting the company
- Major product launches or market expansion
- Significant shifts in market share or competitive landscape
- Macroeconomic factors directly impacting the company or industry
- Stock price movements and trading volume changes
- Dividend announcements or changes in capital allocation
- Credit rating changes
- Material financial events (e.g., bankruptcy, major contracts)
Use the following scoring guide:
- 0.00-0.20: Not relevant to finance or economics
- 0.21-0.40: Slightly relevant, but minimal financial impact
- 0.41-0.60: Moderately relevant, some financial implications
- 0.61-0.80: Highly relevant, significant financial impact
- 0.81-1.00: Extremely relevant, major financial implications
Provide a score between 0.00 and 1.00, where 0.00 is not relevant at all, and 1.00 is extremely relevant from a financial perspective.
Summary: {summary}
Relevance Score:"""
    )
    chain = LLMChain(llm=model, prompt=prompt_template)
    response = chain.run(summary=summary)
    print(f"Raw relevance score response: {response}")  # Debug print
    final_score = _parse_relevance_score(response)
    if final_score is None:
        print("Error parsing relevance score: No relevance score found in the response")
        return 0.00
    print(f"Processed relevance score: {final_score}")  # Debug print
    return final_score
def ask_question(question, temperature, top_p, repetition_penalty, web_search, google_news_rss):
    """Answer ``question`` from web search results, indexed news, or uploaded documents.

    Args:
        question: User question; an empty value short-circuits with a prompt message.
        temperature: Sampling temperature for the model.
        top_p: Nucleus-sampling value for the model.
        repetition_penalty: Repetition penalty for the model.
        web_search: When truthy, run a Google search and answer from the fetched pages
            (which are also added to the FAISS store).
        google_news_rss: When truthy (and ``web_search`` is not), answer from
            the news summaries retrieved from the FAISS store.

    Returns:
        The answer text, or a message explaining what is missing. In document
        mode the pair is stored in ``memory_database``; the turn is appended to
        the conversation history.
    """
    global conversation_history
    if not question:
        return "Please enter a question."
    model = get_model(temperature, top_p, repetition_penalty)
    embed = get_embeddings()
    # Check if the FAISS database exists
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
    else:
        database = None
    if web_search:
        search_results = google_search(question)
        web_docs = [Document(page_content=result["text"], metadata={"source": result["link"]}) for result in search_results if result["text"]]
        # Every result may lack text (all page fetches failed); building an
        # index from an empty list fails, so only touch the store when there
        # is something to add. The prompt below handles the empty context.
        if web_docs:
            if database is None:
                database = FAISS.from_documents(web_docs, embed)
            else:
                database.add_documents(web_docs)
            database.save_local("faiss_database")
        context_str = "\n".join([doc.page_content for doc in web_docs])
        prompt_template = """
Answer the question based on the following web search results:
Web Search Results:
{context}
Current Question: {question}
If the web search results don't contain relevant information, state that the information is not available in the search results.
Provide a concise and direct answer to the question without mentioning the web search or these instructions:
"""
        prompt_val = ChatPromptTemplate.from_template(prompt_template)
        formatted_prompt = prompt_val.format(context=context_str, question=question)
    elif google_news_rss:
        if database is None:
            return "No news articles available. Please fetch news articles first."
        retriever = database.as_retriever()
        relevant_docs = retriever.get_relevant_documents(question)
        context_str = "\n".join([f"Title: {doc.metadata.get('title', 'N/A')}\nURL: {doc.metadata.get('source', 'N/A')}\nSummary: {doc.page_content}" for doc in relevant_docs])
        prompt_template = """
Answer the question based on the following news summaries:
News Summaries:
{context}
Current Question: {question}
If the news summaries don't contain relevant information, state that the information is not available in the news articles.
Provide a concise and direct answer to the question without mentioning the news summaries or these instructions:
"""
        prompt_val = ChatPromptTemplate.from_template(prompt_template)
        formatted_prompt = prompt_val.format(context=context_str, question=question)
    else:
        if database is None:
            return "No documents available. Please upload documents, enable web search, or fetch news articles to answer questions."
        history_str = "\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in conversation_history])
        if is_related_to_history(question, conversation_history):
            context_str = "No additional context needed. Please refer to the conversation history."
        else:
            retriever = database.as_retriever()
            relevant_docs = retriever.get_relevant_documents(question)
            context_str = "\n".join([doc.page_content for doc in relevant_docs])
        prompt_val = ChatPromptTemplate.from_template(prompt)
        formatted_prompt = prompt_val.format(history=history_str, context=context_str, question=question)
    full_response = generate_chunked_response(model, formatted_prompt)
    # Extract only the part after the last occurrence of a prompt-like sentence
    answer_patterns = [
        r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
        r"Provide a concise and direct answer to the question without mentioning the news summaries or these instructions:",
        r"Provide a concise and direct answer to the question:",
        r"Answer:"
    ]
    for pattern in answer_patterns:
        match = re.split(pattern, full_response, flags=re.IGNORECASE)
        if len(match) > 1:
            answer = match[-1].strip()
            break
    else:
        # If no pattern is found, return the full response
        answer = full_response.strip()
    if not web_search and not google_news_rss:
        memory_database[question] = answer
    conversation_history = manage_conversation_history(question, answer, conversation_history)
    return answer
def extract_db_to_excel():
    """Dump every document in the FAISS store to a temporary .xlsx file and return its path.

    Each row holds the document's ``page_content`` and its metadata serialized as JSON.
    """
    store = FAISS.load_local("faiss_database", get_embeddings(), allow_dangerous_deserialization=True)
    rows = []
    for doc in store.docstore._dict.values():
        rows.append({"page_content": doc.page_content, "metadata": json.dumps(doc.metadata)})
    frame = pd.DataFrame(rows)
    # Reserve a path that outlives the handle, then let pandas write to it.
    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
    frame.to_excel(excel_path, index=False)
    return excel_path
def export_memory_db_to_excel():
    """Export the memory database and conversation history to a two-sheet temporary .xlsx file.

    Returns the path of the workbook, which has a "Memory Database" sheet and
    a "Conversation History" sheet, each with question/answer columns.
    """
    memory_rows = []
    for stored_question, stored_answer in memory_database.items():
        memory_rows.append({"question": stored_question, "answer": stored_answer})
    history_rows = [{"question": turn["question"], "answer": turn["answer"]} for turn in conversation_history]

    sheets = {
        'Memory Database': pd.DataFrame(memory_rows),
        'Conversation History': pd.DataFrame(history_rows),
    }
    # Reserve a path that outlives the handle, then let pandas write to it.
    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        for sheet_name, frame in sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=False)
    return excel_path
# Gradio interface
# NOTE(review): the nesting of widgets under Row/Column below was reconstructed
# from a flattened listing; confirm the layout against the running app.
with gr.Blocks() as demo:
    gr.Markdown("# Chat with your PDF documents and News")

    # Document upload and indexing controls.
    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        update_button = gr.Button("Update Vector Store")
        use_recursive_splitter = gr.Checkbox(label="Use Recursive Text Splitter", value=False)
        parser_dropdown = gr.Dropdown(
            choices=["PyPDF", "PDFMiner", "Camelot"],
            label="Select Parser",
            value="PyPDF"
        )
    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input, use_recursive_splitter, parser_dropdown], outputs=update_output)

    # Chat area on the left, generation settings and mode switches on the right.
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            question_input = gr.Textbox(label="Ask a question about your documents or news")
            submit_button = gr.Button("Submit")
        with gr.Column(scale=1):
            temperature_slider = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.5, step=0.1)
            top_p_slider = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.1)
            repetition_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.0, step=0.1)
            web_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False)
            google_news_rss_checkbox = gr.Checkbox(label="Google News RSS", value=False)

    def chat(question, history, temperature, top_p, repetition_penalty, web_search, google_news_rss):
        """Answer the question, append the (question, answer) pair to the chat history and clear the input box."""
        answer = ask_question(question, temperature, top_p, repetition_penalty, web_search, google_news_rss)
        history.append((question, answer))
        # The empty string resets question_input; history refreshes the chatbot.
        return "", history

    submit_button.click(chat, inputs=[question_input, chatbot, temperature_slider, top_p_slider, repetition_penalty_slider, web_search_checkbox, google_news_rss_checkbox], outputs=[question_input, chatbot])

    # News scraping controls; the source choices come from website_configs.
    with gr.Row():
        news_query_input = gr.Textbox(label="News Query")
        news_source_dropdown = gr.Dropdown(
            choices=list(website_configs.keys()),
            label="Select News Source",
            value=list(website_configs.keys())[0]
        )
        fetch_news_button = gr.Button("Fetch News")
    news_fetch_output = gr.Textbox(label="News Fetch Status")

    def fetch_news(query, temperature, top_p, repetition_penalty, news_source):
        """Forward the UI values to process_news and return its status message."""
        return process_news(query, temperature, top_p, repetition_penalty, news_source)

    fetch_news_button.click(
        fetch_news,
        inputs=[news_query_input, temperature_slider, top_p_slider, repetition_penalty_slider, news_source_dropdown],
        outputs=news_fetch_output
    )

    # Excel exports and cache maintenance.
    extract_button = gr.Button("Extract Database to Excel")
    excel_output = gr.File(label="Download Excel File")
    extract_button.click(extract_db_to_excel, inputs=[], outputs=excel_output)
    export_memory_button = gr.Button("Export Memory Database to Excel")
    memory_excel_output = gr.File(label="Download Memory Excel File")
    export_memory_button.click(export_memory_db_to_excel, inputs=[], outputs=memory_excel_output)
    export_news_button = gr.Button("Download News Excel File")
    news_excel_output = gr.File(label="Download News Excel File")
    export_news_button.click(export_news_to_excel, inputs=[], outputs=news_excel_output)
    clear_button = gr.Button("Clear Cache")
    clear_output = gr.Textbox(label="Cache Status")
    clear_button.click(clear_cache, inputs=[], outputs=clear_output)

# Start the app only when the file is run as a script.
if __name__ == "__main__":
    demo.launch()