# Sigrid De los Santos
# Remove remaining binary file for Hugging Face
# 9df4cc0
import warnings
warnings.filterwarnings("ignore")
import requests
from lxml import etree
from tqdm import tqdm
import pandas as pd
import json
import time
from finnlp.data_sources.social_media._base import Social_Media_Downloader
# TODO:
# 1. Better performance
import json
import time
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.by import By
class Facebook_Streaming(Social_Media_Downloader):
    """Scrape the first page of Facebook mobile search results with Selenium.

    The downloader opens ``https://m.facebook.com`` in Chrome using an iPhone
    user agent, installs the supplied cookies, runs a keyword search and
    collects one ``(content, date)`` row per result container into
    ``self.dataframe``.

    Args:
        args: Configuration mapping. Required keys:

            * ``"cookies"`` - iterable of cookie objects, each handed to
              ``browser.add_cookie``.
            * ``"stealth_path"`` - path of a JavaScript file that is injected
              into every new document before page scripts run.

            Optional keys:

            * ``"headless"`` - run Chrome headless (defaults to ``True``).

    Raises:
        KeyError: If ``"cookies"`` or ``"stealth_path"`` is missing.
    """

    # NOTE(review): the XPaths below are absolute and therefore tied to the
    # page layout at the time of writing - confirm against the live page.
    _TAB_XPATH = "/html/body/div[2]/div/div[2]/div[3]/div[1]"
    _RESULT_XPATH = "/html/body/div[2]/div/div[2]/div"
    _TITLE_XPATH = "./div[2]/div/div/div[2]/div/div/div/div"
    _TIME_XPATH = "./div[2]/div/div/div[1]/div/div/div/div[2]/div[2]/div/span"
    _USER_AGENT = (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_1 like Mac OS X) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 "
        "Mobile/15E148 Safari/604.1"
    )

    def __init__(self, args=None):
        # A fresh dict per call avoids sharing one mutable default between
        # instances.
        args = {} if args is None else args
        super().__init__(args)
        self.dataframe = pd.DataFrame()
        self.cookies = args["cookies"]
        self.stealth_path = args["stealth_path"]
        self.headless = args.get("headless", True)

    def download_streaming_stock(self, keyword="AAPL", rounds=3, delay=0.5):
        """Search for ``keyword`` and append the results to ``self.dataframe``.

        Only the first page of results is read. The browser session is always
        shut down before this method returns or raises.

        Args:
            keyword: Search term; it is URL-encoded before being placed in the
                query string.
            rounds: Currently unused; kept for interface compatibility.
            delay: Currently unused; kept for interface compatibility.

        Raises:
            IndexError: If the tab element that has to be clicked is not
                present on the page.
        """
        from urllib.parse import quote_plus

        self._init_opt()
        try:
            # "%" stays untouched so keywords that callers already
            # percent-encoded are not encoded a second time.
            search_url = (
                "https://m.facebook.com/search_results/?q="
                + quote_plus(keyword, safe="%")
            )
            self.browser.get(search_url)

            # The same XPath is clicked twice: first to open the posts tab,
            # then (after the page has changed) to select recent posts.
            self._click_first(self._TAB_XPATH, "posts tab")
            time.sleep(5)
            self._click_first(self._TAB_XPATH, "recent posts tab")
            time.sleep(5)

            rows = []
            containers = self.browser.find_elements(By.XPATH, self._RESULT_XPATH)
            for container in tqdm(containers):
                content = self._first_text(container, self._TITLE_XPATH)
                date = self._first_text(container, self._TIME_XPATH)
                rows.append((content, date))
        finally:
            # quit() ends the whole driver session; close() would only close
            # the current window and was skipped entirely when scraping raised.
            self.browser.quit()

        page = pd.DataFrame(rows, columns=["content", "date"])
        self.dataframe = pd.concat([self.dataframe, page])
        # Drop rows where neither the content nor the date could be read.
        self.dataframe = self.dataframe.dropna(how="all")

        print("Only support the first page now!")

    def _click_first(self, xpath, description):
        """Click the first element matching ``xpath`` or raise ``IndexError``."""
        matches = self.browser.find_elements(By.XPATH, xpath)
        if not matches:
            raise IndexError(
                f"Could not find the {description} on the page (XPath: {xpath})"
            )
        matches[0].click()

    @staticmethod
    def _first_text(parent, xpath):
        """Return the text of the first match under ``parent``, else ``np.nan``.

        Extraction is best-effort: any error raised while locating or reading
        the element is printed and turned into ``np.nan`` so that one broken
        result does not abort the whole scrape.
        """
        try:
            matches = parent.find_elements(By.XPATH, xpath)
            return matches[0].text if matches else np.nan
        except Exception as exc:
            print(exc)
            return np.nan

    def _init_opt(self):
        """Start Chrome, inject the stealth script and install the cookies.

        Sets ``self.chromeOptions`` and ``self.browser``. If anything fails
        after the browser has been started, the browser is shut down before
        the error is re-raised.
        """
        # Read the script before launching Chrome so an unreadable file does
        # not leave a browser process behind.
        with open(self.stealth_path, encoding="utf-8") as f:
            js = f.read()

        self.chromeOptions = webdriver.ChromeOptions()
        if self.headless:
            self.chromeOptions.add_argument('--headless')
        self.chromeOptions.add_argument('--disable-blink-features=AutomationControlled')
        self.chromeOptions.add_argument("--user-agent=" + self._USER_AGENT)
        self.chromeOptions.add_experimental_option('excludeSwitches', ['enable-automation'])

        self.browser = webdriver.Chrome(options=self.chromeOptions)
        try:
            self.browser.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {"source": js},
            )
            # Load the site first, then swap whatever cookies that visit set
            # for the caller-supplied ones.
            self.browser.get('https://m.facebook.com/')
            self.browser.delete_all_cookies()
            for cookie in self.cookies:
                self.browser.add_cookie(cookie)
            self.browser.implicitly_wait(2)
        except Exception:
            self.browser.quit()
            raise