Spaces:
Configuration error
Configuration error
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
import base64 | |
import io | |
import re | |
from dataclasses import dataclass | |
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union | |
import numpy as np | |
from PIL import Image | |
from pydantic import BaseModel | |
from camel.messages import ( | |
FunctionCallFormatter, | |
HermesFunctionFormatter, | |
OpenAIAssistantMessage, | |
OpenAIMessage, | |
OpenAISystemMessage, | |
OpenAIUserMessage, | |
) | |
from camel.messages.conversion import ShareGPTMessage | |
from camel.prompts import CodePrompt, TextPrompt | |
from camel.types import ( | |
OpenAIBackendRole, | |
OpenAIImageType, | |
OpenAIVisionDetailType, | |
RoleType, | |
) | |
from camel.utils import Constants | |
@dataclass
class BaseMessage:
    r"""Base class for message objects used in CAMEL chat system.

    The class is a dataclass: the generated ``__init__`` takes the fields
    below in declaration order, which the alternate constructors
    (:meth:`make_user_message`, :meth:`make_assistant_message`) rely on when
    they build instances positionally.

    Args:
        role_name (str): The name of the user or assistant role.
        role_type (RoleType): The type of role, either :obj:`RoleType.
            ASSISTANT` or :obj:`RoleType.USER`.
        meta_dict (Optional[Dict[str, Any]]): Additional metadata dictionary
            for the message.
        content (str): The content of the message.
        video_bytes (Optional[bytes]): Optional bytes of a video associated
            with the message. (default: :obj:`None`)
        image_list (Optional[List[Image.Image]]): Optional list of PIL Image
            objects associated with the message. (default: :obj:`None`)
        image_detail (Literal["auto", "low", "high"]): Detail level of the
            images associated with the message. (default: :obj:`auto`)
        video_detail (Literal["auto", "low", "high"]): Detail level of the
            videos associated with the message. (default: :obj:`low`)
        parsed (Optional[Union[Type[BaseModel], dict]]): Optional object which
            is parsed from the content. (default: :obj:`None`)
    """

    role_name: str
    role_type: RoleType
    meta_dict: Optional[Dict[str, Any]]
    content: str

    video_bytes: Optional[bytes] = None
    image_list: Optional[List[Image.Image]] = None
    image_detail: Literal["auto", "low", "high"] = "auto"
    video_detail: Literal["auto", "low", "high"] = "low"
    parsed: Optional[Union[Type[BaseModel], dict]] = None

    @classmethod
    def make_user_message(
        cls,
        role_name: str,
        content: str,
        meta_dict: Optional[Dict[str, str]] = None,
        video_bytes: Optional[bytes] = None,
        image_list: Optional[List[Image.Image]] = None,
        image_detail: Union[
            OpenAIVisionDetailType, str
        ] = OpenAIVisionDetailType.AUTO,
        video_detail: Union[
            OpenAIVisionDetailType, str
        ] = OpenAIVisionDetailType.LOW,
    ) -> "BaseMessage":
        r"""Create a new user message.

        Args:
            role_name (str): The name of the user role.
            content (str): The content of the message.
            meta_dict (Optional[Dict[str, str]]): Additional metadata
                dictionary for the message.
            video_bytes (Optional[bytes]): Optional bytes of a video
                associated with the message.
            image_list (Optional[List[Image.Image]]): Optional list of PIL
                Image objects associated with the message.
            image_detail (Union[OpenAIVisionDetailType, str]): Detail level of
                the images associated with the message.
            video_detail (Union[OpenAIVisionDetailType, str]): Detail level of
                the videos associated with the message.

        Returns:
            BaseMessage: The new user message.
        """
        # Round-tripping through the enum normalises both enum members and
        # plain strings to the stored string value.
        return cls(
            role_name,
            RoleType.USER,
            meta_dict,
            content,
            video_bytes,
            image_list,
            OpenAIVisionDetailType(image_detail).value,
            OpenAIVisionDetailType(video_detail).value,
        )

    @classmethod
    def make_assistant_message(
        cls,
        role_name: str,
        content: str,
        meta_dict: Optional[Dict[str, str]] = None,
        video_bytes: Optional[bytes] = None,
        image_list: Optional[List[Image.Image]] = None,
        image_detail: Union[
            OpenAIVisionDetailType, str
        ] = OpenAIVisionDetailType.AUTO,
        video_detail: Union[
            OpenAIVisionDetailType, str
        ] = OpenAIVisionDetailType.LOW,
    ) -> "BaseMessage":
        r"""Create a new assistant message.

        Args:
            role_name (str): The name of the assistant role.
            content (str): The content of the message.
            meta_dict (Optional[Dict[str, str]]): Additional metadata
                dictionary for the message.
            video_bytes (Optional[bytes]): Optional bytes of a video
                associated with the message.
            image_list (Optional[List[Image.Image]]): Optional list of PIL
                Image objects associated with the message.
            image_detail (Union[OpenAIVisionDetailType, str]): Detail level of
                the images associated with the message.
            video_detail (Union[OpenAIVisionDetailType, str]): Detail level of
                the videos associated with the message.

        Returns:
            BaseMessage: The new assistant message.
        """
        return cls(
            role_name,
            RoleType.ASSISTANT,
            meta_dict,
            content,
            video_bytes,
            image_list,
            OpenAIVisionDetailType(image_detail).value,
            OpenAIVisionDetailType(video_detail).value,
        )

    def create_new_instance(self, content: str) -> "BaseMessage":
        r"""Create a new instance of the :obj:`BaseMessage` with updated
        content.

        Only ``role_name``, ``role_type`` and ``meta_dict`` are carried over;
        media attachments, detail levels and ``parsed`` fall back to their
        defaults on the new instance.

        Args:
            content (str): The new content value.

        Returns:
            BaseMessage: The new instance of :obj:`BaseMessage`.
        """
        return self.__class__(
            role_name=self.role_name,
            role_type=self.role_type,
            meta_dict=self.meta_dict,
            content=content,
        )

    def __add__(self, other: Any) -> Union["BaseMessage", Any]:
        r"""Addition operator override for :obj:`BaseMessage`.

        Args:
            other (Any): The value to be added with. Either another
                :obj:`BaseMessage` (its content is appended) or a :obj:`str`.

        Returns:
            Union[BaseMessage, Any]: The result of the addition.

        Raises:
            TypeError: If ``other`` is neither a :obj:`BaseMessage` nor a
                :obj:`str`.
        """
        if isinstance(other, BaseMessage):
            combined_content = self.content + other.content
        elif isinstance(other, str):
            combined_content = self.content + other
        else:
            raise TypeError(
                f"Unsupported operand type(s) for +: '{type(self)}' and "
                f"'{type(other)}'"
            )
        return self.create_new_instance(combined_content)

    def __mul__(self, other: Any) -> Union["BaseMessage", Any]:
        r"""Multiplication operator override for :obj:`BaseMessage`.

        Args:
            other (Any): The value to be multiplied with. Must be an
                :obj:`int`.

        Returns:
            Union[BaseMessage, Any]: The result of the multiplication.

        Raises:
            TypeError: If ``other`` is not an :obj:`int`.
        """
        if not isinstance(other, int):
            raise TypeError(
                f"Unsupported operand type(s) for *: '{type(self)}' and "
                f"'{type(other)}'"
            )
        return self.create_new_instance(self.content * other)

    def __len__(self) -> int:
        r"""Length operator override for :obj:`BaseMessage`.

        Returns:
            int: The length of the content.
        """
        return len(self.content)

    def __contains__(self, item: str) -> bool:
        r"""Contains operator override for :obj:`BaseMessage`.

        Args:
            item (str): The item to check for containment.

        Returns:
            bool: :obj:`True` if the item is contained in the content,
                :obj:`False` otherwise.
        """
        return item in self.content

    def extract_text_and_code_prompts(
        self,
    ) -> Tuple[List[TextPrompt], List[CodePrompt]]:
        r"""Extract text and code prompts from the message content.

        The content is scanned line by line. Lines whose stripped form starts
        with a triple backtick toggle between text and code; the remainder of
        an opening fence line is used as the code type. A code block whose
        closing fence is missing extends to the end of the content.

        Returns:
            Tuple[List[TextPrompt], List[CodePrompt]]: A tuple containing a
                list of text prompts and a list of code prompts extracted
                from the content.
        """
        fence = "```"
        text_prompts: List[TextPrompt] = []
        code_prompts: List[CodePrompt] = []

        lines = self.content.split("\n")
        total = len(lines)
        idx = 0
        start_idx = 0

        while idx < total:
            # Consume plain text up to the next opening fence.
            while idx < total and not lines[idx].lstrip().startswith(fence):
                idx += 1
            text = "\n".join(lines[start_idx:idx]).strip()
            text_prompts.append(TextPrompt(text))

            if idx >= total:
                break

            # Whatever follows the backticks on the fence line is the type.
            code_type = lines[idx].strip()[3:].strip()
            idx += 1
            start_idx = idx

            # Bounded scan: an unterminated fence must not run past the end
            # of ``lines`` (it previously raised IndexError).
            while idx < total and not lines[idx].lstrip().startswith(fence):
                idx += 1
            code = "\n".join(lines[start_idx:idx]).strip()
            code_prompts.append(CodePrompt(code, code_type=code_type))

            # Skip the closing fence (or step past the end if none existed).
            idx += 1
            start_idx = idx

        return text_prompts, code_prompts

    @classmethod
    def from_sharegpt(
        cls,
        message: ShareGPTMessage,
        function_format: Optional[FunctionCallFormatter[Any, Any]] = None,
        role_mapping=None,
    ) -> "BaseMessage":
        r"""Convert ShareGPT message to BaseMessage or FunctionCallingMessage.
        Note tool calls and responses have an 'assistant' role in CAMEL.

        Args:
            message (ShareGPTMessage): ShareGPT message to convert.
            function_format (FunctionCallFormatter, optional): Function call
                formatter to use. (default: :obj:`HermesFunctionFormatter()`)
            role_mapping (dict, optional): Mapping from the ShareGPT ``from``
                value to a ``[role_name, role_type]`` pair. Defaults to a
                CAMEL specific mapping.

        Returns:
            BaseMessage: Converted message.
        """
        # Imported here rather than at module level; presumably to avoid a
        # circular import with ``camel.messages`` -- TODO confirm.
        from camel.messages import FunctionCallingMessage

        if role_mapping is None:
            role_mapping = {
                "system": ["system", RoleType.USER],
                "human": ["user", RoleType.USER],
                "gpt": ["assistant", RoleType.ASSISTANT],
                "tool": ["assistant", RoleType.ASSISTANT],
            }
        role_name, role_type = role_mapping[message.from_]

        if function_format is None:
            function_format = HermesFunctionFormatter()

        # Check if this is a function-related message
        if message.from_ == "gpt":
            func_info = function_format.extract_tool_calls(message.value)
            # TODO: Handle multiple tool calls
            if func_info and len(func_info) == 1:
                # Including cleaned content is useful to
                # remind consumers of non-considered content
                clean_content = re.sub(
                    r"<tool_call>.*?</tool_call>",
                    "",
                    message.value,
                    flags=re.DOTALL,
                ).strip()
                call = func_info[0]
                return FunctionCallingMessage(
                    role_name=role_name,
                    role_type=role_type,
                    meta_dict=None,
                    content=clean_content,
                    func_name=call.__dict__["name"],
                    args=call.__dict__["arguments"],
                )
        elif message.from_ == "tool":
            func_r_info = function_format.extract_tool_response(message.value)
            if func_r_info:
                return FunctionCallingMessage(
                    role_name=role_name,
                    role_type=role_type,
                    meta_dict=None,
                    content="",
                    func_name=func_r_info.__dict__["name"],
                    result=func_r_info.__dict__["content"],
                )

        # Regular message
        return cls(
            role_name=role_name,
            role_type=role_type,
            meta_dict=None,
            content=message.value,
        )

    def to_sharegpt(
        self,
        function_format: Optional[FunctionCallFormatter] = None,
    ) -> ShareGPTMessage:
        r"""Convert BaseMessage to ShareGPT message.

        Args:
            function_format (FunctionCallFormatter): Function call formatter
                to use. Not used by this base implementation; function
                conversion is handled in :obj:`FunctionCallingMessage`.

        Returns:
            ShareGPTMessage: Message whose ``from`` field is ``"system"`` or
                ``"human"`` for user-type roles and ``"gpt"`` otherwise.
        """
        # Convert role type to ShareGPT 'from' field
        if self.role_type == RoleType.USER:
            from_ = "system" if self.role_name == "system" else "human"
        else:  # RoleType.ASSISTANT
            from_ = "gpt"

        # Function conversion code in FunctionCallingMessage
        return ShareGPTMessage(from_=from_, value=self.content)  # type: ignore[call-arg]

    def to_openai_message(
        self,
        role_at_backend: OpenAIBackendRole,
    ) -> OpenAIMessage:
        r"""Converts the message to an :obj:`OpenAIMessage` object.

        Args:
            role_at_backend (OpenAIBackendRole): The role of the message in
                OpenAI chat system.

        Returns:
            OpenAIMessage: The converted :obj:`OpenAIMessage` object.

        Raises:
            ValueError: If ``role_at_backend`` is not SYSTEM, USER or
                ASSISTANT.
        """
        if role_at_backend == OpenAIBackendRole.SYSTEM:
            return self.to_openai_system_message()
        if role_at_backend == OpenAIBackendRole.USER:
            return self.to_openai_user_message()
        if role_at_backend == OpenAIBackendRole.ASSISTANT:
            return self.to_openai_assistant_message()
        raise ValueError(f"Unsupported role: {role_at_backend}.")

    def to_openai_system_message(self) -> OpenAISystemMessage:
        r"""Converts the message to an :obj:`OpenAISystemMessage` object.

        Returns:
            OpenAISystemMessage: The converted :obj:`OpenAISystemMessage`
                object.
        """
        return {"role": "system", "content": self.content}

    def to_openai_user_message(self) -> OpenAIUserMessage:
        r"""Converts the message to an :obj:`OpenAIUserMessage` object.

        When images or video are attached, the content becomes a list with
        one text part followed by one ``image_url`` part per image and per
        sampled video frame, each encoded as a base64 data URL. Without any
        media the content is the plain string.

        Returns:
            OpenAIUserMessage: The converted :obj:`OpenAIUserMessage` object.

        Raises:
            ValueError: If an attached image has no ``format`` or its format
                is not a member of :obj:`OpenAIImageType`.
        """
        hybrid_content: List[Any] = [
            {
                "type": "text",
                "text": self.content,
            }
        ]

        if self.image_list:
            for image in self.image_list:
                if image.format is None:
                    raise ValueError(
                        "Image's `format` is `None`, please "
                        "transform the `PIL.Image.Image` to one of "
                        "following supported formats, such as "
                        f"{list(OpenAIImageType)}"
                    )

                image_type: str = image.format.lower()
                if image_type not in OpenAIImageType:
                    raise ValueError(
                        f"Image type {image.format} "
                        f"is not supported by OpenAI vision model"
                    )
                with io.BytesIO() as buffer:
                    image.save(fp=buffer, format=image.format)
                    encoded_image = base64.b64encode(buffer.getvalue()).decode(
                        "utf-8"
                    )
                image_prefix = f"data:image/{image_type};base64,"
                hybrid_content.append(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"{image_prefix}{encoded_image}",
                            "detail": self.image_detail,
                        },
                    }
                )

        if self.video_bytes:
            # Optional dependency, only needed when a video is attached.
            import imageio.v3 as iio

            base64_frames: List[str] = []
            # Loop-invariant: every sampled frame is saved as JPEG.
            frame_format = OpenAIImageType.JPEG.value.upper()
            new_width = Constants.VIDEO_DEFAULT_IMAGE_SIZE

            # read video bytes
            video = iio.imiter(
                self.video_bytes, plugin=Constants.VIDEO_DEFAULT_PLUG_PYAV
            )

            # Frames are counted from 1; only every
            # VIDEO_IMAGE_EXTRACTION_INTERVAL-th frame is kept.
            for frame_count, frame in enumerate(video, start=1):
                if frame_count % Constants.VIDEO_IMAGE_EXTRACTION_INTERVAL != 0:
                    continue

                # convert frame to numpy array
                frame_image = Image.fromarray(np.asarray(frame))

                # resize to the default width, preserving aspect ratio
                width, height = frame_image.size
                aspect_ratio = width / height
                new_height = int(new_width / aspect_ratio)
                resized_img = frame_image.resize((new_width, new_height))

                # encode the image to base64
                with io.BytesIO() as buffer:
                    resized_img.save(fp=buffer, format=frame_format)
                    encoded_image = base64.b64encode(
                        buffer.getvalue()
                    ).decode("utf-8")

                base64_frames.append(encoded_image)

            for encoded_image in base64_frames:
                hybrid_content.append(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{encoded_image}",
                            "detail": self.video_detail,
                        },
                    }
                )

        if len(hybrid_content) > 1:
            return {
                "role": "user",
                "content": hybrid_content,
            }
        # This return just for str message
        return {
            "role": "user",
            "content": self.content,
        }

    def to_openai_assistant_message(self) -> OpenAIAssistantMessage:
        r"""Converts the message to an :obj:`OpenAIAssistantMessage` object.

        Returns:
            OpenAIAssistantMessage: The converted :obj:`OpenAIAssistantMessage`
                object.
        """
        return {"role": "assistant", "content": self.content}

    def to_dict(self) -> Dict:
        r"""Converts the message to a dictionary.

        Entries of ``meta_dict`` are merged in after ``role_name`` and
        ``role_type`` (so they can override those keys) and before
        ``content`` (which always wins).

        Returns:
            dict: The converted dictionary.
        """
        return {
            "role_name": self.role_name,
            "role_type": self.role_type.name,
            **(self.meta_dict or {}),
            "content": self.content,
        }