import warnings

warnings.filterwarnings("ignore")

import json
import time

import finnhub
import pandas as pd
import requests
from lxml import etree
from tqdm import tqdm

from finnlp.data_sources.news._base import News_Downloader
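
# `_request_get` is inherited from News_Downloader (defined in the base
# module, not shown in this file). Judging from its use below, it wraps
# `requests.get` and returns a Response on success or None on a connection
# failure -- treat that contract as an assumption when reading this file.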

class Yahoo_Date_Range(News_Downloader):
    def __init__(self, args=None):
        # Avoid the mutable-default-argument pitfall of `args={}`.
        args = {} if args is None else args
        super().__init__(args)
        assert "token" in args, "Please input your finnhub token. Available at https://finnhub.io/dashboard"
        self.finnhub_client = finnhub.Client(api_key=args["token"])
    def download_date_range_stock(self, start_date, end_date, stock="AAPL"):
        self.date_list = pd.date_range(start_date, end_date)
        self.dataframe = pd.DataFrame()

        # Query Finnhub in short windows (four days at a time) so each
        # request stays small.
        days_each_time = 4
        date_list = self.date_list

        # Calculate the total number of windows for the progress bar.
        if len(date_list) % days_each_time == 0:
            total = len(date_list) // days_each_time
        else:
            total = len(date_list) // days_each_time + 1

        with tqdm(total=total, desc="Downloading Titles") as bar:
            while len(date_list):
                tmp_date_list = date_list[:days_each_time]
                date_list = date_list[days_each_time:]
                tmp_start_date = tmp_date_list[0].strftime("%Y-%m-%d")
                tmp_end_date = tmp_date_list[-1].strftime("%Y-%m-%d")
                res = self._gather_one_part(tmp_start_date, tmp_end_date, stock=stock)
                self.dataframe = pd.concat([self.dataframe, res])
                bar.update(1)

        # Finnhub reports timestamps as Unix seconds; convert to datetime.
        self.dataframe["datetime"] = pd.to_datetime(self.dataframe["datetime"], unit="s")
        self.dataframe = self.dataframe.reset_index(drop=True)
    def _gather_one_part(self, start_date, end_date, stock="AAPL", delay=1):
        res = self.finnhub_client.company_news(stock, _from=start_date, to=end_date)
        # Pause between requests to stay polite to the API rate limit.
        time.sleep(delay)
        return pd.DataFrame(res)
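
    # Of the columns Finnhub returns, only `datetime` (Unix seconds),
    # `source`, and `url` are relied on below; everything else is carried
    # through the dataframe untouched.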
    def gather_content(self, delay=0.01):
        pbar = tqdm(total=self.dataframe.shape[0], desc="Gathering news contents")
        self.dataframe["content"] = self.dataframe.apply(
            lambda x: self._gather_content_apply(x, pbar, delay), axis=1
        )
        pbar.close()
    def _gather_content_apply(self, x, pbar, delay=0.01):
        # Throttle scraping so the news sites are not hammered.
        time.sleep(delay)
        url = x.url
        source = x.source
        response = self._request_get(url=url)
        pbar.update(1)
        if response is None:
            return "Connection Error"
        else:
            page = etree.HTML(response.text)
            try:
                # Yahoo Finance
                if source == "Yahoo":
                    page = page.xpath("/html/body/div[3]/div[1]/div/main/div[1]/div/div/div/div/article/div/div/div/div/div/div[2]/div[4]")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # Reuters
                elif source == "Reuters":
                    page = page.xpath("/html/body/div[1]/div[3]/div/main/article/div[1]/div[2]/div/div/div[2]")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # SeekingAlpha
                elif source == "SeekingAlpha":
                    page = page.xpath("/html/body/div[2]/div/div[1]/main/div/div[2]/div/article/div/div/div[2]/div/section[1]/div/div/div")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # PennyStocks
                elif source == "PennyStocks":
                    page = page.xpath("/html/body/div[3]/div/div[1]/div/div/div/main/article/div[2]/div[2]/div")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # MarketWatch
                elif source == "MarketWatch":
                    page = page.xpath('//*[@id="js-article__body"]')
                    content = page[0].xpath(".//text()")
                    content = "".join(content)
                    # Collapse runs of spaces and stray newline/space pairs.
                    while "  " in content:
                        content = content.replace("  ", " ")
                    while "\n \n" in content:
                        content = content.replace("\n \n", " ")
                    while "\n " in content:
                        content = content.replace("\n ", " ")
                    return content
                # Seeking Alpha
                elif source == "Seeking Alpha":
                    # First resolve the real Seeking Alpha URL from the
                    # aggregator page.
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        content = page[0].xpath(".//text()")
                        content = "\n".join(content)
                        return content
                # Alliance News
                elif source == "Alliance News":
                    page = page.xpath('//*[@id="comtext"]')
                    content = page[0].xpath(".//text()")
                    content = [c for c in content if not str(c).startswith("\r\n")]
                    content = "\n".join(content)
                    return content
                # Thefly.com (also reached via an aggregator page)
                elif source == "Thefly.com":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new, verify=False)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[2]/div/div/div/div/div[2]/div[2]//text()')
                        content = "\n".join(page)
                        content = content.replace("\r\n", "")
                        return content
                # TalkMarkets
                elif source == "TalkMarkets":
                    return "Not supported yet"
                # CNBC
                elif source == "CNBC":
                    page = page.xpath('/html/body/div[3]/div/div[1]/div[3]/div/div/div/div[3]/div[1]/div[2]/div[3]//text()')
                    content = "\n".join(page)
                    return content
                # GuruFocus
                elif source == "GuruFocus":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[1]/div/section/section/main/section/main/div[1]/div/div/div[1]/div[2]/div//text()')
                        page_new = []
                        for c in page:
                            # Strip newlines and collapse repeated spaces.
                            while "\n" in c:
                                c = c.replace("\n", "")
                            while "  " in c:
                                c = c.replace("  ", " ")
                            page_new.append(c)
                        content = "\n".join(page_new)
                        return content
                # InvestorPlace
                elif source == "InvestorPlace":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        # The article body ships inside a JSON-LD script tag.
                        page = etree.HTML(response.text)
                        page = page.xpath('//script[@type="application/ld+json"]')[1]
                        content = page.xpath(".//text()")
                        content = json.loads(content[0])
                        content = content["articleBody"]
                        return content
                # TipRanks
                elif source == "TipRanks":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[1]/div[1]/div[4]/div[2]/div[2]/div[1]/div[6]//text()')
                        page = [p.replace("\n", "") for p in page]
                        content = "".join(page)
                        return content
                else:
                    return "Not supported yet"
            except Exception:
                # The hard-coded XPaths above break whenever a site changes
                # its layout, so fail soft instead of aborting the whole run.
                return "Error"