import warnings
warnings.filterwarnings("ignore")

from finnlp.data_sources.news._base import News_Downloader

from tqdm import tqdm
from lxml import etree
import pandas as pd
import requests
import finnhub
import time
import json
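
# Downloads company news headlines for a stock over a date range via the
# Finnhub API, then (optionally) scrapes the full article text from each
# publisher's website.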
class Finnhub_Date_Range(News_Downloader):
    def __init__(self, args={}):
        super().__init__(args)
        assert "token" in args.keys(), "Please input your finnhub token. Available at https://finnhub.io/dashboard"
        self.finnhub_client = finnhub.Client(api_key=args["token"])

    def download_date_range_stock(self, start_date, end_date, stock="AAPL"):
        self.date_list = pd.date_range(start_date, end_date)
        self.dataframe = pd.DataFrame()

        days_each_time = 4
        date_list = self.date_list
        # calculate the total number of batches
        if len(date_list) % days_each_time == 0:
            total = len(date_list) // days_each_time
        else:
            total = len(date_list) // days_each_time + 1
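        # Query in small multi-day windows so each company_news call returns a
        # manageable batch (an assumption: very long ranges appear to risk
        # truncated responses from the API).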
        with tqdm(total=total, desc="Downloading Titles") as bar:
            while len(date_list):
                tmp_date_list = date_list[:days_each_time]
                date_list = date_list[days_each_time:]
                tmp_start_date = tmp_date_list[0].strftime("%Y-%m-%d")
                tmp_end_date = tmp_date_list[-1].strftime("%Y-%m-%d")
                res = self._gather_one_part(tmp_start_date, tmp_end_date, stock=stock)
                self.dataframe = pd.concat([self.dataframe, res])
                bar.update(1)

        # Finnhub reports article datetimes as Unix timestamps.
        self.dataframe.datetime = pd.to_datetime(self.dataframe.datetime, unit="s")
        self.dataframe = self.dataframe.reset_index(drop=True)

    def _gather_one_part(self, start_date, end_date, stock="AAPL", delay=1):
        res = self.finnhub_client.company_news(stock, _from=start_date, to=end_date)
        time.sleep(delay)
        return pd.DataFrame(res)
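
    # `delay` above throttles Finnhub API calls; the much smaller per-row
    # delay below throttles direct requests to the publishers' own sites.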
    def gather_content(self, delay=0.01):
        pbar = tqdm(total=self.dataframe.shape[0], desc="Gathering news contents")
        self.dataframe["content"] = self.dataframe.apply(lambda x: self._gather_content_apply(x, pbar, delay), axis=1)
        pbar.close()

    def _gather_content_apply(self, x, pbar, delay=0.01):
        time.sleep(delay)
        url = x.url
        source = x.source
        response = self._request_get(url=url)
        pbar.update(1)

        if response is None:
            return "Connection Error"
        else:
            page = etree.HTML(response.text)
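            # Dispatch on the news source: each publisher needs its own XPath
            # to extract the article body. These absolute paths are brittle
            # and will break whenever a site changes its page layout.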
            try:
                # Yahoo Finance
                if source == "Yahoo":
                    page = page.xpath("/html/body/div[3]/div[1]/div/main/div[1]/div/div/div/div/article/div/div/div/div/div/div[2]/div[4]")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # Reuters
                elif source == "Reuters":
                    page = page.xpath("/html/body/div[1]/div[3]/div/main/article/div[1]/div[2]/div/div/div[2]")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # SeekingAlpha
                elif source == "SeekingAlpha":
                    page = page.xpath("/html/body/div[2]/div/div[1]/main/div/div[2]/div/article/div/div/div[2]/div/section[1]/div/div/div")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # PennyStocks
                elif source == "PennyStocks":
                    page = page.xpath("/html/body/div[3]/div/div[1]/div/div/div/main/article/div[2]/div[2]/div")
                    content = page[0].xpath(".//text()")
                    content = "\n".join(content)
                    return content
                # MarketWatch
                elif source == "MarketWatch":
                    page = page.xpath('//*[@id="js-article__body"]')
                    content = page[0].xpath(".//text()")
                    content = "".join(content)
                    # Collapse the whitespace runs left over from the markup.
                    while "  " in content:
                        content = content.replace("  ", " ")
                    while "\n \n" in content:
                        content = content.replace("\n \n", " ")
                    while "\n " in content:
                        content = content.replace("\n ", " ")
                    return content
                # Seeking Alpha
                elif source == "Seeking Alpha":
                    # First get the Seeking Alpha article URL from the redirect page.
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        content = page[0].xpath(".//text()")
                        content = "\n".join(content)
                        return content
                # Alliance News
                elif source == "Alliance News":
                    page = page.xpath('//*[@id="comtext"]')
                    content = page[0].xpath(".//text()")
                    content = [c for c in content if not str(c).startswith("\r\n")]
                    content = "\n".join(content)
                    return content
                # Thefly.com
                elif source == "Thefly.com":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new, verify=False)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[2]/div/div/div/div/div[2]/div[2]//text()')
                        content = "\n".join(page)
                        content = content.replace("\r\n", "")
                        return content
                # TalkMarkets
                elif source == "TalkMarkets":
                    return "Not supported yet"
                # CNBC
                elif source == "CNBC":
                    page = page.xpath('/html/body/div[3]/div/div[1]/div[3]/div/div/div/div[3]/div[1]/div[2]/div[3]//text()')
                    content = "\n".join(page)
                    return content
                # GuruFocus
                elif source == "GuruFocus":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[1]/div/section/section/main/section/main/div[1]/div/div/div[1]/div[2]/div//text()')
                        # Strip newlines and double spaces from each text chunk.
                        page_new = []
                        for c in page:
                            while "\n" in c:
                                c = c.replace("\n", "")
                            while "  " in c:
                                c = c.replace("  ", "")
                            page_new.append(c)
                        content = "\n".join(page_new)
                        return content
                # InvestorPlace
                elif source == "InvestorPlace":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        # The article body ships inside the page's second JSON-LD script tag.
                        page = page.xpath('//script[@type="application/ld+json"]')[1]
                        content = page.xpath(".//text()")
                        content = json.loads(content[0])
                        content = content["articleBody"]
                        return content
                # TipRanks
                elif source == "TipRanks":
                    page = page.xpath('/html/body/div[5]/div[2]/section[1]/article[2]/div/div[2]/p/a/@href')
                    url_new = page[0]
                    response = self._request_get(url=url_new)
                    if response is None:
                        return "Connection Error"
                    else:
                        page = etree.HTML(response.text)
                        page = page.xpath('/html/body/div[1]/div[1]/div[4]/div[2]/div[2]/div[1]/div[6]//text()')
                        page = [p.replace("\n", "") for p in page]
                        content = "".join(page)
                        return content
                else:
                    return "Not supported yet"
            except Exception:
                # Report a parsing failure for a single article in-band rather
                # than aborting the whole batch.
                return "Error"