from __future__ import annotations import json from pydantic import BaseModel, Field from metagpt.actions.di.ask_review import AskReview, ReviewConst from metagpt.actions.di.write_plan import ( WritePlan, precheck_update_plan_from_rsp, update_plan_from_rsp, ) from metagpt.logs import logger from metagpt.memory import Memory from metagpt.schema import Message, Plan, Task, TaskResult from metagpt.strategy.task_type import TaskType from metagpt.utils.common import remove_comments STRUCTURAL_CONTEXT = """ ## User Requirement {user_requirement} ## Context {context} ## Current Plan {tasks} ## Current Task {current_task} """ PLAN_STATUS = """ ## Finished Tasks ### code ```python {code_written} ``` ### execution result {task_results} ## Current Task {current_task} ## Task Guidance Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc. Specifically, {guidance} """ class Planner(BaseModel): plan: Plan working_memory: Memory = Field( default_factory=Memory ) # memory for working on each task, discarded each time a task is done auto_run: bool = False def __init__(self, goal: str = "", plan: Plan = None, **kwargs): plan = plan or Plan(goal=goal) super().__init__(plan=plan, **kwargs) @property def current_task(self): return self.plan.current_task @property def current_task_id(self): return self.plan.current_task_id async def update_plan(self, goal: str = "", max_tasks: int = 3, max_retries: int = 3): if goal: self.plan = Plan(goal=goal) plan_confirmed = False while not plan_confirmed: context = self.get_useful_memories() rsp = await WritePlan().run(context, max_tasks=max_tasks) self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan)) # precheck plan before asking reviews is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan) if not is_plan_valid and max_retries > 0: error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only" logger.warning(error_msg) self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan)) max_retries -= 1 continue _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER) update_plan_from_rsp(rsp=rsp, current_plan=self.plan) self.working_memory.clear() async def process_task_result(self, task_result: TaskResult): # ask for acceptance, users can other refuse and change tasks in the plan review, task_result_confirmed = await self.ask_review(task_result) if task_result_confirmed: # tick off this task and record progress await self.confirm_task(self.current_task, task_result, review) elif "redo" in review: # Ask the Role to redo this task with help of review feedback, # useful when the code run is successful but the procedure or result is not what we want pass # simply pass, not confirming the result else: # update plan according to user's feedback and to take on changed tasks await self.update_plan() async def ask_review( self, task_result: TaskResult = None, auto_run: bool = None, trigger: str = ReviewConst.TASK_REVIEW_TRIGGER, review_context_len: int = 5, ): """ Ask to review the task result, reviewer needs to provide confirmation or request change. If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds; if auto mode, then the code run has to succeed for the task to be considered completed. """ auto_run = auto_run or self.auto_run if not auto_run: context = self.get_useful_memories() review, confirmed = await AskReview().run( context=context[-review_context_len:], plan=self.plan, trigger=trigger ) if not confirmed: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) return review, confirmed confirmed = task_result.is_success if task_result else True return "", confirmed async def confirm_task(self, task: Task, task_result: TaskResult, review: str): task.update_task_result(task_result=task_result) self.plan.finish_current_task() self.working_memory.clear() confirmed_and_more = ( ReviewConst.CONTINUE_WORDS[0] in review.lower() and review.lower() not in ReviewConst.CONTINUE_WORDS[0] ) # "confirm, ... (more content, such as changing downstream tasks)" if confirmed_and_more: self.working_memory.add(Message(content=review, role="user", cause_by=AskReview)) await self.update_plan() def get_useful_memories(self, task_exclude_field=None) -> list[Message]: """find useful memories only to reduce context length and improve performance""" user_requirement = self.plan.goal context = self.plan.context tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks] tasks = json.dumps(tasks, indent=4, ensure_ascii=False) current_task = self.plan.current_task.json() if self.plan.current_task else {} context = STRUCTURAL_CONTEXT.format( user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task ) context_msg = [Message(content=context, role="user")] return context_msg + self.working_memory.get() def get_plan_status(self) -> str: # prepare components of a plan status finished_tasks = self.plan.get_finished_tasks() code_written = [remove_comments(task.code) for task in finished_tasks] code_written = "\n\n".join(code_written) task_results = [task.result for task in finished_tasks] task_results = "\n\n".join(task_results) task_type_name = self.current_task.task_type task_type = TaskType.get_type(task_type_name) guidance = task_type.guidance if task_type else "" # combine components in a prompt prompt = PLAN_STATUS.format( code_written=code_written, task_results=task_results, current_task=self.current_task.instruction, guidance=guidance, ) return prompt