from pathlib import Path
from typing import List, Optional

import tree_sitter_python
from tree_sitter import Language, Parser, Node

from knowlang.parser.base.parser import LanguageParser
from knowlang.core.types import CodeChunk, ChunkType
from knowlang.utils.fancy_log import FancyLogger

LOG = FancyLogger(__name__)


class PythonParser(LanguageParser):
    """Python-specific implementation of LanguageParser"""

    def setup(self) -> None:
        """Initialize tree-sitter with Python language support"""
        self.language = Language(tree_sitter_python.language())
        self.parser = Parser(self.language)
        self.language_config = self.config.parser.languages["python"]

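    # "Docstrings" here are recovered from whatever precedes a definition in the
    # syntax tree (comments or a bare string literal), not from the first
    # statement of the definition's body.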
    def _get_preceding_docstring(self, node: Node, source_code: bytes) -> Optional[str]:
        """Extract a docstring from the comments or string literal preceding the node"""
        docstring_parts = []
        current_node = node.prev_sibling

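        # Walk backwards over the node's preceding siblings: collect consecutive
        # comments, stop at a string expression (treated as the docstring) or at
        # any other kind of statement.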
        while current_node:
            if current_node.type == "comment":
                comment = source_code[current_node.start_byte:current_node.end_byte].decode('utf-8')
                docstring_parts.insert(0, comment)
            elif current_node.type == "expression_statement":
                string_node = current_node.children[0] if current_node.children else None
                if string_node and string_node.type in ("string", "string_literal"):
                    docstring = source_code[string_node.start_byte:string_node.end_byte].decode('utf-8')
                    docstring_parts.insert(0, docstring)
                break
            elif current_node.type not in ("empty_statement", "newline"):
                break
            current_node = current_node.prev_sibling

        return '\n'.join(docstring_parts) if docstring_parts else None

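    # Note: tree-sitter's Node.has_error already reports errors anywhere in the
    # node's subtree, so the explicit ERROR check and recursion below are defensive.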
    def _has_syntax_error(self, node: Node) -> bool:
        """Check if the node or its children contain syntax errors"""
        if node.type == "ERROR":
            return True
        if node.has_error:
            return True
        return any(self._has_syntax_error(child) for child in node.children)

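    # tree-sitter points are zero-based, so the start_line/end_line values
    # recorded on each CodeChunk below are 0-indexed rows.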
    def _process_class(self, node: Node, source_code: bytes, file_path: Path) -> CodeChunk:
        """Process a class node and return a CodeChunk"""
        name = next(
            (child.text.decode('utf-8')
             for child in node.children
             if child.type == "identifier"),
            None
        )

        if not name:
            raise ValueError(f"Could not find class name in node: {node.text}")

        return CodeChunk(
            type=ChunkType.CLASS,
            name=name,
            content=source_code[node.start_byte:node.end_byte].decode('utf-8'),
            start_line=node.start_point[0],
            end_line=node.end_point[0],
            file_path=str(file_path),
            docstring=self._get_preceding_docstring(node, source_code)
        )

    def _process_function(self, node: Node, source_code: bytes, file_path: Path) -> CodeChunk:
        """Process a function node and return a CodeChunk"""
        name = next(
            (child.text.decode('utf-8')
             for child in node.children
             if child.type == "identifier"),
            None
        )

        if not name:
            raise ValueError(f"Could not find function name in node: {node.text}")

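        # Record the enclosing class name when this function node is a direct
        # child of a class_definition.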
        parent_node = node.parent
        parent_name = None
        if parent_node and parent_node.type == "class_definition":
            parent_name = next(
                (child.text.decode('utf-8')
                 for child in parent_node.children
                 if child.type == "identifier"),
                None
            )

        return CodeChunk(
            type=ChunkType.FUNCTION,
            name=name,
            content=source_code[node.start_byte:node.end_byte].decode('utf-8'),
            start_line=node.start_point[0],
            end_line=node.end_point[0],
            file_path=str(file_path),
            parent_name=parent_name,
            docstring=self._get_preceding_docstring(node, source_code)
        )

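    # Only direct children of the module node are chunked; methods and nested
    # functions remain part of their enclosing chunk's content.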
    def parse_file(self, file_path: Path) -> List[CodeChunk]:
        """Parse a single Python file and return a list of code chunks"""
        if not self.supports_extension(file_path.suffix):
            LOG.debug(f"Skipping file {file_path}: unsupported extension")
            return []

        try:
            if file_path.stat().st_size > self.language_config.max_file_size:
                LOG.warning(
                    f"Skipping file {file_path}: exceeds size limit of "
                    f"{self.language_config.max_file_size} bytes"
                )
                return []

            with open(file_path, 'rb') as f:
                source_code = f.read()

            if not self.parser:
                raise RuntimeError("Parser not initialized. Call setup() first.")

            tree = self.parser.parse(source_code)

            if self._has_syntax_error(tree.root_node):
                LOG.warning(f"Syntax errors found in {file_path}")
                return []

            chunks: List[CodeChunk] = []

            for node in tree.root_node.children:
                if node.type == "class_definition":
                    chunks.append(self._process_class(node, source_code, file_path))
                elif node.type == "function_definition":
                    chunks.append(self._process_function(node, source_code, file_path))
                else:
                    # Other top-level statements (imports, assignments, etc.) are not chunked
                    pass

            return chunks

        except Exception as e:
            LOG.error(f"Error parsing file {file_path}: {str(e)}")
            return []

    def supports_extension(self, ext: str) -> bool:
        """Check if this parser supports a given file extension"""
        return ext in self.language_config.file_extensions
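

# Example usage -- a rough sketch only. The concrete config object comes from
# knowlang's own configuration (this module reads self.config.parser.languages,
# and the base LanguageParser constructor is not shown here), so the names below
# are assumptions rather than the package's actual API:
#
#     config = load_knowlang_config()          # hypothetical helper
#     parser = PythonParser(config)
#     parser.setup()
#     chunks = parser.parse_file(Path("example.py"))
#     for chunk in chunks:
#         print(chunk.type, chunk.name, chunk.start_line)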