import os
from typing import List, Tuple

import yaml

from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.objects import ObjectIndex, SimpleObjectNodeMapping
from llama_index.core.tools import QueryEngineTool
from llama_index.llms.ollama import Ollama

from utils.loader import config_loader

# Global LLM used by all llama_index components.
Settings.llm = Ollama(model="llama3", request_timeout=120)

# Default location where per-document vector indexes are persisted.
# Kept as the original hard-coded path for backward compatibility.
DEFAULT_KB_DIR = "/data1/home/purui/projects/chatbot/kb"


class IndexHandler:
    """Discover PDF/TXT documents under ``data_path``, build one persisted
    VectorStoreIndex per new document, and reload all persisted indexes.

    The list of already-known documents is tracked in a ``config.yaml``
    file inside ``data_path`` under the ``PDF_FILES`` / ``TXT_FILES`` keys.
    """

    def __init__(self, data_path, kb_dir=DEFAULT_KB_DIR):
        """
        Args:
            data_path: Directory containing the documents and config.yaml.
            kb_dir: Root directory for persisted indexes (new, optional
                parameter — defaults to the previously hard-coded path).
        """
        self.data_path = data_path
        self.kb_dir = kb_dir
        self.config_path = os.path.join(self.data_path, "config.yaml")
        self.config = config_loader(self.config_path)
        # List of persist directories, one per indexed document.
        self.index_dir = self.create_index()

    def load_docs(self):
        """Scan ``data_path`` for .pdf/.txt files not yet recorded in
        config.yaml, record them, and return the list of new file paths.

        Side effect: rewrites config.yaml with the updated file lists.
        """
        pdf_files = self.config.get("PDF_FILES", [])
        txt_files = self.config.get("TXT_FILES", [])
        # Set for O(1) membership checks (the original used a list).
        exist_files = set(pdf_files + txt_files)
        print(f"Existing_files: {exist_files}")

        new_files = []
        for root, _dirs, files in os.walk(self.data_path):
            for name in files:
                if not name.endswith((".pdf", ".txt")):
                    continue
                full_path = os.path.join(root, name)
                print(f"Checking file: {full_path}")
                if full_path in exist_files:
                    continue
                new_files.append(full_path)
                print(f"Adding new file: {full_path}")
                if name.endswith(".pdf"):
                    pdf_files.append(full_path)
                else:
                    txt_files.append(full_path)

        # Persist the updated file lists back to config.yaml.
        self.config["PDF_FILES"] = pdf_files
        self.config["TXT_FILES"] = txt_files
        with open(self.config_path, "w") as f:
            yaml.dump(self.config, f)
        return new_files

    def create_index(self):
        """Build a persisted index for every newly discovered document and
        return the persist directories of all indexes (existing + new)."""
        node_parser = SentenceSplitter()
        new_files = self.load_docs()

        # Each first-level sub-directory of kb_dir holds one persisted
        # index.  next(os.walk(...)) yields only that first level, which is
        # what the original for/break loop was emulating; the isdir guard
        # preserves the original "missing dir -> empty list" behavior.
        existing_indexs = []
        if os.path.isdir(self.kb_dir):
            first_level_dirs = next(os.walk(self.kb_dir))[1]
            existing_indexs = [
                os.path.join(self.kb_dir, d) for d in first_level_dirs
            ]

        for file_path in new_files:
            # splitext strips only the final extension; the original
            # split('.')[0] truncated names containing extra dots.
            file_name = os.path.splitext(os.path.basename(file_path))[0]
            doc = SimpleDirectoryReader(input_files=[file_path]).load_data()[0]
            doc.metadata.update({"file_name": file_name})
            # Split the document into nodes and persist a vector index.
            nodes = node_parser.get_nodes_from_documents(
                [doc], show_progress=True
            )
            persist_dir = os.path.join(self.kb_dir, file_name)
            index = VectorStoreIndex(nodes=nodes)
            index.storage_context.persist(persist_dir=persist_dir)
            existing_indexs.append(persist_dir)

        return existing_indexs

    def get_all_index(self):
        """Reload every persisted index from disk.

        Returns:
            List of ``(index_name, VectorStoreIndex)`` tuples, where
            ``index_name`` is the basename of the persist directory.
        """
        indexs = []
        for persist_dir in self.index_dir:
            index_name = os.path.basename(persist_dir)
            storage_context = StorageContext.from_defaults(
                persist_dir=persist_dir
            )
            vector_index = load_index_from_storage(storage_context)
            indexs.append((index_name, vector_index))
        return indexs


if __name__ == "__main__":
    index_handler = IndexHandler("/data1/home/purui/projects/chatbot/data")
    indexs = index_handler.get_all_index()
    print(indexs)