import os
import random
import time
import re
import json
from datetime import datetime
from typing import List, Dict, Type

import pandas as pd
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field, create_model
import html2text
import tiktoken
from dotenv import load_dotenv
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from openai import OpenAI
import google.generativeai as genai
from groq import Groq

from assets import (
    USER_AGENTS,
    PRICING,
    HEADLESS_OPTIONS,
    SYSTEM_MESSAGE,
    USER_MESSAGE,
    LLAMA_MODEL_FULLNAME,
    GROQ_LLAMA_MODEL_FULLNAME,
)

load_dotenv()

# Set up the Chrome WebDriver options
def setup_selenium():
    options = Options()

    # Randomly select a user agent from the imported list
    user_agent = random.choice(USER_AGENTS)
    options.add_argument(f"user-agent={user_agent}")

    # Add other options
    for option in HEADLESS_OPTIONS:
        options.add_argument(option)

    # Specify the path to the ChromeDriver
    service = Service(r"./chromedriver-win64/chromedriver.exe")

    # Initialize the WebDriver
    driver = webdriver.Chrome(service=service, options=options)
    return driver
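
# Note: the chromedriver path above is Windows-specific. A minimal sketch of a
# cross-platform alternative, assuming the optional `webdriver-manager` package
# is installed (it is not part of this project's dependencies):
#
#     from webdriver_manager.chrome import ChromeDriverManager
#     service = Service(ChromeDriverManager().install())
#     driver = webdriver.Chrome(service=service, options=options)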

def click_accept_cookies(driver):
    """
    Tries to find and click a cookie consent button. It looks for several common patterns.
    """
    try:
        # Wait for the cookie popup to load
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, "//button | //a | //div"))
        )

        # Common text variations for cookie buttons
        # (all lowercase, to match the translate() in the XPath below)
        accept_text_variations = [
            "accept", "agree", "allow", "consent", "continue", "ok", "i agree", "got it"
        ]

        # Iterate through different element types and common text variations
        for tag in ["button", "a", "div"]:
            for text in accept_text_variations:
                try:
                    # Build an XPath that lowercases the element text and matches it by substring
                    element = driver.find_element(
                        By.XPATH,
                        f"//{tag}[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text}')]"
                    )
                    element.click()
                    print(f"Clicked the '{text}' button.")
                    return
                except Exception:
                    continue

        print("No 'Accept Cookies' button found.")
    except Exception as e:
        print(f"Error finding 'Accept Cookies' button: {e}")

def fetch_html_selenium(url):
    driver = setup_selenium()
    try:
        driver.get(url)

        # Add random delays to mimic human behavior
        time.sleep(1)  # Adjust this to simulate time for the user to read or interact
        driver.maximize_window()

        # Try to find and click the 'Accept Cookies' button
        # click_accept_cookies(driver)

        # Add more realistic actions like scrolling
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
        time.sleep(random.uniform(1.1, 1.8))  # Simulate time taken to scroll and read
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/1.2);")
        time.sleep(random.uniform(1.1, 1.8))
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight/1);")
        time.sleep(random.uniform(1.1, 2.1))

        html = driver.page_source
        return html
    finally:
        driver.quit()
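
# Example usage (hypothetical URL): fetch the fully rendered page, including
# JavaScript-injected content, then convert it with the helpers below:
#
#     html = fetch_html_selenium("https://example.com/listings")
#     markdown = html_to_markdown_with_readability(html)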

def clean_html(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove headers and footers based on common HTML tags
    for element in soup.find_all(['header', 'footer']):
        element.decompose()  # Remove these tags and their content

    return str(soup)

def html_to_markdown_with_readability(html_content):
    cleaned_html = clean_html(html_content)

    # Convert to markdown
    markdown_converter = html2text.HTML2Text()
    markdown_converter.ignore_links = False
    markdown_content = markdown_converter.handle(cleaned_html)
    return markdown_content

def save_raw_data(raw_data: str, output_folder: str, file_name: str):
    """Save raw markdown data to the specified output folder."""
    os.makedirs(output_folder, exist_ok=True)
    raw_output_path = os.path.join(output_folder, file_name)
    with open(raw_output_path, 'w', encoding='utf-8') as f:
        f.write(raw_data)
    print(f"Raw data saved to {raw_output_path}")
    return raw_output_path

def remove_urls_from_file(file_path):
    # Regex pattern to find URLs
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'

    # Construct the new file name
    base, ext = os.path.splitext(file_path)
    new_file_path = f"{base}_cleaned{ext}"

    # Read the original markdown content
    with open(file_path, 'r', encoding='utf-8') as file:
        markdown_content = file.read()

    # Replace all found URLs with an empty string
    cleaned_content = re.sub(url_pattern, '', markdown_content)

    # Write the cleaned content to a new file
    with open(new_file_path, 'w', encoding='utf-8') as file:
        file.write(cleaned_content)
    print(f"Cleaned file saved as: {new_file_path}")
    return cleaned_content

def create_dynamic_listing_model(field_names: List[str]) -> Type[BaseModel]:
    """
    Dynamically creates a Pydantic model based on the provided field names.
    field_names is a list of names of the fields to extract from the markdown.
    """
    # Every requested field becomes a required string field
    field_definitions = {field: (str, ...) for field in field_names}
    # Dynamically create the model with all fields
    return create_model('DynamicListingModel', **field_definitions)

def create_listings_container_model(listing_model: Type[BaseModel]) -> Type[BaseModel]:
    """
    Create a container model that holds a list of the given listing model.
    """
    return create_model('DynamicListingsContainer', listings=(List[listing_model], ...))
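
# Example: create_dynamic_listing_model(["title", "price"]) is equivalent to
# hand-writing the first model below, and create_listings_container_model
# then wraps it in the second:
#
#     class DynamicListingModel(BaseModel):
#         title: str
#         price: str
#
#     class DynamicListingsContainer(BaseModel):
#         listings: List[DynamicListingModel]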

def trim_to_token_limit(text, model, max_tokens=120000):
    encoder = tiktoken.encoding_for_model(model)
    tokens = encoder.encode(text)
    if len(tokens) > max_tokens:
        trimmed_text = encoder.decode(tokens[:max_tokens])
        return trimmed_text
    return text
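
# Example: inputs under the limit pass through unchanged; longer inputs are cut
# at the token boundary (long_text here is a placeholder for any large string):
#
#     trim_to_token_limit("hello world", "gpt-4o-mini")        # returned as-is
#     trim_to_token_limit(long_text, "gpt-4o-mini", 120000)    # first 120,000 tokens only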

def generate_system_message(listing_model: Type[BaseModel]) -> str:
    """
    Dynamically generate a system message based on the fields in the provided listing model.
    """
    # Use the model_json_schema() method to introspect the Pydantic model
    schema_info = listing_model.model_json_schema()

    # Extract field names and types from the schema
    field_descriptions = []
    for field_name, field_info in schema_info["properties"].items():
        # Get the field type from the schema info
        field_type = field_info["type"]
        field_descriptions.append(f'"{field_name}": "{field_type}"')

    # Create the JSON schema structure for the listings
    schema_structure = ",\n".join(field_descriptions)

    # Generate the system message dynamically
    system_message = f"""
You are an intelligent text extraction and conversion assistant. Your task is to extract structured information
from the given text and convert it into a pure JSON format. The JSON should contain only the structured data extracted from the text,
with no additional commentary, explanations, or extraneous information.
You may encounter cases where you cannot find the data for the fields you need to extract, or where the data is in a foreign language.
Please process the following text and provide the output in pure JSON format with no words before or after the JSON.
Please ensure the output strictly follows this schema:
{{
    "listings": [
        {{
            {schema_structure}
        }}
    ]
}}
"""
    return system_message
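
# For a model with fields ["title", "price"], the schema block embedded in the
# generated system message looks like:
#
#     {
#         "listings": [
#             {
#                 "title": "string",
#                 "price": "string"
#             }
#         ]
#     }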

def format_data(data, DynamicListingsContainer, DynamicListingModel, selected_model):
    token_counts = {}

    if selected_model in ["gpt-4o-mini", "gpt-4o-2024-08-06"]:
        # Use the OpenAI API with structured outputs
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        completion = client.beta.chat.completions.parse(
            model=selected_model,
            messages=[
                {"role": "system", "content": SYSTEM_MESSAGE},
                {"role": "user", "content": USER_MESSAGE + data},
            ],
            response_format=DynamicListingsContainer
        )

        # Calculate tokens using tiktoken
        encoder = tiktoken.encoding_for_model(selected_model)
        input_token_count = len(encoder.encode(USER_MESSAGE + data))
        output_token_count = len(encoder.encode(json.dumps(completion.choices[0].message.parsed.model_dump())))
        token_counts = {
            "input_tokens": input_token_count,
            "output_tokens": output_token_count
        }
        return completion.choices[0].message.parsed, token_counts
| elif selected_model == "gemini-1.5-flash": | |
| # Use Google Gemini API | |
| genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) | |
| model = genai.GenerativeModel('gemini-1.5-flash', | |
| generation_config={ | |
| "response_mime_type": "application/json", | |
| "response_schema": DynamicListingsContainer | |
| }) | |
| prompt = SYSTEM_MESSAGE + "\n" + USER_MESSAGE + data | |
| # Count input tokens using Gemini's method | |
| input_tokens = model.count_tokens(prompt) | |
| completion = model.generate_content(prompt) | |
| # Extract token counts from usage_metadata | |
| usage_metadata = completion.usage_metadata | |
| token_counts = { | |
| "input_tokens": usage_metadata.prompt_token_count, | |
| "output_tokens": usage_metadata.candidates_token_count | |
| } | |
| return completion.text, token_counts | |
| elif selected_model == "Llama3.1 8B": | |
| # Dynamically generate the system message based on the schema | |
| sys_message = generate_system_message(DynamicListingModel) | |
| # print(SYSTEM_MESSAGE) | |
| # Point to the local server | |
| client = OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio") | |
| completion = client.chat.completions.create( | |
| model=LLAMA_MODEL_FULLNAME, #change this if needed (use a better model) | |
| messages=[ | |
| {"role": "system", "content": sys_message}, | |
| {"role": "user", "content": USER_MESSAGE + data} | |
| ], | |
| temperature=0.7, | |
| ) | |
| # Extract the content from the response | |
| response_content = completion.choices[0].message.content | |
| print(response_content) | |
| # Convert the content from JSON string to a Python dictionary | |
| parsed_response = json.loads(response_content) | |
| # Extract token usage | |
| token_counts = { | |
| "input_tokens": completion.usage.prompt_tokens, | |
| "output_tokens": completion.usage.completion_tokens | |
| } | |
| return parsed_response, token_counts | |
| elif selected_model== "Groq Llama3.1 70b": | |
| # Dynamically generate the system message based on the schema | |
| sys_message = generate_system_message(DynamicListingModel) | |
| # print(SYSTEM_MESSAGE) | |
| # Point to the local server | |
| client = Groq(api_key=os.environ.get("GROQ_API_KEY"),) | |
| completion = client.chat.completions.create( | |
| messages=[ | |
| {"role": "system","content": sys_message}, | |
| {"role": "user","content": USER_MESSAGE + data} | |
| ], | |
| model=GROQ_LLAMA_MODEL_FULLNAME, | |
| ) | |
| # Extract the content from the response | |
| response_content = completion.choices[0].message.content | |
| # Convert the content from JSON string to a Python dictionary | |
| parsed_response = json.loads(response_content) | |
| # completion.usage | |
| token_counts = { | |
| "input_tokens": completion.usage.prompt_tokens, | |
| "output_tokens": completion.usage.completion_tokens | |
| } | |
| return parsed_response, token_counts | |
| else: | |
| raise ValueError(f"Unsupported model: {selected_model}") | |

def save_formatted_data(formatted_data, output_folder: str, json_file_name: str, excel_file_name: str):
    """Save formatted data as JSON and Excel in the specified output folder."""
    os.makedirs(output_folder, exist_ok=True)

    # Parse the formatted data if it's a JSON string (from the Gemini API)
    if isinstance(formatted_data, str):
        try:
            formatted_data_dict = json.loads(formatted_data)
        except json.JSONDecodeError:
            raise ValueError("The provided formatted data is a string but not valid JSON.")
    else:
        # Handle Pydantic objects (from OpenAI) or plain dicts (from the Llama/Groq paths)
        formatted_data_dict = formatted_data.model_dump() if hasattr(formatted_data, 'model_dump') else formatted_data

    # Save the formatted data as JSON
    json_output_path = os.path.join(output_folder, json_file_name)
    with open(json_output_path, 'w', encoding='utf-8') as f:
        json.dump(formatted_data_dict, f, indent=4)
    print(f"Formatted data saved to JSON at {json_output_path}")

    # Prepare data for the DataFrame
    if isinstance(formatted_data_dict, dict):
        # If the dict has a single key (e.g. "listings"), treat its value as the records
        data_for_df = next(iter(formatted_data_dict.values())) if len(formatted_data_dict) == 1 else formatted_data_dict
    elif isinstance(formatted_data_dict, list):
        data_for_df = formatted_data_dict
    else:
        raise ValueError("Formatted data is neither a dictionary nor a list; cannot convert to DataFrame.")

    # Create the DataFrame and save it to Excel
    try:
        df = pd.DataFrame(data_for_df)
        print("DataFrame created successfully.")
        excel_output_path = os.path.join(output_folder, excel_file_name)
        df.to_excel(excel_output_path, index=False)
        print(f"Formatted data saved to Excel at {excel_output_path}")
        return df
    except Exception as e:
        print(f"Error creating DataFrame or saving Excel: {str(e)}")
        return None

def calculate_price(token_counts, model):
    input_token_count = token_counts.get("input_tokens", 0)
    output_token_count = token_counts.get("output_tokens", 0)

    # Calculate the costs from the per-model rates in PRICING
    input_cost = input_token_count * PRICING[model]["input"]
    output_cost = output_token_count * PRICING[model]["output"]
    total_cost = input_cost + output_cost
    return input_token_count, output_token_count, total_cost
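
# Example, assuming PRICING stores per-token USD rates, e.g. a hypothetical
# {"gpt-4o-mini": {"input": 0.15 / 1e6, "output": 0.60 / 1e6}}:
#
#     _, _, cost = calculate_price({"input_tokens": 1000, "output_tokens": 500}, "gpt-4o-mini")
#     # cost == 1000 * 0.15e-6 + 500 * 0.60e-6 == 0.00045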

def generate_unique_folder_name(url):
    timestamp = datetime.now().strftime('%Y_%m_%d__%H_%M_%S')
    # Extract the domain name and replace non-alphanumeric characters
    url_name = re.sub(r'\W+', '_', url.split('//')[1].split('/')[0])
    return f"{url_name}_{timestamp}"

def scrape_multiple_urls(urls, fields, selected_model):
    output_folder = os.path.join('output', generate_unique_folder_name(urls[0]))
    os.makedirs(output_folder, exist_ok=True)

    total_input_tokens = 0
    total_output_tokens = 0
    total_cost = 0
    all_data = []
    markdown = None  # We'll store the markdown for the first (or only) URL

    for i, url in enumerate(urls, start=1):
        raw_html = fetch_html_selenium(url)
        current_markdown = html_to_markdown_with_readability(raw_html)
        if i == 1:
            markdown = current_markdown  # Store markdown for the first URL
        input_tokens, output_tokens, cost, formatted_data = scrape_url(
            url, fields, selected_model, output_folder, i, current_markdown
        )
        total_input_tokens += input_tokens
        total_output_tokens += output_tokens
        total_cost += cost
        all_data.append(formatted_data)

    return output_folder, total_input_tokens, total_output_tokens, total_cost, all_data, markdown

def scrape_url(url: str, fields: List[str], selected_model: str, output_folder: str, file_number: int, markdown: str):
    """Scrape a single URL and save the results."""
    try:
        # Save raw data
        save_raw_data(markdown, output_folder, f'rawData_{file_number}.md')

        # Create the dynamic listing model
        DynamicListingModel = create_dynamic_listing_model(fields)

        # Create the container model that holds a list of the dynamic listing models
        DynamicListingsContainer = create_listings_container_model(DynamicListingModel)

        # Format data
        formatted_data, token_counts = format_data(markdown, DynamicListingsContainer, DynamicListingModel, selected_model)

        # Save formatted data
        save_formatted_data(formatted_data, output_folder, f'sorted_data_{file_number}.json', f'sorted_data_{file_number}.xlsx')

        # Calculate and return token usage and cost
        input_tokens, output_tokens, total_cost = calculate_price(token_counts, selected_model)
        return input_tokens, output_tokens, total_cost, formatted_data
    except Exception as e:
        print(f"An error occurred while processing {url}: {e}")
        return 0, 0, 0, None
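
# A minimal end-to-end sketch; the URL and field names below are placeholders.
if __name__ == "__main__":
    target_urls = ["https://example.com/listings"]  # hypothetical target
    target_fields = ["title", "price"]              # columns to extract
    folder, in_tokens, out_tokens, cost, data, md = scrape_multiple_urls(
        target_urls, target_fields, selected_model="gpt-4o-mini"
    )
    print(f"Saved to {folder}: {in_tokens} input / {out_tokens} output tokens, ${cost:.4f}")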