|  | """File reader skills for the OpenHands agent. | 
					
						
						|  |  | 
					
						
						|  | This module provides various functions to parse and extract content from different file types, | 
					
						
						|  | including PDF, DOCX, LaTeX, audio, image, video, and PowerPoint files. It utilizes different | 
					
						
						|  | libraries and APIs to process these files and output their content or descriptions. | 
					
						
						|  |  | 
					
						
						|  | Functions: | 
					
						
						|  | parse_pdf(file_path: str) -> None: Parse and print content of a PDF file. | 
					
						
						|  | parse_docx(file_path: str) -> None: Parse and print content of a DOCX file. | 
					
						
						|  | parse_latex(file_path: str) -> None: Parse and print content of a LaTeX file. | 
					
						
						|  | parse_audio(file_path: str, model: str = 'whisper-1') -> None: Transcribe and print content of an audio file. | 
					
						
						|  | parse_image(file_path: str, task: str = 'Describe this image as detail as possible.') -> None: Analyze and print description of an image file. | 
					
						
						|  | parse_video(file_path: str, task: str = 'Describe this image as detail as possible.', frame_interval: int = 30) -> None: Analyze and print description of video frames. | 
					
						
						|  | parse_pptx(file_path: str) -> None: Parse and print content of a PowerPoint file. | 
					
						
						|  |  | 
					
						
						|  | Note: | 
					
						
						|  | Some functions (parse_audio, parse_video, parse_image) require OpenAI API credentials | 
					
						
						|  | and are only available if the necessary environment variables are set. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | import base64 | 
					
						
						|  |  | 
					
						
						|  | import docx | 
					
						
						|  | import PyPDF2 | 
					
						
						|  | from pptx import Presentation | 
					
						
						|  | from pylatexenc.latex2text import LatexNodes2Text | 
					
						
						|  |  | 
					
						
						|  | from openhands.runtime.plugins.agent_skills.utils.config import ( | 
					
						
						|  | _get_max_token, | 
					
						
						|  | _get_openai_api_key, | 
					
						
						|  | _get_openai_base_url, | 
					
						
						|  | _get_openai_client, | 
					
						
						|  | _get_openai_model, | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_pdf(file_path: str) -> None: | 
					
						
						|  | """Parses the content of a PDF file and prints it. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the file to open. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Reading PDF file from {file_path}]') | 
					
						
						|  | content = PyPDF2.PdfReader(file_path) | 
					
						
						|  | text = '' | 
					
						
						|  | for page_idx in range(len(content.pages)): | 
					
						
						|  | text += ( | 
					
						
						|  | f'@@ Page {page_idx + 1} @@\n' | 
					
						
						|  | + content.pages[page_idx].extract_text() | 
					
						
						|  | + '\n\n' | 
					
						
						|  | ) | 
					
						
						|  | print(text.strip()) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_docx(file_path: str) -> None: | 
					
						
						|  | """Parses the content of a DOCX file and prints it. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the file to open. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Reading DOCX file from {file_path}]') | 
					
						
						|  | content = docx.Document(file_path) | 
					
						
						|  | text = '' | 
					
						
						|  | for i, para in enumerate(content.paragraphs): | 
					
						
						|  | text += f'@@ Page {i + 1} @@\n' + para.text + '\n\n' | 
					
						
						|  | print(text) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_latex(file_path: str) -> None: | 
					
						
						|  | """Parses the content of a LaTex file and prints it. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the file to open. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Reading LaTex file from {file_path}]') | 
					
						
						|  | with open(file_path) as f: | 
					
						
						|  | data = f.read() | 
					
						
						|  | text = LatexNodes2Text().latex_to_text(data) | 
					
						
						|  | print(text.strip()) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def _base64_img(file_path: str) -> str: | 
					
						
						|  | with open(file_path, 'rb') as image_file: | 
					
						
						|  | encoded_image = base64.b64encode(image_file.read()).decode('utf-8') | 
					
						
						|  | return encoded_image | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def _base64_video(file_path: str, frame_interval: int = 10) -> list[str]: | 
					
						
						|  | import cv2 | 
					
						
						|  |  | 
					
						
						|  | video = cv2.VideoCapture(file_path) | 
					
						
						|  | base64_frames = [] | 
					
						
						|  | frame_count = 0 | 
					
						
						|  | while video.isOpened(): | 
					
						
						|  | success, frame = video.read() | 
					
						
						|  | if not success: | 
					
						
						|  | break | 
					
						
						|  | if frame_count % frame_interval == 0: | 
					
						
						|  | _, buffer = cv2.imencode('.jpg', frame) | 
					
						
						|  | base64_frames.append(base64.b64encode(buffer).decode('utf-8')) | 
					
						
						|  | frame_count += 1 | 
					
						
						|  | video.release() | 
					
						
						|  | return base64_frames | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def _prepare_image_messages(task: str, base64_image: str): | 
					
						
						|  | return [ | 
					
						
						|  | { | 
					
						
						|  | 'role': 'user', | 
					
						
						|  | 'content': [ | 
					
						
						|  | {'type': 'text', 'text': task}, | 
					
						
						|  | { | 
					
						
						|  | 'type': 'image_url', | 
					
						
						|  | 'image_url': {'url': f'data:image/jpeg;base64,{base64_image}'}, | 
					
						
						|  | }, | 
					
						
						|  | ], | 
					
						
						|  | } | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_audio(file_path: str, model: str = 'whisper-1') -> None: | 
					
						
						|  | """Parses the content of an audio file and prints it. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the audio file to transcribe. | 
					
						
						|  | model: str: The audio model to use for transcription. Defaults to 'whisper-1'. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Transcribing audio file from {file_path}]') | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | with open(file_path, 'rb') as audio_file: | 
					
						
						|  | transcript = _get_openai_client().audio.translations.create( | 
					
						
						|  | model=model, file=audio_file | 
					
						
						|  | ) | 
					
						
						|  | print(transcript.text) | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f'Error transcribing audio file: {e}') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_image( | 
					
						
						|  | file_path: str, task: str = 'Describe this image as detail as possible.' | 
					
						
						|  | ) -> None: | 
					
						
						|  | """Parses the content of an image file and prints the description. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the file to open. | 
					
						
						|  | task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Reading image file from {file_path}]') | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | base64_image = _base64_img(file_path) | 
					
						
						|  | response = _get_openai_client().chat.completions.create( | 
					
						
						|  | model=_get_openai_model(), | 
					
						
						|  | messages=_prepare_image_messages(task, base64_image), | 
					
						
						|  | max_tokens=_get_max_token(), | 
					
						
						|  | ) | 
					
						
						|  | content = response.choices[0].message.content | 
					
						
						|  | print(content) | 
					
						
						|  |  | 
					
						
						|  | except Exception as error: | 
					
						
						|  | print(f'Error with the request: {error}') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_video( | 
					
						
						|  | file_path: str, | 
					
						
						|  | task: str = 'Describe this image as detail as possible.', | 
					
						
						|  | frame_interval: int = 30, | 
					
						
						|  | ) -> None: | 
					
						
						|  | """Parses the content of an image file and prints the description. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the video file to open. | 
					
						
						|  | task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'. | 
					
						
						|  | frame_interval: int: The interval between frames to analyze. Defaults to 30. | 
					
						
						|  |  | 
					
						
						|  | """ | 
					
						
						|  | print( | 
					
						
						|  | f'[Processing video file from {file_path} with frame interval {frame_interval}]' | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | task = task or 'This is one frame from a video, please summarize this frame.' | 
					
						
						|  | base64_frames = _base64_video(file_path) | 
					
						
						|  | selected_frames = base64_frames[::frame_interval] | 
					
						
						|  |  | 
					
						
						|  | if len(selected_frames) > 30: | 
					
						
						|  | new_interval = len(base64_frames) // 30 | 
					
						
						|  | selected_frames = base64_frames[::new_interval] | 
					
						
						|  |  | 
					
						
						|  | print(f'Totally {len(selected_frames)} would be analyze...\n') | 
					
						
						|  |  | 
					
						
						|  | idx = 0 | 
					
						
						|  | for base64_frame in selected_frames: | 
					
						
						|  | idx += 1 | 
					
						
						|  | print(f'Process the {file_path}, current No. {idx * frame_interval} frame...') | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | response = _get_openai_client().chat.completions.create( | 
					
						
						|  | model=_get_openai_model(), | 
					
						
						|  | messages=_prepare_image_messages(task, base64_frame), | 
					
						
						|  | max_tokens=_get_max_token(), | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | content = response.choices[0].message.content | 
					
						
						|  | current_frame_content = f"Frame {idx}'s content: {content}\n" | 
					
						
						|  | print(current_frame_content) | 
					
						
						|  |  | 
					
						
						|  | except Exception as error: | 
					
						
						|  | print(f'Error with the request: {error}') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def parse_pptx(file_path: str) -> None: | 
					
						
						|  | """Parses the content of a pptx file and prints it. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_path: str: The path to the file to open. | 
					
						
						|  | """ | 
					
						
						|  | print(f'[Reading PowerPoint file from {file_path}]') | 
					
						
						|  | try: | 
					
						
						|  | pres = Presentation(str(file_path)) | 
					
						
						|  | text = [] | 
					
						
						|  | for slide_idx, slide in enumerate(pres.slides): | 
					
						
						|  | text.append(f'@@ Slide {slide_idx + 1} @@') | 
					
						
						|  | for shape in slide.shapes: | 
					
						
						|  | if hasattr(shape, 'text'): | 
					
						
						|  | text.append(shape.text) | 
					
						
						|  | print('\n'.join(text)) | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | print(f'Error reading PowerPoint file: {e}') | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | __all__ = [ | 
					
						
						|  | 'parse_pdf', | 
					
						
						|  | 'parse_docx', | 
					
						
						|  | 'parse_latex', | 
					
						
						|  | 'parse_pptx', | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | if _get_openai_api_key() and _get_openai_base_url(): | 
					
						
						|  | __all__ += ['parse_audio', 'parse_video', 'parse_image'] | 
					
						
						|  |  |