Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import PyPDF2 | |
import pandas as pd | |
import docx | |
import json | |
import requests | |
from docx import Document | |
from langchain_community.vectorstores import FAISS | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from transformers import pipeline | |
# Configurar Hugging Face API Token | |
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") | |
# Carregar o modelo Mistral 7B gratuitamente do Hugging Face | |
chatbot_pipeline = pipeline("text-generation", model="tiiuae/falcon-7b-instruct", token=HF_API_TOKEN) | |
def extract_files_from_folder(folder_path): | |
"""Scans a folder for PDF, TXT, CSV, DOCX, and IPYNB files.""" | |
extracted_files = {"pdf": [], "txt": [], "csv": [], "docx": [], "ipynb": []} | |
for root, _, files in os.walk(folder_path): | |
for file_name in files: | |
file_path = os.path.join(root, file_name) | |
if file_name.endswith(".pdf"): | |
extracted_files["pdf"].append(file_path) | |
elif file_name.endswith(".txt"): | |
extracted_files["txt"].append(file_path) | |
elif file_name.endswith(".csv"): | |
extracted_files["csv"].append(file_path) | |
elif file_name.endswith(".docx"): | |
extracted_files["docx"].append(file_path) | |
elif file_name.endswith(".ipynb"): | |
extracted_files["ipynb"].append(file_path) | |
return extracted_files | |
def get_text_from_pdf(pdf_files): | |
text = "" | |
for pdf_path in pdf_files: | |
with open(pdf_path, "rb") as pdf_file: | |
reader = PyPDF2.PdfReader(pdf_file) | |
for page in reader.pages: | |
text += page.extract_text() + "\n" | |
return text | |
def read_text_from_files(file_paths): | |
text = "" | |
for file_path in file_paths: | |
with open(file_path, "r", encoding="utf-8", errors="ignore") as file: | |
text += file.read() + "\n" | |
return text | |
def get_text_from_csv(csv_files): | |
text = "" | |
for csv_path in csv_files: | |
df = pd.read_csv(csv_path) | |
text += df.to_string() + "\n" | |
return text | |
def get_text_from_docx(docx_files): | |
text = "" | |
for docx_path in docx_files: | |
doc = Document(docx_path) | |
for para in doc.paragraphs: | |
text += para.text + "\n" | |
return text | |
def get_text_from_ipynb(ipynb_files): | |
text = "" | |
for ipynb_path in ipynb_files: | |
with open(ipynb_path, "r", encoding="utf-8", errors="ignore") as file: | |
content = json.load(file) | |
for cell in content.get("cells", []): | |
if cell.get("cell_type") in ["markdown", "code"]: | |
text += "\n".join(cell.get("source", [])) + "\n" | |
return text | |
def combine_text_from_files(extracted_files): | |
text = ( | |
get_text_from_pdf(extracted_files["pdf"]) + | |
read_text_from_files(extracted_files["txt"]) + | |
get_text_from_csv(extracted_files["csv"]) + | |
get_text_from_docx(extracted_files["docx"]) + | |
get_text_from_ipynb(extracted_files["ipynb"]) | |
) | |
return text | |
def generate_response(question, text): | |
"""Uses the Mistral 7B model to answer questions based on extracted text.""" | |
prompt = f"Question: {question}\nBased on the following document content:\n{text[:3000]}" # Limite de 3000 caracteres | |
response = chatbot_pipeline(prompt, max_length=500, truncation=True)[0]['generated_text'] | |
return response.strip() | |
def chatbot_interface(question): | |
folder_path = "New_Data_Analytics/" | |
extracted_files = extract_files_from_folder(folder_path) | |
text = combine_text_from_files(extracted_files) | |
if not text.strip(): | |
return "No valid files found. Please upload supported file types." | |
return generate_response(question, text) | |
demo = gr.Interface( | |
fn=chatbot_interface, | |
inputs=gr.Textbox(label="Ask a question", placeholder="Type your question here..."), | |
outputs=gr.Textbox(label="Answer") | |
) | |
demo.launch() | |