Spaces:
Running
Running
File size: 6,776 Bytes
fe5c39d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
from __future__ import annotations
import json
from pydantic import BaseModel, Field
from metagpt.actions.di.ask_review import AskReview, ReviewConst
from metagpt.actions.di.write_plan import (
WritePlan,
precheck_update_plan_from_rsp,
update_plan_from_rsp,
)
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.schema import Message, Plan, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.utils.common import remove_comments
# Prompt template for the leading context message built by
# Planner.get_useful_memories(). Placeholders are filled with the plan goal
# ({user_requirement}), the plan context ({context}), the JSON-dumped task list
# ({tasks}) and the serialized current task ({current_task}).
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Context
{context}
## Current Plan
{tasks}
## Current Task
{current_task}
"""
# Prompt template rendered by Planner.get_plan_status(). Placeholders are filled
# with the joined code of finished tasks after remove_comments()
# ({code_written}), their joined results ({task_results}), the current task's
# instruction ({current_task}) and the guidance text of the current task's
# type, or an empty string when the type is unknown ({guidance}).
PLAN_STATUS = """
## Finished Tasks
### code
```python
{code_written}
```
### execution result
{task_results}
## Current Task
{current_task}
## Task Guidance
Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc.
Specifically, {guidance}
"""
class Planner(BaseModel):
    """Drive a `Plan`: generate it, get it reviewed, and record task results.

    The planner owns the plan plus a short-lived working memory that collects
    the messages exchanged while a plan is being written or a task is being
    worked on. That memory is cleared whenever a plan update is applied or a
    task is confirmed.
    """

    plan: Plan
    # Memory for working on each task, discarded each time a task is done.
    working_memory: Memory = Field(default_factory=Memory)
    # When True, ask_review() decides by itself instead of running AskReview.
    auto_run: bool = False

    def __init__(self, goal: str = "", plan: Plan = None, **kwargs):
        """Build a planner around `plan`, or around a new `Plan(goal=goal)`.

        Args:
            goal: Goal used to create a fresh plan when `plan` is not given.
            plan: Existing plan to manage; takes precedence over `goal`.
            **kwargs: Forwarded to the pydantic model constructor.
        """
        plan = plan or Plan(goal=goal)
        super().__init__(plan=plan, **kwargs)

    @property
    def current_task(self):
        """The plan's current task (delegates to `self.plan.current_task`)."""
        return self.plan.current_task

    @property
    def current_task_id(self):
        """The plan's current task id (delegates to `self.plan.current_task_id`)."""
        return self.plan.current_task_id

    async def update_plan(self, goal: str = "", max_tasks: int = 3, max_retries: int = 3):
        """Generate (or regenerate) the plan until a review confirms it.

        Args:
            goal: If non-empty, the current plan is replaced by a fresh
                `Plan(goal=goal)` before generation starts.
            max_tasks: Passed through to `WritePlan().run`.
            max_retries: How many invalid responses may be retried before the
                response is sent to review anyway.
        """
        if goal:
            self.plan = Plan(goal=goal)

        plan_confirmed = False
        while not plan_confirmed:
            context = self.get_useful_memories()
            rsp = await WritePlan().run(context, max_tasks=max_tasks)
            self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan))

            # Precheck the plan before asking for a review, so that obviously
            # invalid responses are regenerated without bothering the reviewer.
            is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
            if not is_plan_valid and max_retries > 0:
                error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only"
                logger.warning(error_msg)
                # Keep the error in working memory so the next attempt sees it.
                self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
                max_retries -= 1
                continue

            # NOTE: once the retry budget is used up, a response that failed
            # the precheck still reaches this point and goes to review.
            _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)

        # Only the confirmed response is applied to the plan.
        update_plan_from_rsp(rsp=rsp, current_plan=self.plan)
        self.working_memory.clear()

    async def process_task_result(self, task_result: TaskResult):
        """Review a task result and confirm it, leave it for redo, or replan.

        Args:
            task_result: Result of running the current task.
        """
        # Ask for acceptance; users can otherwise refuse and change tasks in the plan.
        review, task_result_confirmed = await self.ask_review(task_result)

        if task_result_confirmed:
            # Tick off this task and record progress.
            await self.confirm_task(self.current_task, task_result, review)
        elif "redo" in review:
            # Ask the Role to redo this task with help of review feedback,
            # useful when the code run is successful but the procedure or
            # result is not what we want.
            pass  # simply pass, not confirming the result
        else:
            # Update plan according to user's feedback and take on changed tasks.
            await self.update_plan()

    async def ask_review(
        self,
        task_result: TaskResult = None,
        auto_run: bool = None,
        trigger: str = ReviewConst.TASK_REVIEW_TRIGGER,
        review_context_len: int = 5,
    ):
        """Ask to review the task result; the reviewer confirms or requests change.

        If a human confirms the task result, the task is deemed completed
        regardless of whether the code run succeeded. In auto mode the code
        run has to succeed for the task to be considered completed.

        Args:
            task_result: Result under review; may be None (e.g. plan review).
            auto_run: Per-call override of `self.auto_run`. `None` means
                "use the planner's setting"; an explicit `True` or `False`
                is honored as given.
            trigger: Passed through to `AskReview().run`.
            review_context_len: Number of trailing context messages handed to
                the reviewer.

        Returns:
            A `(review, confirmed)` tuple. In auto mode `review` is `""`.
        """
        # Fall back to the instance setting only when no override was given.
        # Using `auto_run or self.auto_run` would discard an explicit False.
        if auto_run is None:
            auto_run = self.auto_run

        if not auto_run:
            context = self.get_useful_memories()
            review, confirmed = await AskReview().run(
                context=context[-review_context_len:], plan=self.plan, trigger=trigger
            )
            if not confirmed:
                # Keep the feedback so the next generation round can use it.
                self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            return review, confirmed

        # Auto mode: success of the run decides; no result means confirmed.
        confirmed = task_result.is_success if task_result else True
        return "", confirmed

    async def confirm_task(self, task: Task, task_result: TaskResult, review: str):
        """Record a confirmed result, advance the plan, and replan if asked.

        Args:
            task: The task being confirmed.
            task_result: Result stored on the task.
            review: Review text that confirmed the task; may carry extra
                instructions after the confirmation word.
        """
        task.update_task_result(task_result=task_result)
        self.plan.finish_current_task()
        self.working_memory.clear()

        review_lowered = review.lower()
        confirm_word = ReviewConst.CONTINUE_WORDS[0]
        # "confirm, ... (more content, such as changing downstream tasks)":
        # the review contains the confirmation word but is not just that word.
        confirmed_and_more = confirm_word in review_lowered and review_lowered not in confirm_word
        if confirmed_and_more:
            self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            await self.update_plan()

    def get_useful_memories(self, task_exclude_field=None) -> list[Message]:
        """Find useful memories only, to reduce context length and improve performance.

        Args:
            task_exclude_field: Passed as `exclude=` when dumping each task.

        Returns:
            One user message rendered from `STRUCTURAL_CONTEXT`, followed by
            everything currently in working memory.
        """
        user_requirement = self.plan.goal
        context = self.plan.context
        tasks = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks]
        tasks = json.dumps(tasks, indent=4, ensure_ascii=False)
        # With no current task the placeholder is rendered as "{}".
        current_task = self.plan.current_task.json() if self.plan.current_task else {}
        context = STRUCTURAL_CONTEXT.format(
            user_requirement=user_requirement, context=context, tasks=tasks, current_task=current_task
        )
        context_msg = [Message(content=context, role="user")]

        return context_msg + self.working_memory.get()

    def get_plan_status(self) -> str:
        """Render `PLAN_STATUS` from the finished tasks and the current task.

        Returns:
            The formatted prompt string.
        """
        # Prepare components of a plan status.
        finished_tasks = self.plan.get_finished_tasks()
        code_written = [remove_comments(task.code) for task in finished_tasks]
        code_written = "\n\n".join(code_written)
        task_results = [task.result for task in finished_tasks]
        task_results = "\n\n".join(task_results)
        task_type_name = self.current_task.task_type
        task_type = TaskType.get_type(task_type_name)
        # Unknown task types simply get no extra guidance.
        guidance = task_type.guidance if task_type else ""

        # Combine components in a prompt.
        prompt = PLAN_STATUS.format(
            code_written=code_written,
            task_results=task_results,
            current_task=self.current_task.instruction,
            guidance=guidance,
        )

        return prompt
|