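"""Juchao (cninfo.com.cn) company announcement downloader.

Queries the cninfo announcement search API for a stock over a date range
and, optionally, downloads each announcement PDF and extracts its text
with PyPDF2.
"""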
import json
import os
import time

import pandas as pd
import requests
from PyPDF2 import PdfReader
from tqdm import tqdm

from finnlp.data_sources.company_announcement._base import Company_Announcement_Downloader

class Juchao_Announcement(Company_Announcement_Downloader):

    def __init__(self, args=None):
        # Avoid a mutable default argument; fall back to an empty config dict.
        super().__init__(args if args is not None else {})
        self.dataframe = pd.DataFrame()

    def download_date_range_stock(self, start_date, end_date, stock="000001",
                                  max_page=100, searchkey="", get_content=False,
                                  save_dir="./tmp/", delete_pdf=False):
        """Download announcements for `stock` between `start_date` and `end_date`.

        If `get_content` is True, each announcement PDF is saved to `save_dir`
        and its text is extracted; set `delete_pdf` to remove the files afterwards.
        Results accumulate in `self.dataframe`.
        """
        self.org_dict = self._get_orgid()

        # Download the first page to learn the total page count.
        res = self._get_open_page(start_date, end_date, stock, 1, searchkey)
        total_pages = res["totalpages"] + 1
        if res["announcements"] is None:
            print(f"Nothing related to your searchkey ({searchkey}) was found. "
                  "Try another keyword or leave it blank.")
        else:
            tmp_df = self._process_data(res)
            self.dataframe = pd.concat([self.dataframe, tmp_df])
            page = 2
            # Download the remaining pages.
            pbar = tqdm(total=total_pages, desc="Downloading by page...")
            for _ in range(max_page):
                res = self._get_open_page(start_date, end_date, stock, page, searchkey)
                if res["announcements"] is None:
                    break
                tmp_df = self._process_data(res)
                self.dataframe = pd.concat([self.dataframe, tmp_df])
                pbar.update(1)
                page += 1
            pbar.update(1)  # account for the first page downloaded above
            # Convert epoch milliseconds to datetimes.
            self.dataframe.announcementTime = self.dataframe.announcementTime.apply(
                lambda x: time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x / 1000)))
            self.dataframe.announcementTime = pd.to_datetime(self.dataframe.announcementTime)

            if get_content:
                pbar = tqdm(total=self.dataframe.shape[0], desc="Getting the text data...")
                self.dataframe[["PDF_path", "Content"]] = self.dataframe.apply(
                    lambda x: self._get_pdfs(x, save_dir, delete_pdf, pbar),
                    axis=1, result_type="expand")
                if delete_pdf:
                    os.removedirs(save_dir)

        self.dataframe = self.dataframe.reset_index(drop=True)
    def _get_open_page(self, start_date, end_date, stock, page, searchkey):
        # Query one page of announcement metadata from the cninfo search API.
        url = "http://www.cninfo.com.cn/new/hisAnnouncement/query?"
        headers = {
            "Referer": "http://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search&lastPage=index",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
        }
        data = {
            "pageNum": page,
            "pageSize": "30",
            "column": "szse",
            "tabName": "fulltext",
            "plate": "",
            "stock": stock + "," + self.org_dict[stock],
            "searchkey": searchkey,
            "secid": "",
            "category": "",
            "trade": "",
            "seDate": f"{start_date}~{end_date}",
            "sortName": "",
            "sortType": "",
            "isHLtitle": "true",
        }
        res = requests.post(url=url, headers=headers, data=data)
        if res.status_code != 200:
            raise ConnectionError(f"Request failed with status code {res.status_code}")
        return json.loads(res.text)
    def _process_data(self, res):
        if res is None:
            return res
        return pd.DataFrame(res["announcements"])
    def _get_pdfs(self, x, save_dir, delete_pdf, pbar):
        os.makedirs(save_dir, exist_ok=True)
        pdf_url = "http://static.cninfo.com.cn/" + x.adjunctUrl
        responsepdf = self._request_get(pdf_url)
        if responsepdf is None:
            pbar.update(1)
            return ("Failed Download", "Failed Download")

        # Strip the <em> highlight tags from the title and build the file name.
        file_name = x.announcementTitle.replace("<em>", "").replace("</em>", "")
        file_name = f"{x.secCode}_{x.secName}_{file_name}.pdf"
        file_path = os.path.join(save_dir, file_name)

        # Save the PDF.
        with open(file_path, "wb") as f:
            f.write(responsepdf.content)

        # Extract the text, joining the lines of each page.
        with open(file_path, "rb") as filehandle:
            pdf = PdfReader(filehandle)
            text_all = ""
            for page in pdf.pages:
                text = page.extract_text() or ""
                text_all += "".join(text.split("\n"))

        pbar.update(1)
        if delete_pdf:
            os.remove(file_path)
            return ("removed", text_all)
        return (file_path, text_all)
    def _get_orgid(self):
        # Map each SZSE stock code to the orgId required by the query API.
        org_dict = {}
        org_json = self._request_get("http://www.cninfo.com.cn/new/data/szse_stock.json").json()["stockList"]
        for item in org_json:
            org_dict[item["code"]] = item["orgId"]
        return org_dict
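

# Example usage (a minimal sketch; the date range and stock code below are
# illustrative values, not defaults shipped with this module):
if __name__ == "__main__":
    downloader = Juchao_Announcement()
    downloader.download_date_range_stock(
        start_date="2023-01-01",
        end_date="2023-01-31",
        stock="000001",
        searchkey="",
        get_content=True,
        save_dir="./tmp/",
        delete_pdf=True,
    )
    print(downloader.dataframe.head())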