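"""Downloader for Chinese company announcements from cninfo.com.cn (巨潮资讯网, "Juchao").

Queries the site's public announcement-search endpoint page by page,
collects the results into a pandas DataFrame, and can optionally download
each announcement PDF and extract its text with PyPDF2.
"""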
from finnlp.data_sources.company_announcement._base import Company_Announcement_Downloader
import requests
import time
import json
import os
import pandas as pd
from tqdm import tqdm
from PyPDF2 import PdfReader
class Juchao_Announcement(Company_Announcement_Downloader):
    def __init__(self, args=None):
        # Avoid a mutable default argument; fall back to an empty config dict.
        super().__init__(args if args is not None else {})
        self.dataframe = pd.DataFrame()
    def download_date_range_stock(self, start_date, end_date, stock="000001",
                                  max_page=100, searchkey="", get_content=False,
                                  save_dir="./tmp/", delete_pdf=False):
        self.org_dict = self._get_orgid()

        # Download the first page to learn the total page count.
        res = self._get_open_page(start_date, end_date, stock, 1, searchkey)
        total_pages = res["totalpages"] + 1
        if res["announcements"] is None:
            print(f"Nothing related to your searchkey ({searchkey}) was found. You may try another one or leave it blank.")
            return
        tmp_df = self._process_data(res)
        self.dataframe = pd.concat([self.dataframe, tmp_df])

        # Download the remaining pages until the API stops returning announcements.
        page = 2
        pbar = tqdm(total=total_pages, desc="Downloading by page...")
        pbar.update(1)  # account for the first page
        for _ in range(max_page):
            res = self._get_open_page(start_date, end_date, stock, page, searchkey)
            if res["announcements"] is None:
                break
            tmp_df = self._process_data(res)
            self.dataframe = pd.concat([self.dataframe, tmp_df])
            pbar.update(1)
            page += 1

        # Convert the millisecond epoch timestamps to datetimes.
        self.dataframe.announcementTime = self.dataframe.announcementTime.apply(
            lambda x: time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x / 1000))
        )
        self.dataframe.announcementTime = pd.to_datetime(self.dataframe.announcementTime)

        if get_content:
            pbar = tqdm(total=self.dataframe.shape[0], desc="Getting the text data...")
            self.dataframe[["PDF_path", "Content"]] = self.dataframe.apply(
                lambda x: self._get_pdfs(x, save_dir, delete_pdf, pbar),
                axis=1, result_type="expand",
            )
            if delete_pdf:
                os.removedirs(save_dir)

        self.dataframe = self.dataframe.reset_index(drop=True)
    def _get_open_page(self, start_date, end_date, stock, page, searchkey):
        # Query one page of search results. The JSON response carries
        # "totalpages" (int) and "announcements" (a list of dicts, or None
        # when there are no results).
        url = "http://www.cninfo.com.cn/new/hisAnnouncement/query?"
        headers = {
            "Referer": "http://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search&lastPage=index",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
        }
        data = {
            "pageNum": page,
            "pageSize": "30",
            "column": "szse",
            "tabName": "fulltext",
            "plate": "",
            "stock": stock + "," + self.org_dict[stock],
            "searchkey": searchkey,
            "secid": "",
            "category": "",
            "trade": "",
            "seDate": f"{start_date}~{end_date}",
            "sortName": "",
            "sortType": "",
            "isHLtitle": "true",
        }
        res = requests.post(url=url, headers=headers, data=data)
        if res.status_code != 200:
            raise ConnectionError(f"Request failed with status code {res.status_code}")
        return res.json()
    def _process_data(self, res):
        if res is None:
            return res
        return pd.DataFrame(res["announcements"])
    def _get_pdfs(self, x, save_dir, delete_pdf, pbar):
        os.makedirs(save_dir, exist_ok=True)
        pdf_base_url = "http://static.cninfo.com.cn/"
        pdf_url = pdf_base_url + x.adjunctUrl
        responsepdf = self._request_get(pdf_url)
        if responsepdf is None:
            pbar.update(1)
            return ("Failed Download", "Failed Download")

        # Build the local file name, stripping the <em> highlight tags
        # that the search API inserts into announcement titles.
        file_name = x.announcementTitle.replace("<em>", "").replace("</em>", "")
        file_name = f"{x.secCode}_{x.secName}_{file_name}.pdf"
        file_path = os.path.join(save_dir, file_name)

        # Save the PDF to disk.
        with open(file_path, "wb") as f:
            f.write(responsepdf.content)

        # Extract the text, joining the lines of every page.
        with open(file_path, "rb") as filehandle:
            pdf = PdfReader(filehandle)
            text_all = ""
            for page in pdf.pages:
                text = page.extract_text()
                text_all += "".join(text.split("\n"))

        pbar.update(1)
        if delete_pdf:
            os.remove(file_path)
            return ("removed", text_all)
        return (file_path, text_all)
    def _get_orgid(self):
        # Map stock codes to cninfo's internal orgId, which the query API
        # expects alongside the code.
        org_dict = {}
        org_json = self._request_get("http://www.cninfo.com.cn/new/data/szse_stock.json").json()["stockList"]
        for item in org_json:
            org_dict[item["code"]] = item["orgId"]
        return org_dict
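# A minimal usage sketch (not part of the original module; the date range and
# stock code are illustrative, and network access to cninfo.com.cn is assumed):
if __name__ == "__main__":
    downloader = Juchao_Announcement()
    downloader.download_date_range_stock(
        start_date="2023-01-01",
        end_date="2023-06-30",
        stock="000001",
        searchkey="",
        get_content=True,
        save_dir="./tmp/",
        delete_pdf=True,
    )
    print(downloader.dataframe.head())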