import json
import time

import pandas as pd
from lxml import etree
from tqdm import tqdm

from finnlp.data_sources.company_announcement._base import Company_Announcement_Downloader


class SEC_Announcement(Company_Announcement_Downloader):

    def __init__(self, args=None):
        # Avoid a mutable default argument; fall back to an empty config dict.
        super().__init__(args if args is not None else {})
        # Downloaded filings are accumulated here, one row per document.
        self.dataframe = pd.DataFrame()
    def download_date_range_stock(self, start_date, end_date, stock="AAPL", delay=0.1):
        entityName = self._get_entity_name(stock)

        # The first page also tells us how many result pages exist in total.
        total_pages = self._gather_one_page(start_date, end_date, 1, entityName, delay)

        # Fetch any remaining pages.
        if total_pages > 1:
            for page in tqdm(range(1, total_pages), desc="Downloading other pages..."):
                self._gather_one_page(start_date, end_date, page + 1, entityName, delay)

        self.dataframe = self.dataframe.reset_index(drop=True)
    def _get_entity_name(self, stock="AAPL"):
        # Resolve a ticker into the "Company Name (TICKER) (CIK ##########)"
        # string that the EDGAR full-text search endpoint expects.
        url = "https://efts.sec.gov/LATEST/search-index"
        headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
        }
        params = {
            "keysTyped": stock,
        }
        resp = self._request_get(url=url, headers=headers, params=params)
        if resp is None:
            raise ConnectionError("Can't get entity name")
        res = json.loads(resp.text)
        item_list = res["hits"]["hits"]

        entityName_list = []
        for item in item_list:
            entity_words = item["_source"]["entity_words"]
            cik = item["_id"].zfill(10)  # CIKs are zero-padded to 10 digits
            entityName_list.append(f"{entity_words} (CIK {cik})")

        if not entityName_list:
            raise ValueError(f"No entity found for ticker {stock!r}")
        # The first hit is the closest match for the typed ticker.
        return entityName_list[0]
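    # Illustrative only (not executed here): assuming the endpoint responds as
    # usual, _get_entity_name("AAPL") is expected to return a string shaped
    # like the default entityName below, e.g. "Apple Inc. (AAPL) (CIK 0000320193)".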
    def _gather_one_page(self, start_date, end_date, page, entityName="Apple Inc. (AAPL) (CIK 0000320193)", delay=0.01):
        # EDGAR full-text search serves 100 hits per page; "from" is the offset.
        from_ = (page - 1) * 100
        url = "https://efts.sec.gov/LATEST/search-index"
        headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
        }
        params = {
            "dateRange": "all",
            "entityName": entityName,
            "startdt": start_date,
            "enddt": end_date,
            "from": from_,
            "page": page,
        }
        resp = self._request_get(url=url, headers=headers, params=params)
        if resp is None:
            # Report zero pages so the caller's pagination loop stops cleanly.
            return 0
        res = json.loads(resp.text)

        # Total matching filings -> number of 100-item pages (ceiling division).
        total_items = res["hits"]["total"]["value"]
        total_pages = (total_items + 99) // 100
        items = res["hits"]["hits"]
        url_base = "https://www.sec.gov/Archives/edgar/data"
        for item in tqdm(items, desc="Downloading by item..."):
            # Each hit's "_id" looks like "<accession-no-with-dashes>:<file name>".
            url_third = item["_source"].get("xsl")  # "xsl" may be absent
            url_second, url_fourth = item["_id"].split(":")
            url_second = url_second.split("-")
            url_first = url_second[0].lstrip("0")  # CIK without leading zeros
            url_second = "".join(url_second)       # accession number without dashes
            if url_third is not None:
                url_new = f"{url_base}/{url_first}/{url_second}/{url_third}/{url_fourth}"
            else:
                url_new = f"{url_base}/{url_first}/{url_second}/{url_fourth}"
            respn = self._request_get(url=url_new, headers=headers)
            if respn is None:
                continue
            try:
                # Extract all visible text from the filing document.
                html = etree.HTML(respn.text)
                content = html.xpath("/html/body//text()")
                content = "".join(c for c in content if c != "\n")

                src = item["_source"]
                data = [
                    item["_id"], src["ciks"], src["period_ending"], src["root_form"],
                    src["file_num"], src["display_names"], src["xsl"], src["sequence"],
                    src["file_date"], src["biz_states"], src["sics"], src["form"],
                    src["adsh"], src["film_num"], src["biz_locations"], src["file_type"],
                    src["file_description"], src["inc_states"], src["items"], content,
                ]
                # "ite" preserves the module's original column name for _source["items"].
                columns = [
                    "_id", "ciks", "period_ending", "root_form", "file_num", "display_names", "xsl", "sequence",
                    "file_date", "biz_states", "sics", "form", "adsh", "film_num", "biz_locations", "file_type",
                    "file_description", "inc_states", "ite", "content",
                ]
                tmp = pd.DataFrame([data], columns=columns)
                self.dataframe = pd.concat([self.dataframe, tmp])
                time.sleep(delay)
            except Exception:
                # Skip filings whose page cannot be fetched or parsed.
                continue
        return total_pages
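

# Minimal usage sketch (assumptions: network access to SEC EDGAR, and that the
# Company_Announcement_Downloader base class supplies the _request_get helper
# used above; the ticker and date range are illustrative).
if __name__ == "__main__":
    downloader = SEC_Announcement()
    downloader.download_date_range_stock("2020-01-01", "2020-06-30", stock="AAPL")
    # One row per downloaded filing document.
    print(downloader.dataframe[["file_date", "form", "display_names"]].head())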