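"""Financial News Summarizer.

A Gradio app that searches Google for recent financial news about a company
or topic (or reads an uploaded PDF), filters the text down to finance-related
sentences, and summarizes it with Meta-Llama-3-8B-Instruct via the Hugging
Face Inference API.
"""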
import requests
from bs4 import BeautifulSoup
import gradio as gr
import random
from datetime import datetime, timedelta
import re
import os
import PyPDF2
# List of user agents to rotate through
_useragent_list = [
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
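# Serverless Inference API endpoint for Llama 3. Requests are authenticated
# with the HUGGINGFACE_TOKEN environment variable; note that this model is
# gated on the Hub, so the token's account must have been granted access.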
API_URL = "https://api-inference.huggingface.co/models/meta-llama/Meta-Llama-3-8B-Instruct"
headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"}
def query_llama(payload):
   """Send a query to the Llama model via Hugging Face API"""
   try:
       print(f"Payload: {payload}")  # Debug: Print payload
       response = requests.post(API_URL, headers=headers, json=payload)
       response.raise_for_status()
       return response.json()
   except requests.exceptions.RequestException as e:
       print(f"Error querying Llama model: {e}")
       return None
def google_search(term, num_results=1, lang="en", timeout=30, safe="active", ssl_verify=None, days_back=90):
   """Perform a Google search and return results"""
   print(f"Searching for term: {term}")
   # Calculate the date range
   end_date = datetime.now()
   start_date = end_date - timedelta(days=days_back)
   # Format dates as strings
   start_date_str = start_date.strftime("%Y-%m-%d")
   end_date_str = end_date.strftime("%Y-%m-%d")
   # Add the date range to the search term
   search_term = f"{term} financial earnings report after:{start_date_str} before:{end_date_str}"
   start = 0
   all_results = []
   max_attempts = num_results * 2  # Allow for some failed attempts
   with requests.Session() as session:
       attempts = 0
       while len(all_results) < num_results and attempts < max_attempts:
           try:
               # Choose a random user agent
               user_agent = random.choice(_useragent_list)
               headers = {'User-Agent': user_agent}
               resp = session.get(
                   url="https://www.google.com/search",
                   headers=headers,
                   params={
                       "q": search_term,
                       "num": num_results - len(all_results),
                       "hl": lang,
                       "start": start,
                       "safe": safe,
                   },
                   timeout=timeout,
                   verify=ssl_verify,
               )
               resp.raise_for_status()
               soup = BeautifulSoup(resp.text, "html.parser")
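                # NOTE: scraping Google result pages is brittle; the "g" result-card
                # class changes periodically and may yield no matches.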
               result_block = soup.find_all("div", attrs={"class": "g"})
               if not result_block:
                   print("No more results found.")
                   break
               for result in result_block:
                   if len(all_results) >= num_results:
                       break
                   link = result.find("a", href=True)
                   if link:
                       link = link["href"]
                       print(f"Found link: {link}")
                       try:
                           webpage = session.get(link, headers=headers, timeout=timeout)
                           webpage.raise_for_status()
                           visible_text = extract_text_from_webpage(webpage.text)
                           all_results.append({"link": link, "text": visible_text})
                       except requests.exceptions.HTTPError as e:
                           if e.response.status_code == 403:
                               print(f"403 Forbidden error for {link}, skipping...")
                           else:
                               print(f"HTTP error {e.response.status_code} for {link}, skipping...")
                       except requests.exceptions.RequestException as e:
                           print(f"Error fetching or processing {link}: {e}")
                   else:
                       print("No link found in result.")
               start += len(result_block)
               attempts += 1
           except requests.exceptions.RequestException as e:
               print(f"Error fetching search results: {e}")
               attempts += 1
   print(f"Total results fetched: {len(all_results)}")
   return all_results
def extract_text_from_webpage(html_content):
   """Extract visible text from HTML content"""
   soup = BeautifulSoup(html_content, 'html.parser')
   # Remove script and style elements
   for script in soup(["script", "style"]):
       script.decompose()
   # Get text
   text = soup.get_text()
   # Break into lines and remove leading and trailing space on each
   lines = (line.strip() for line in text.splitlines())
   # Break multi-headlines into a line each
   chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
   # Drop blank lines
   text = '\n'.join(chunk for chunk in chunks if chunk)
   return text
def filter_relevant_content(text):
   """Filter out irrelevant content"""
   # List of keywords related to financial reports
   keywords = ['revenue', 'profit', 'earnings', 'financial', 'quarter', 'fiscal', 'growth', 'income', 'loss', 'dividend']
   # Split the text into sentences
   sentences = re.split(r'(?<=[.!?])\s+', text)
   # Filter sentences containing at least one keyword
   relevant_sentences = [sentence for sentence in sentences if any(keyword in sentence.lower() for keyword in keywords)]
   # Join the relevant sentences back into a single string
   filtered_text = ' '.join(relevant_sentences)
   return filtered_text
def chunk_text(text, max_chunk_size=1000, overlap=100):
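   """Split text into sentence-based chunks of at most max_chunk_size characters,
   starting a new chunk whenever a financial section keyword appears, then add
   `overlap` characters of context between neighboring chunks."""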
   # List of keywords that might indicate new sections
   section_keywords = ["revenue", "income", "profit", "loss", "expenses", "outlook", "forecast", "quarter", "year"]
   # Split text into sentences
   sentences = re.split(r'(?<=[.!?])\s+', text)
   chunks = []
   current_chunk = ""
   for sentence in sentences:
       if len(current_chunk) + len(sentence) > max_chunk_size:
           # If adding this sentence exceeds max_chunk_size, start a new chunk
           chunks.append(current_chunk.strip())
           current_chunk = sentence + " "
       elif any(keyword in sentence.lower() for keyword in section_keywords):
           # If sentence contains a section keyword, start a new chunk
           if current_chunk:
               chunks.append(current_chunk.strip())
           current_chunk = sentence + " "
       else:
           current_chunk += sentence + " "
   # Add the last chunk if it's not empty
   if current_chunk:
       chunks.append(current_chunk.strip())
   # Add overlap
   overlapped_chunks = []
   for i, chunk in enumerate(chunks):
       if i > 0:
           chunk = chunks[i-1][-overlap:] + chunk
       if i < len(chunks) - 1:
           chunk = chunk + chunks[i+1][:overlap]
       overlapped_chunks.append(chunk)
   return overlapped_chunks
def summarize_text(text, context_instructions):
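   """Summarize text chunk-by-chunk with the Llama model, then ask the model
   for a final summary of the combined chunk summaries (map-reduce style)."""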
   chunks = chunk_text(text, max_chunk_size=3000, overlap=200)
   summaries = []
   for chunk in chunks:
       prompt = f"""You are a financial analyst. Summarize the following text from a financial perspective:
{chunk}
{context_instructions}"""
       # "max_length" is not a text-generation parameter on the Inference API;
       # use max_new_tokens, and strip the echoed prompt with return_full_text.
       summary = query_llama({"inputs": prompt, "parameters": {"max_new_tokens": 1000, "return_full_text": False}})
       if summary and isinstance(summary, list) and 'generated_text' in summary[0]:
           summaries.append(summary[0]['generated_text'])
   # Combine summaries
   combined_summary = "\n\n".join(summaries)
   # Final summarization of combined summaries
   final_prompt = f"""As a financial analyst, provide a coherent and comprehensive summary of the following financial information:
{combined_summary}
Focus on the most important financial implications and analysis."""
   final_summary = query_llama({"inputs": final_prompt, "parameters": {"max_new_tokens": 3000, "return_full_text": False}})
   if final_summary and isinstance(final_summary, list) and 'generated_text' in final_summary[0]:
       return final_summary[0]['generated_text']
   else:
       return "Unable to generate summary due to an error."
def summarize_financial_news(query, read_pdf=False, pdf=None):
   """Search for financial news, extract relevant content
, and summarize"""
   all_filtered_text = ""
   if read_pdf and pdf is not None:
       pdf_text = extract_text_from_pdf(pdf)
       all_filtered_text += pdf_text + "\n\n"
   else:
       search_results = google_search(query, num_results=1)
       for result in search_results:
           if result['text']:
               filtered_text = filter_relevant_content(result['text'])
               all_filtered_text += filtered_text + "\n\n"
   if not all_filtered_text:
       return "No relevant financial information found."
   context_instructions = "Provide a detailed, coherent summary focusing on financial implications and analysis."
   return summarize_text(all_filtered_text, context_instructions)
def extract_text_from_pdf(pdf):
   """Extract text from each page of the PDF"""
   # PyPDF2 >= 3.0 replaced PdfFileReader/getNumPages/getPage with PdfReader
   # and the .pages sequence.
   reader = PyPDF2.PdfReader(pdf)
   text = ""
   for page in reader.pages:
       page_text = page.extract_text()
       if page_text:  # extract_text() can return an empty string for image-only pages
           text += page_text + "\n"
   return text
# Gradio Interface
def interface_function(query, read_pdf, pdf):
   return summarize_financial_news(query, read_pdf, pdf)
iface = gr.Interface(
   fn=interface_function,
   inputs=[
       gr.Textbox(lines=2, placeholder="Enter a company name or financial topic..."),
       gr.Checkbox(label="Read PDF"),
       gr.File(label="Upload PDF", type="filepath")  # Gradio 4 removed type="file"; "filepath" passes a path string
   ],
   outputs="text",
   title="Financial News Summarizer",
   description="Enter a company name or financial topic to get a summary of recent financial news. Optionally, upload a PDF to summarize its content."
)
iface.launch()