27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 | class OrchestraAgent(BaseAgent):
def __init__(self, config: AgentConfig | str):
"""Initialize the orchestra agent"""
if isinstance(config, str):
config = ConfigLoader.load_agent_config(config)
self.config = config
# init subagents
self.planner_agent = PlannerAgent(config)
self.worker_agents = self._setup_workers()
self.reporter_agent = ReporterAgent(config)
def set_planner(self, planner: PlannerAgent):
self.planner_agent = planner
def _setup_workers(self) -> dict[str, BaseWorkerAgent]:
workers = {}
for name, config in self.config.workers.items():
assert config.type == "simple", f"Only support SimpleAgent as worker in orchestra agent, get {config}"
workers[name] = SimpleWorkerAgent(config=config)
return workers
async def build(self):
await self.planner_agent.build()
for worker_agent in self.worker_agents.values():
await worker_agent.build()
await self.reporter_agent.build()
async def run(self, input: str, trace_id: str = None) -> OrchestraTaskRecorder:
"""Run the orchestra agent
1. plan
2. sequentially execute subtasks
3. report
"""
# setup
trace_id = trace_id or AgentsUtils.gen_trace_id()
logger.info(f"> trace_id: {trace_id}")
# TODO: error_tracing
task_recorder = OrchestraTaskRecorder(task=input, trace_id=trace_id)
with trace(workflow_name="orchestra_agent", trace_id=trace_id):
await self.plan(task_recorder)
for task in task_recorder.plan.todo:
await self.work(task_recorder, task)
result = await self.report(task_recorder)
task_recorder.set_final_output(result.output)
return task_recorder
def run_streamed(self, input: str, trace_id: str = None) -> OrchestraTaskRecorder:
trace_id = trace_id or AgentsUtils.gen_trace_id()
logger.info(f"> trace_id: {trace_id}")
with trace(workflow_name="orchestra_agent", trace_id=trace_id):
task_recorder = OrchestraTaskRecorder(task=input, trace_id=trace_id)
# Kick off the actual agent loop in the background and return the streamed result object.
task_recorder._run_impl_task = asyncio.create_task(self._start_streaming(task_recorder))
return task_recorder
async def _start_streaming(self, task_recorder: OrchestraTaskRecorder):
task_recorder._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=self.planner_agent))
plan = await self.plan(task_recorder)
task_recorder._event_queue.put_nowait(OrchestraStreamEvent(name="plan", item=plan))
for task in task_recorder.plan.todo:
# print(f"> processing {task}")
# DONOT send this event because Runner will send it
# task_recorder._event_queue.put_nowait(
# AgentUpdatedStreamEvent(new_agent=self.worker_agents[task.agent_name])
# )
worker_agent = self.worker_agents[task.agent_name]
result_streaming = worker_agent.work_streamed(task_recorder, task)
async for event in result_streaming.stream.stream_events():
task_recorder._event_queue.put_nowait(event)
result_streaming.output = result_streaming.stream.final_output
result_streaming.trajectory = AgentsUtils.get_trajectory_from_agent_result(result_streaming.stream)
task_recorder.add_worker_result(result_streaming)
# print(f"< processed {task}")
task_recorder._event_queue.put_nowait(AgentUpdatedStreamEvent(new_agent=self.reporter_agent))
result = await self.report(task_recorder)
task_recorder.set_final_output(result.output)
task_recorder._event_queue.put_nowait(OrchestraStreamEvent(name="report", item=result))
task_recorder._event_queue.put_nowait(QueueCompleteSentinel())
task_recorder._is_complete = True
async def plan(self, task_recorder: OrchestraTaskRecorder) -> CreatePlanResult:
"""Step1: Plan"""
with function_span("planner") as span_planner:
plan = await self.planner_agent.create_plan(task_recorder)
assert all(t.agent_name in self.worker_agents for t in plan.todo), (
f"agent_name in plan.todo must be in worker_agents, get {plan.todo}"
)
task_recorder.set_plan(plan)
span_planner.span_data.input = json.dumps({"input": task_recorder.task}, ensure_ascii=False)
span_planner.span_data.output = plan.to_dict()
return plan
async def work(self, task_recorder: OrchestraTaskRecorder, task: Subtask) -> WorkerResult:
"""Step2: Work"""
worker_agent = self.worker_agents[task.agent_name]
result = await worker_agent.work(task_recorder, task)
task_recorder.add_worker_result(result)
return result
async def report(self, task_recorder: OrchestraTaskRecorder) -> AnalysisResult:
"""Step3: Report"""
with function_span("reporter") as span_fn:
analysis_result = await self.reporter_agent.report(task_recorder)
task_recorder.add_reporter_result(analysis_result)
span_fn.span_data.input = json.dumps(
{
"input": task_recorder.task,
"task_records": [{"task": r.task, "output": r.output} for r in task_recorder.task_records],
},
ensure_ascii=False,
)
span_fn.span_data.output = analysis_result.to_dict()
return analysis_result
|