# %%
import os
import sys

# Change the current working directory to the directory where the script is located.
# __file__ is undefined in interactive sessions (e.g. Jupyter); fall back to the CWD.
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    current_dir = os.getcwd()
os.chdir(current_dir)
# %%
# import requests
# from bs4 import BeautifulSoup
# from urllib.parse import urljoin
# import time
# import concurrent.futures
# from queue import Queue
# from threading import Lock

# def fetch_and_parse_links(url, base_url):
#     try:
#         response = requests.get(url, timeout=10)
#         response.raise_for_status()
#         soup = BeautifulSoup(response.content, 'html.parser')
#         main_div = soup.find('div', id='main')
#         if not main_div:
#             print(f"No div with id='main' found in {url}")
#             return []
#         links = main_div.find_all('a', href=True)
#         paths = []
#         for link in links:
#             href = urljoin(url, link['href'])
#             if href.startswith(base_url) and '#' not in href:
#                 path = href[len(base_url):].strip("/")
#                 if path and path not in paths:
#                     paths.append(path)
#         return paths
#     except requests.RequestException as e:
#         print(f"Error fetching {url}: {e}")
#         return []

# def worker(base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock):
#     while True:
#         current_path = to_visit_queue.get()
#         if current_path is None:
#             break
#         with lock:
#             if current_path in visited_paths:
#                 to_visit_queue.task_done()
#                 continue
#             visited_paths.add(current_path)
#         current_url = urljoin(base_url, current_path)
#         print(f"Visiting: {current_url}")
#         new_paths = fetch_and_parse_links(current_url, base_url)
#         with lock:
#             for new_path in new_paths:
#                 if new_path not in visited_paths:
#                     to_visit_queue.put(new_path)
#                     unvisited_paths.add(new_path)
#                 from_url = f"{base_url}{current_path}"
#                 to_url = f"{base_url}{new_path}"
#                 new_tuple = (from_url, to_url)
#                 if new_tuple not in tuples_list:
#                     tuples_list.append(new_tuple)
#             if current_path in unvisited_paths:
#                 unvisited_paths.remove(current_path)
#         to_visit_queue.task_done()
#         time.sleep(1)  # Be polite to the server
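
# # Design note: workers coordinate through the shared Queue and exit when
# # they dequeue a None sentinel ("poison pill"). The lock guards the shared
# # visited/unvisited sets and the tuples list, since compound
# # check-then-update operations are not atomic across threads.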
# def create_tuples_from_paths(base_url, max_workers=5):
#     visited_paths = set()
#     unvisited_paths = set()
#     tuples_list = []
#     to_visit_queue = Queue()
#     lock = Lock()
#     to_visit_queue.put("")  # Start with an empty string to represent the root
#     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
#         futures = []
#         for _ in range(max_workers):
#             future = executor.submit(worker, base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock)
#             futures.append(future)
#         to_visit_queue.join()
#         for _ in range(max_workers):
#             to_visit_queue.put(None)
#         concurrent.futures.wait(futures)
#     return tuples_list, visited_paths, unvisited_paths

# # Define the base URL
# base_url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/"

# import json

# def load_json(file_path):
#     with open(file_path, 'r', encoding='utf-8') as file:
#         return json.load(file)

# def flatten_list(nested_list):
#     for item in nested_list:
#         if isinstance(item, list):
#             yield from flatten_list(item)  # Recursively yield from nested lists
#         else:
#             yield item
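
# # Example (hypothetical data):
# # list(flatten_list([[1, [2]], 3])) == [1, 2, 3]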
# import polars as pl

# # Define the base URL
# base_url = 'https://www.gov.br/governodigital/pt-br/'

# # Example usage
# file_path = 'memory/graph_data_tiplet.json'  # Replace with your actual file path
# json_data = load_json(file_path)
# json_data = list(flatten_list(json_data))

# # Convert the list of URLs to a Polars DataFrame
# df = pl.DataFrame({
#     'url': json_data
# })

# # Strip the base URL, leaving only the path (literal=True so the URL is
# # not interpreted as a regex pattern)
# df = df.with_columns(
#     (pl.col('url').str.replace(base_url, '', literal=True)).alias('path')
# )

# # Extract paths as a list
# paths = df['path'].to_list()

# # Build a hierarchical structure
# def build_tree(paths):
#     tree = {}
#     for path in paths:
#         parts = path.strip('/').split('/')
#         current_level = tree
#         for part in parts:
#             if part not in current_level:
#                 current_level[part] = {}
#             current_level = current_level[part]
#     return tree
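
# # Example (hypothetical paths): build_tree(["a/b", "a/c", "d"]) returns
# # {"a": {"b": {}, "c": {}}, "d": {}} -- each URL path segment becomes a
# # nested dict key.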
#%%
from utils.llm import chat
from utils.file import File
import json

system = File("prompts/system.md")
knowledge = File("prompts/knowledge.md")
graph = File("interface/visualization.html")
graph_data = File("memory/graph_data.json")

# user_question = input("Question?")
# messages = [
#     {
#         "role": "system",
#         "content": [
#             {
#                 "type": "text",
#                 "text": system
#             }
#         ]
#     },
#     {
#         "role": "user",
#         "content": [
#             {
#                 "type": "text",
#                 "text": user_question
#             }
#         ]
#     }
# ]
def pipeline(messages):
    # Send the conversation to the LLM and return the first choice's text.
    res = chat(messages=messages)
    response = res.choices[0].message.content
    return response
# if __name__ == "__main__":
#     res = chat(messages=messages)
#     response = res.choices[0].message.content
#     print(response)
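
# Minimal usage sketch (hypothetical question; assumes chat() accepts
# OpenAI-style message dicts and that a File object can be passed as the
# text payload, mirroring the commented block above):
# messages = [
#     {"role": "system", "content": [{"type": "text", "text": system}]},
#     {"role": "user", "content": [{"type": "text", "text": "How do I recover my gov.br account?"}]},
# ]
# print(pipeline(messages))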
#%%
# from IPython.display import display, Markdown

# def build_tree_structure(tree, indent=0):
#     """
#     Recursively builds a string representation of the tree structure.
#
#     Args:
#         tree (dict): The hierarchical tree structure.
#         indent (int): The current level of indentation.
#
#     Returns:
#         str: A string representing the tree structure.
#     """
#     result = ""
#     for key, subtree in tree.items():
#         result += f"{' ' * indent} - {key}/\n"
#         if isinstance(subtree, dict):
#             result += build_tree_structure(subtree, indent + 1)
#     return result
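
# # Example (hypothetical tree): build_tree_structure({"a": {"b": {}}})
# # returns " - a/\n  - b/\n" -- one "- key/" line per node, indented by depth.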
# # Create and print the hierarchical structure
# tree_structure = build_tree(paths)
# obj = build_tree_structure(tree_structure)
# print(obj)
# display(Markdown(obj))
# # print(json.dumps(tree_structure, indent=2))

# #%%
# # Create tuples from paths and track visited/unvisited paths
# tuples_list, visited_paths, unvisited_paths = create_tuples_from_paths(base_url, 10)

# # Print the resulting list of tuples
# print("\nTuples:")
# for t in tuples_list:
#     print(t)

# # Print visited and unvisited paths
# print("\nVisited Paths:")
# for p in visited_paths:
#     print(f"{base_url}{p}")
# print("\nUnvisited Paths:")
# for p in unvisited_paths:
#     print(f"{base_url}{p}")

# # Print summary
# print(f"\nTotal links found: {len(tuples_list)}")
# print(f"Visited pages: {len(visited_paths)}")
# print(f"Unvisited pages: {len(unvisited_paths)}")

# # Create a dictionary to hold our graph data
# graph_data = {
#     "nodes": [],
#     "edges": []
# }

# import json

# # Create a set to keep track of nodes we've added
# added_nodes = set()

# # Process the tuples to create nodes and edges
# for from_url, to_url in tuples_list:
#     from_path = from_url[len(base_url):].strip("/") or "root"
#     to_path = to_url[len(base_url):].strip("/")
#     if from_path not in added_nodes:
#         graph_data["nodes"].append({"id": from_path, "label": from_path})
#         added_nodes.add(from_path)
#     if to_path not in added_nodes:
#         graph_data["nodes"].append({"id": to_path, "label": to_path})
#         added_nodes.add(to_path)
#     graph_data["edges"].append({"from": from_path, "to": to_path})

# # Save the graph data to a JSON file
# with open('graph_data.json', 'w') as f:
#     json.dump(graph_data, f)

# # Save the raw edge tuples to a separate JSON file
# with open('graph_data_tiplet.json', 'w') as f:
#     json.dump(tuples_list, f)

# print("Graph data saved to graph_data.json")
# # %%
# import requests
# from bs4 import BeautifulSoup
# from markdownify import markdownify as md
# import os
# os.chdir("/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR")
# from Banco_de_Dados.Estruturado.data2json import format_for_markdown

# # Web page URL (later assignments override the earlier ones)
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/atendimento-presencial"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br/recuperar-conta-gov.br"

# # Fetch the page's HTML
# response = requests.get(url)
# html_content = response.text

# # Parse the HTML with BeautifulSoup
# soup = BeautifulSoup(html_content, 'html.parser')

# # Extract the content of the div with id 'main'
# main_div = soup.find('div', id='main')
# a = format_for_markdown(main_div)
# print(a)

# if main_div:
#     # Convert the div's content to Markdown
#     markdown_content = md(str(main_div))
#     # Remove extra line breaks (\n\n)
#     markdown_content = "\n".join([line for line in markdown_content.split("\n\n") if line.strip()])
#     print(markdown_content)
#     # Save the Markdown content to a file
#     with open("main_content.md", "w", encoding="utf-8") as file:
#         file.write(markdown_content)
#     print("Conversion finished and saved to 'main_content.md'.")
# else:
#     print("Div with id 'main' not found.")
# # %%
# import requests

# def pipeline():
#     # url = input("website: ")
#     url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
#     response = requests.get(url).text
#     print(response)

# import os

# def print_directory_structure(path, level=0):
#     if not os.path.isdir(path):
#         print(f"{path} is not a valid directory.")
#         return
#     prefix = ' ' * 4 * level + '|-- '
#     print(prefix + os.path.basename(path) + '/')
#     for item in os.listdir(path):
#         item_path = os.path.join(path, item)
#         if os.path.isdir(item_path):
#             print_directory_structure(item_path, level + 1)
#         else:
#             print(' ' * 4 * (level + 1) + '|-- ' + item)

# # Replace 'your_path_here' with the path you want to print
# your_path_here = '/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR/AI_agent'
# print_directory_structure(your_path_here)

# if __name__ == "__main__":
#     pipeline()