# prototipo-2-agente / agent.py
# %%
import os
# Change the current working directory to the directory of this script so that
# relative paths (prompts/, memory/, interface/) resolve correctly.
current_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(current_dir)
# %%
# import requests
# from bs4 import BeautifulSoup
# from urllib.parse import urljoin
# import time
# import concurrent.futures
# from queue import Queue
# from threading import Lock
#
# def fetch_and_parse_links(url, base_url):
#     try:
#         response = requests.get(url, timeout=10)
#         response.raise_for_status()
#         soup = BeautifulSoup(response.content, 'html.parser')
#         main_div = soup.find('div', id='main')
#         if not main_div:
#             print(f"No div with id='main' found in {url}")
#             return []
#         links = main_div.find_all('a', href=True)
#         paths = []
#         for link in links:
#             href = urljoin(url, link['href'])
#             if href.startswith(base_url) and '#' not in href:
#                 path = href[len(base_url):].strip("/")
#                 if path and path not in paths:
#                     paths.append(path)
#         return paths
#     except requests.RequestException as e:
#         print(f"Error fetching {url}: {e}")
#         return []
# def worker(base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock):
#     while True:
#         current_path = to_visit_queue.get()
#         if current_path is None:
#             break
#         with lock:
#             if current_path in visited_paths:
#                 to_visit_queue.task_done()
#                 continue
#             visited_paths.add(current_path)
#         current_url = urljoin(base_url, current_path)
#         print(f"Visiting: {current_url}")
#         new_paths = fetch_and_parse_links(current_url, base_url)
#         with lock:
#             for new_path in new_paths:
#                 if new_path not in visited_paths:
#                     to_visit_queue.put(new_path)
#                     unvisited_paths.add(new_path)
#                 from_url = f"{base_url}{current_path}"
#                 to_url = f"{base_url}{new_path}"
#                 new_tuple = (from_url, to_url)
#                 if new_tuple not in tuples_list:
#                     tuples_list.append(new_tuple)
#             if current_path in unvisited_paths:
#                 unvisited_paths.remove(current_path)
#         to_visit_queue.task_done()
#         time.sleep(1)  # Be polite to the server
# def create_tuples_from_paths(base_url, max_workers=5):
#     visited_paths = set()
#     unvisited_paths = set()
#     tuples_list = []
#     to_visit_queue = Queue()
#     lock = Lock()
#     to_visit_queue.put("")  # Start with an empty string to represent the root
#     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
#         futures = []
#         for _ in range(max_workers):
#             future = executor.submit(worker, base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock)
#             futures.append(future)
#         to_visit_queue.join()
#         for _ in range(max_workers):
#             to_visit_queue.put(None)
#         concurrent.futures.wait(futures)
#     return tuples_list, visited_paths, unvisited_paths
#
# # Define the base URL
# base_url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/"
# import json
#
# def load_json(file_path):
#     with open(file_path, 'r', encoding='utf-8') as file:
#         return json.load(file)
#
# def flatten_list(nested_list):
#     for item in nested_list:
#         if isinstance(item, list):
#             yield from flatten_list(item)  # Recursively yield from nested lists
#         else:
#             yield item
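# Illustrative example (hypothetical input) of what flatten_list yields:
#   list(flatten_list([["a"], ["b", ["c"]]])) -> ["a", "b", "c"]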
# import polars as pl
#
# # Define the base URL
# base_url = 'https://www.gov.br/governodigital/pt-br/'
#
# # Example usage
# file_path = 'memory/graph_data_tiplet.json'  # Replace with your actual file path
# base_url = 'https://www.gov.br/governodigital/pt-br/'
# json_data = load_json(file_path)
# json_data = list(flatten_list(json_data))
#
# # Convert the list of URLs to a Polars DataFrame
# df = pl.DataFrame({
#     'url': json_data
# })
#
# # Remove the base URL and convert to path
# df = df.with_columns(
#     (pl.col('url').str.replace(base_url, '')).alias('path')
# )
#
# # Extract paths as a list
# paths = df['path'].to_list()
#
# # Build a hierarchical structure
# def build_tree(paths):
#     tree = {}
#     for path in paths:
#         parts = path.strip('/').split('/')
#         current_level = tree
#         for part in parts:
#             if part not in current_level:
#                 current_level[part] = {}
#             current_level = current_level[part]
#     return tree
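# Illustrative example (hypothetical input) of what build_tree returns:
#   build_tree(["conta/recuperar", "conta/duvidas"]) -> {"conta": {"recuperar": {}, "duvidas": {}}}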
#%%
from utils.llm import chat
from utils.file import File
import json
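# Prompt and memory assets used by the agent; paths are relative to this script
# (hence the os.chdir above).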
system = File("prompts/system.md")
knowledge = File("prompts/knowledge.md")
graph = File("interface/visualization.html")
graph_data = File("memory/graph_data.json")
# user_question = input("Question?")
# messages = [
#     {
#         "role": "system",
#         "content": [
#             {
#                 "type": "text",
#                 "text": system
#             }
#         ]
#     },
#     {
#         "role": "user",
#         "content": [
#             {
#                 "type": "text",
#                 "text": user_question
#             }
#         ]
#     }
# ]
def pipeline(messages):
    res = chat(messages=messages)
    response = res.choices[0].message.content
    return response
# if __name__ == "__main__":
#     res = chat(messages=messages)
#     response = res.choices[0].message.content
#     print(response)
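# Minimal usage sketch of pipeline(), assuming chat() accepts OpenAI-style message
# dicts and that File exposes the prompt text, as in the commented-out template
# above. The question below is purely illustrative.
if __name__ == "__main__":
    example_question = "How do I recover my gov.br account?"  # hypothetical user question
    example_messages = [
        {"role": "system", "content": [{"type": "text", "text": system}]},
        {"role": "user", "content": [{"type": "text", "text": example_question}]},
    ]
    print(pipeline(example_messages))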
#%%
# from IPython.display import display, Markdown
#
# def build_tree_structure(tree, indent=0):
#     """
#     Recursively builds a string representation of the tree structure.
#
#     Args:
#         tree (dict): The hierarchical tree structure.
#         indent (int): The current level of indentation.
#
#     Returns:
#         str: A string representing the tree structure.
#     """
#     result = ""
#     for key, subtree in tree.items():
#         result += f"{' ' * indent} - {key}/\n"
#         if isinstance(subtree, dict):
#             result += build_tree_structure(subtree, indent + 1)
#     return result
#
# # Create and print the hierarchical structure
# tree_structure = build_tree(paths)
# obj = build_tree_structure(tree_structure)
# print(obj)
# display(Markdown(obj))
# # print(json.dumps(tree_structure, indent=2))
# #%%
# # Create tuples from paths and track visited/unvisited paths
# tuples_list, visited_paths, unvisited_paths = create_tuples_from_paths(base_url, 10)
#
# # Print the resulting list of tuples
# print("\nTuples:")
# for t in tuples_list:
#     print(t)
#
# # Print visited and unvisited paths
# print("\nVisited Paths:")
# for p in visited_paths:
#     print(f"{base_url}{p}")
# print("\nUnvisited Paths:")
# for p in unvisited_paths:
#     print(f"{base_url}{p}")
#
# # Print summary
# print(f"\nTotal links found: {len(tuples_list)}")
# print(f"Visited pages: {len(visited_paths)}")
# print(f"Unvisited pages: {len(unvisited_paths)}")
#
# # Create a dictionary to hold our graph data
# graph_data = {
#     "nodes": [],
#     "edges": []
# }
#
# import json
#
# # Create a set to keep track of nodes we've added
# added_nodes = set()
#
# # Process the tuples to create nodes and edges
# for from_url, to_url in tuples_list:
#     from_path = from_url[len(base_url):].strip("/") or "root"
#     to_path = to_url[len(base_url):].strip("/")
#     if from_path not in added_nodes:
#         graph_data["nodes"].append({"id": from_path, "label": from_path})
#         added_nodes.add(from_path)
#     if to_path not in added_nodes:
#         graph_data["nodes"].append({"id": to_path, "label": to_path})
#         added_nodes.add(to_path)
#     graph_data["edges"].append({"from": from_path, "to": to_path})
#
# # Save the graph data to a JSON file
# with open('graph_data.json', 'w') as f:
#     json.dump(graph_data, f)
#
# # Save the raw link tuples to a JSON file
# with open('graph_data_tiplet.json', 'w') as f:
#     json.dump(tuples_list, f)
#
# print("Graph data saved to graph_data.json")
# # %%
# import requests
# from bs4 import BeautifulSoup
# from markdownify import markdownify as md
# import os
#
# os.chdir("/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR")
# from Banco_de_Dados.Estruturado.data2json import format_for_markdown
#
# # Web page URL
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/atendimento-presencial"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br/recuperar-conta-gov.br"
#
# # Fetch the page HTML
# response = requests.get(url)
# html_content = response.text
#
# # Parse the HTML with BeautifulSoup
# soup = BeautifulSoup(html_content, 'html.parser')
#
# # Extract the content of the div with id 'main'
# main_div = soup.find('div', id='main')
# a = format_for_markdown(main_div)
# print(a)
#
# if main_div:
#     # Convert the div content to Markdown
#     markdown_content = md(str(main_div))
#     # Remove extra line breaks (\n\n)
#     markdown_content = "\n".join([line for line in markdown_content.split("\n\n") if line.strip()])
#     print(markdown_content)
#     # Save the Markdown content to a file
#     with open("main_content.md", "w", encoding="utf-8") as file:
#         file.write(markdown_content)
#     print("Conversion finished and saved to 'main_content.md'.")
# else:
#     print("Div with id 'main' not found.")
# # %%
# import requests
#
# def pipeline():
#     # url = input("website: ")
#     url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
#     response = requests.get(url).text
#     print(response)
#
# import os
#
# def print_directory_structure(path, level=0):
#     if not os.path.isdir(path):
#         print(f"{path} is not a valid directory.")
#         return
#     prefix = ' ' * 4 * level + '|-- '
#     print(prefix + os.path.basename(path) + '/')
#     for item in os.listdir(path):
#         item_path = os.path.join(path, item)
#         if os.path.isdir(item_path):
#             print_directory_structure(item_path, level + 1)
#         else:
#             print(' ' * 4 * (level + 1) + '|-- ' + item)
#
# # Replace 'your_path_here' with the path you want to print
# your_path_here = '/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR/AI_agent'
# print_directory_structure(your_path_here)
#
# if __name__ == "__main__":
#     pipeline()