#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/6/8 14:03
@Author  : alexanderwu
@File    : document.py
@Desc    : Classes and operations related to files in the file system.
"""
from enum import Enum
from pathlib import Path
from typing import Optional, Union

import pandas as pd
from llama_index.core import Document, SimpleDirectoryReader
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.readers.file import PDFReader
from pydantic import BaseModel, ConfigDict, Field
from tqdm import tqdm

from metagpt.logs import logger
from metagpt.repo_parser import RepoParser


def validate_cols(content_col: str, df: pd.DataFrame):
    """Raise if the expected content column is missing from the DataFrame."""
    if content_col not in df.columns:
        raise ValueError(f"Content column '{content_col}' not found in DataFrame.")


def read_data(data_path: Path) -> Union[pd.DataFrame, list[Document]]:
    """Read a file into a DataFrame (tabular formats) or llama-index documents/nodes (text formats)."""
    suffix = data_path.suffix
    if ".xlsx" == suffix:
        data = pd.read_excel(data_path)
    elif ".csv" == suffix:
        data = pd.read_csv(data_path)
    elif ".json" == suffix:
        data = pd.read_json(data_path)
    elif suffix in (".docx", ".doc"):
        data = SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
    elif ".txt" == suffix:
        data = SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
        node_parser = SimpleNodeParser.from_defaults(separator="\n", chunk_size=256, chunk_overlap=0)
        data = node_parser.get_nodes_from_documents(data)
    elif ".pdf" == suffix:
        # PDFReader.load_data is an instance method; instantiate the reader first.
        data = PDFReader().load_data(file=data_path)
    else:
        raise NotImplementedError(f"File format {suffix} not supported.")
    return data
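
# A minimal usage sketch for read_data (file names are hypothetical): tabular
# formats come back as a pandas DataFrame, ".txt" comes back as llama-index
# nodes chunked at 256 tokens, and ".pdf" typically as one document per page.
#
#   df = read_data(Path("qa_pairs.csv"))    # -> pd.DataFrame
#   nodes = read_data(Path("notes.txt"))    # -> list of chunked nodes
#   pages = read_data(Path("paper.pdf"))    # -> list of per-page documents
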
class DocumentStatus(Enum):
    """Indicates document status, a mechanism similar to RFC/PEP."""

    DRAFT = "draft"
    UNDERREVIEW = "underreview"
    APPROVED = "approved"
    DONE = "done"


class Document(BaseModel):
    """
    Document: Handles operations related to document files.
    """

    path: Path = Field(default=None)
    name: str = Field(default="")
    content: str = Field(default="")

    # metadata? in content perhaps.
    author: str = Field(default="")
    status: DocumentStatus = Field(default=DocumentStatus.DRAFT)
    reviews: list = Field(default_factory=list)

    @classmethod
    def from_path(cls, path: Path):
        """Create a Document instance from a file path."""
        if not path.exists():
            raise FileNotFoundError(f"File {path} not found.")
        content = path.read_text()
        return cls(content=content, path=path)

    @classmethod
    def from_text(cls, text: str, path: Optional[Path] = None):
        """Create a Document from a text string."""
        return cls(content=text, path=path)

    def to_path(self, path: Optional[Path] = None):
        """Save content to the specified file path."""
        if path is not None:
            self.path = path
        if self.path is None:
            raise ValueError("File path is not set.")
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # TODO: excel, csv, json, etc.
        self.path.write_text(self.content, encoding="utf-8")

    def persist(self):
        """Persist document to disk."""
        return self.to_path()
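
# A minimal round-trip sketch for Document (the path is hypothetical):
#
#   doc = Document.from_text("# Design Notes", path=Path("/tmp/notes.md"))
#   doc.persist()                                # writes /tmp/notes.md
#   same = Document.from_path(Path("/tmp/notes.md"))
#   assert same.content == doc.content
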
class IndexableDocument(Document):
    """
    Advanced document handling: For vector databases or search engines.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    data: Union[pd.DataFrame, list]
    content_col: Optional[str] = Field(default="")
    meta_col: Optional[str] = Field(default="")

    @classmethod
    def from_path(cls, data_path: Path, content_col="content", meta_col="metadata"):
        if not data_path.exists():
            raise FileNotFoundError(f"File {data_path} not found.")
        data = read_data(data_path)
        if isinstance(data, pd.DataFrame):
            validate_cols(content_col, data)
            return cls(data=data, content=str(data), content_col=content_col, meta_col=meta_col)
        try:
            content = data_path.read_text()
        except Exception as e:
            logger.debug(f"Load {str(data_path)} error: {e}")
            content = ""
        return cls(data=data, content=content, content_col=content_col, meta_col=meta_col)

    def _get_docs_and_metadatas_by_df(self) -> tuple[list, list]:
        df = self.data
        docs = []
        metadatas = []
        for i in tqdm(range(len(df))):
            docs.append(df[self.content_col].iloc[i])
            if self.meta_col:
                metadatas.append({self.meta_col: df[self.meta_col].iloc[i]})
            else:
                metadatas.append({})
        return docs, metadatas

    def _get_docs_and_metadatas_by_llamaindex(self) -> tuple[list, list]:
        data = self.data
        docs = [i.text for i in data]
        metadatas = [i.metadata for i in data]
        return docs, metadatas

    def get_docs_and_metadatas(self) -> tuple[list, list]:
        if isinstance(self.data, pd.DataFrame):
            return self._get_docs_and_metadatas_by_df()
        elif isinstance(self.data, list):
            return self._get_docs_and_metadatas_by_llamaindex()
        else:
            raise NotImplementedError("Data type not supported for metadata extraction.")
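
# A minimal sketch of preparing an IndexableDocument for a vector store
# (column names follow the defaults above; the CSV path is hypothetical):
#
#   idx_doc = IndexableDocument.from_path(Path("faq.csv"))
#   docs, metadatas = idx_doc.get_docs_and_metadatas()
#   # docs[i] is the text of row i; metadatas[i] is {"metadata": ...} or {}
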
class RepoMetadata(BaseModel):
    name: str = Field(default="")
    n_docs: int = Field(default=0)
    n_chars: int = Field(default=0)
    symbols: list = Field(default_factory=list)


class Repo(BaseModel):
    # Name of this repo.
    name: str = Field(default="")
    # metadata: RepoMetadata = Field(default=RepoMetadata)
    docs: dict[Path, Document] = Field(default_factory=dict)
    codes: dict[Path, Document] = Field(default_factory=dict)
    assets: dict[Path, Document] = Field(default_factory=dict)
    path: Path = Field(default=None)

    def _path(self, filename):
        return self.path / filename

    @classmethod
    def from_path(cls, path: Path):
        """Load documents, code, and assets from a repository path."""
        path.mkdir(parents=True, exist_ok=True)
        repo = Repo(path=path, name=path.name)
        for file_path in path.rglob("*"):
            # FIXME: These suffix checks are hard to extend to more programming languages and should be made more general.
            if file_path.is_file() and file_path.suffix in [".json", ".txt", ".md", ".py", ".js", ".css", ".html"]:
                repo._set(file_path.read_text(), file_path)
        return repo

    def to_path(self):
        """Persist all documents, code, and assets to the repository path."""
        for doc in self.docs.values():
            doc.to_path()
        for code in self.codes.values():
            code.to_path()
        for asset in self.assets.values():
            asset.to_path()

    def _set(self, content: str, path: Path):
        """Add a document to the appropriate category based on its file extension."""
        suffix = path.suffix
        doc = Document(content=content, path=path, name=str(path.relative_to(self.path)))
        # FIXME: These suffix checks are hard to extend to more programming languages and should be made more general.
        if suffix.lower() == ".md":
            self.docs[path] = doc
        elif suffix.lower() in [".py", ".js", ".css", ".html"]:
            self.codes[path] = doc
        else:
            self.assets[path] = doc
        return doc

    def set(self, filename: str, content: str):
        """Set a document and persist it to disk."""
        path = self._path(filename)
        doc = self._set(content, path)
        doc.to_path()

    def get(self, filename: str) -> Optional[Document]:
        """Get a document by its filename."""
        path = self._path(filename)
        return self.docs.get(path) or self.codes.get(path) or self.assets.get(path)

    def get_text_documents(self) -> list[Document]:
        """Return all text-bearing documents (markdown docs and code files)."""
        return list(self.docs.values()) + list(self.codes.values())

    def eda(self) -> RepoMetadata:
        """Compute simple statistics for the repo: document count, character count, and parsed symbols."""
        n_docs = sum(len(i) for i in [self.docs, self.codes, self.assets])
        n_chars = sum(sum(len(j.content) for j in i.values()) for i in [self.docs, self.codes, self.assets])
        symbols = RepoParser(base_directory=self.path).generate_symbols()
        return RepoMetadata(name=self.name, n_docs=n_docs, n_chars=n_chars, symbols=symbols)
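

if __name__ == "__main__":
    # A small self-contained demo of Repo; assumes write access to /tmp and
    # uses illustrative file names and contents only.
    repo = Repo.from_path(Path("/tmp/demo_repo"))
    repo.set("README.md", "# Demo\nA scratch repository.")
    repo.set("main.py", "print('hello')")
    print(repo.get("README.md").content)
    print(repo.eda())  # document/char counts plus symbols from RepoParser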