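"""Scrape a colamanga.com chapter page with Selenium.

The page lazy-loads its images as blob: URLs, so this module scrolls the full
chapter into view, waits for each <img> to finish loading, then recovers the
image bytes through the Chrome DevTools Protocol and writes them out as
numbered PNG files.
"""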
from pprint import pprint
from ..utils import SeleniumScraper
from selenium.webdriver.common.by import By
from PIL import Image
from backend.module.utils import date_utils
import base64, io, json, os, sys

MAX_TIMEOUT = 10  # default timeout in seconds (currently unused below)
scraper = None
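
# colamanga appends page blocks to .mh_mangalist as the user scrolls; while
# more content is pending, the list's last child is a loader <script> whose
# text is '__cad.read_periodical();'. Scroll until that sentinel is no longer
# the last child (or a 60-second budget runs out).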
def __scrollToBottom(driver: object = None):
    if not driver: raise ValueError("The 'driver' argument is required.")
    timeout = date_utils.utc_time().add(60, 'second').get()
    previous_height = 0
    scrolledY = 0
    while True:
        if date_utils.utc_time().get() >= timeout: raise Exception("[Get Chapter] Finding latest element timed out!")
        # Scroll down the page in growing steps.
        driver.execute_script(f"window.scrollBy(0, {scrolledY});")
        current_height = driver.execute_script("return document.documentElement.scrollHeight")
        if current_height > previous_height:
            previous_height = current_height
        else:
            # Page height stopped growing: check whether the loader script is
            # still the last child; if it is not, everything has been rendered.
            parent_div = driver.find_element(By.CLASS_NAME, "mh_mangalist")
            child_elements = parent_div.find_elements(By.XPATH, "./*")
            if child_elements[-1].get_attribute('text') != '__cad.read_periodical();': break
        scrolledY += 50
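
# NOTE: scrap() reads driver.get_log('performance') and calls
# Network.getResponseBody over CDP. This only works when the Chrome driver is
# started with performance logging enabled, e.g. (assuming Selenium 4):
#   options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
# SeleniumScraper is assumed to configure this.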
def scrap(comic_id: str = "", chapter_id: str = "", output_dir: str = ""):
    if not comic_id: raise ValueError("The 'comic_id' parameter is required.")
    if not chapter_id: raise ValueError("The 'chapter_id' parameter is required.")
    if not output_dir: raise ValueError("The 'output_dir' parameter is required.")
    global scraper
    try:
        url = f"https://www.colamanga.com/{chapter_id}"
        if not scraper: scraper = SeleniumScraper()
        driver = scraper.driver()
        driver.get(url)
        __scrollToBottom(driver=driver)
        parent_element = driver.find_element(By.ID, "mangalist")
        child_list = parent_element.find_elements(By.CLASS_NAME, "mh_comicpic")
        blob_list = []
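        # Each image is lazy-loaded: scroll it into view, then poll (up to
        # 5 seconds per image) until the browser reports the <img> as fully
        # loaded before recording its blob: URL.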
        for child in child_list:
            timeout = date_utils.utc_time().add(5, 'second').get()
            while True:
                if date_utils.utc_time().get() > timeout: break
                image_element = child.find_element(By.TAG_NAME, "img")
                driver.execute_script("arguments[0].scrollIntoView({ behavior: 'smooth', block: 'center' });", image_element)
                url = image_element.get_attribute("src")
                if url:
                    is_image_loaded = driver.execute_script(
                        "return arguments[0].complete",
                        image_element
                    )
                    if is_image_loaded:
                        blob_list.append(url)
                        break
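        # The images are served as blob: URLs, which cannot be fetched over
        # HTTP. Instead, replay the browser's performance log and pull each
        # response body out of Chrome via the DevTools Protocol.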
        def process_browser_log_entry(entry):
            response = json.loads(entry['message'])['message']
            return response

        browser_log = driver.get_log('performance')
        events = [process_browser_log_entry(entry) for entry in browser_log]
        events = [event for event in events if 'Network.response' in event['method']]
        for e in events:
            if e.get("params").get("type") == "Image":
                url = e.get("params").get("response").get("url")
                if url.split(":")[0] == "blob":
                    request_id = e["params"]["requestId"]
                    # Network.getResponseBody returns binary content base64-encoded.
                    response = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
                    img = Image.open(io.BytesIO(base64.decodebytes(bytes(response.get("body"), "utf-8"))))
                    # Strip the path and extension from the chapter id.
                    chapter_id = chapter_id.split("/")[-1].split(".")[0]
                    os.makedirs(output_dir, exist_ok=True)
                    # Name each file by its position on the page so pages keep reading order.
                    img.save(os.path.join(output_dir, f"{blob_list.index(url)}.png"))
        return {"status": "success"}
    except Exception as e:
        _, _, exc_tb = sys.exc_info()
        line_number = exc_tb.tb_lineno
        print(f"Error on line {line_number}: {e}")
        raise
if __name__ == "__main__":
    # Example invocation; the chapter_id path is site-specific and the values
    # below are illustrative placeholders.
    DATA = scrap(comic_id="manga-gu881388", chapter_id="manga-gu881388/1/334.html", output_dir="./output")
    # pprint(DATA)