Skip to content

Python: Improve plan and execute sample #11370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 78 additions & 57 deletions python/samples/concepts/processes/plan_and_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"""
The following code demonstrates a simple plan and execute process using the Semantic Kernel Process Framework.
It defines a multi-step workflow that leverages OpenAI's Responses API and integrated web search tools to generate,
refine, and execute a plan based on a user's query.
refine, and execute a plan based on a user's query. An OpenAI api key is required to run this sample. The Azure OpenAI
Responses API does not yet support the web search tool.

The process is composed of several steps:

Expand All @@ -45,7 +46,9 @@
"""


# Define a method to get a response from the OpenAI agent
#
# 1) Helper to run OpenAI agent
#
async def run_openai_agent(instructions: str, prompt: str, agent_name: str = "GenericAgent") -> str:
client, model = OpenAIResponsesAgent.setup_resources()
agent_tools = [OpenAIResponsesAgent.configure_web_search_tool()]
Expand All @@ -62,7 +65,7 @@ async def run_openai_agent(instructions: str, prompt: str, agent_name: str = "Ge


#
# 1) Global Events
# 2) Global Events
#
class PlanExecuteEvents(str, Enum):
StartProcess = "StartProcess"
Expand All @@ -75,7 +78,7 @@ class PlanExecuteEvents(str, Enum):


#
# 2) Planner Step
# 3) Planner Step
#
class PlannerStepState:
times_called: int = 0
Expand Down Expand Up @@ -111,7 +114,7 @@ async def create_plan(self, user_request: str, context: KernelProcessStepContext


#
# 3) Replan Step
# 4) Replan Step
#
class ReplanStepState:
times_called: int = 0
Expand Down Expand Up @@ -147,7 +150,7 @@ async def refine_plan(self, payload: dict, context: KernelProcessStepContext) ->


#
# 4) Execute Step
# 5) Execute Step
#
class ExecuteStepState:
current_index: int = 0
Expand All @@ -163,19 +166,16 @@ async def activate(self, state: KernelProcessStepState[ExecuteStepState]):
@kernel_function(name=EXECUTE_PLAN)
async def execute_plan(self, payload: dict, context: KernelProcessStepContext) -> dict:
plan = payload["plan"]
partials = payload.get("partials", []) # May be empty on first run
partials = payload.get("partials", [])

if self.state.current_index >= len(plan):
# No more tasks
return {
"partial_result": "All tasks done",
"plan": plan,
"current_index": self.state.current_index,
}

current_task = plan[self.state.current_index]

# Incorporate partial context into the prompt:
prompt = (
f"So far we have these partial results:\n\n{chr(10).join(partials)}\n\nNow your task is: {current_task}"
)
Expand All @@ -185,7 +185,6 @@ async def execute_plan(self, payload: dict, context: KernelProcessStepContext) -
prompt=prompt,
agent_name="ExecuteAgent",
)

partial_result = response.strip()

executed_index = self.state.current_index
Expand All @@ -199,7 +198,7 @@ async def execute_plan(self, payload: dict, context: KernelProcessStepContext) -


#
# 5) Decision Step
# 6) Decision Step
#
class DecisionStepState(KernelBaseModel):
partials: list[str] = Field(default_factory=list)
Expand All @@ -223,38 +222,41 @@ async def make_decision(self, data: dict, context: KernelProcessStepContext):
if partial_result and partial_result.lower() != "all tasks done":
self.state.partials.append(partial_result)

# (A) If "All tasks done"
# A) If "All tasks done"
if partial_result.strip().lower().startswith("all tasks done"):
final_text = "[FINAL] All tasks completed.\n\n=== Aggregated Partial Results ===\n" + "\n\n".join(
self.state.partials
)
await context.emit_event(PlanExecuteEvents.PlanFinished, data=final_text)
# No more tasks
final_text = "\n".join(self.state.partials)
payload = {
"type": "final",
"content": final_text.strip(),
}
await context.emit_event(PlanExecuteEvents.PlanFinished, data=payload)
return

# (B) If physically done all tasks
# B) If physically done all tasks
if current_index >= len(plan):
final_text = "[FINAL] No more tasks.\n\n=== Aggregated Partial Results ===\n" + "\n\n".join(
self.state.partials
)
await context.emit_event(PlanExecuteEvents.PlanFinished, data=final_text)
final_text = "\n".join(self.state.partials)
payload = {
"type": "final",
"content": final_text.strip(),
}
await context.emit_event(PlanExecuteEvents.PlanFinished, data=payload)
return

# (C) Let LLM decide: continue or replan
# C) Otherwise: let LLM decide whether to continue or replan
prompt = (
"We have a plan with remaining tasks.\n"
"PARTIAL RESULTS SO FAR:\n" + "\n".join(self.state.partials) + "\n\n"
"Decide: 'continue' with the next step or 'replan' if something is invalid. "
"DO NOT say 'finish' if tasks remain. Return only 'continue' or 'replan'."
)

response = await run_openai_agent(
instructions="Plan orchestrator with web-search if needed. Only respond 'continue' or 'replan'.",
prompt=prompt,
agent_name="DecisionAgent",
)

decision = response.strip().lower()

print(f"[DecisionStep] LLM decision: {decision}")
self.state.last_decision = decision

Expand All @@ -276,10 +278,11 @@ async def make_decision(self, data: dict, context: KernelProcessStepContext):


#
# 6) Output Step
# 7) Output Step
#
class OutputStepState:
last_message: str = ""
class OutputStepState(KernelBaseModel):
debug_history: list[str] = Field(default_factory=list)
final_answer: str = ""


class OutputStep(KernelProcessStep[OutputStepState]):
Expand All @@ -290,13 +293,22 @@ async def activate(self, state: KernelProcessStepState[OutputStepState]):
self.state = state.state

@kernel_function(name=SHOW_MESSAGE)
async def show_message(self, message: str | list[str]):
self.state.last_message = message
print(f"[OutputStep] {message}")
async def show_message(self, message: dict):
"""Handles either debug messages or final messages."""
msg_type = message.get("type", "debug")
content = message.get("content", "")

if msg_type == "debug":
self.state.debug_history.append(content)
print(content)
else:
# final
self.state.final_answer = content
print("[OutputStep] Storing final result:", content)


#
# 7) Build the Process
# 8) Build the Process
#
def build_process() -> KernelProcess:
builder = ProcessBuilder(name="GeneralPlanAndExecute")
Expand All @@ -311,7 +323,7 @@ def build_process() -> KernelProcess:
# 1) Start => Planner
builder.on_input_event(PlanExecuteEvents.StartProcess).send_event_to(target=planner, parameter_name="user_request")

# 2) Planner => Executor + Output
# 2) Planner => Executor + Output (debug)
planner.on_function_result(PlannerStep.CREATE_PLAN).send_event_to(target=executor, parameter_name="payload")

planner.on_function_result(PlannerStep.CREATE_PLAN).send_event_to(
Expand Down Expand Up @@ -339,7 +351,7 @@ async def main():

# Provide any user question
user_question = "Where was the quarterback of the winning team in the 2014 Super Bowl born?"
print(f"Starting process with: {user_question}")
print(f"Starting process with: '{user_question}'")

process = build_process()
async with await start_local_process(
Expand All @@ -351,38 +363,47 @@ async def main():
visibility=KernelProcessEventVisibility.Public,
),
) as process_context:
# Finally, get the Output
# Retrieve final state
process_state = await process_context.get_state()
output_step_state: KernelProcessStepState[OutputStepState] = next(
(s.state for s in process_state.steps if s.state.name == "OutputStep"), None
)
final_msg = output_step_state.state.last_message if output_step_state else "No final message."
print(f"\n\n[Final State] => {final_msg}")

if output_step_state:
# Final user-facing answer:
final_answer = output_step_state.state.final_answer.strip()

print("\n[Final State]:")
print(final_answer)
else:
print("[Final State]: No final message.")

"""
Sample Output:
Starting process with: 'Where was the quarterback of the winning team in the 2014 Super Bowl born?'
[PlannerStep] Created plan: ['Identify the Winning Team:** Find out which team won the 2014 Super Bowl.
[NFL.com](https://www.nfl.com/)', 'Locate the Quarterback:** Determine who was the quarterback for the winning
team during that game.', 'Research Birthplace:** Search for the birthplace of the identified quarterback.
[Wikipedia](https://www.wikipedia.org/)'] (times_called=1)

Starting process with: Where was the quarterback of the winning team in the 2014 Super Bowl born?
[PlannerStep] Created plan: ['1. Identify the winning team and quarterback of the 2014 Super Bowl.', '2. Find the
birthplace of that quarterback.'] (times_called=1)
[OutputStep] {'plan': ['1. Identify the winning team and quarterback of the 2014 Super Bowl.', '2. Find the
birthplace of that quarterback.']}
[DecisionStep] LLM decision: continue
[DecisionStep] LLM decision: continue
[OutputStep] [FINAL] All tasks completed.

=== Aggregated Partial Results ===
The winning team of the 2014 Super Bowl (Super Bowl XLVIII) was the Seattle Seahawks. The quarterback for the
Seahawks at that time was Russell Wilson.

Russell Wilson was born in Cincinnati, Ohio.


[Final State] => [FINAL] All tasks completed.

=== Aggregated Partial Results ===
The winning team of the 2014 Super Bowl (Super Bowl XLVIII) was the Seattle Seahawks. The quarterback for the
Seahawks at that time was Russell Wilson.
[DecisionStep] LLM decision: continue
[OutputStep] Storing final result: The Seattle Seahawks won Super Bowl XLVIII in 2014, defeating the Denver Broncos
43-8. The Seahawks' defense dominated the game, forcing four turnovers and limiting the Broncos' high-scoring
offense. Linebacker Malcolm Smith was named Super Bowl MVP after returning an interception 69 yards for a
touchdown. ([nfl.com](https://www.nfl.com/news/seattle-seahawks-d-dominates-manning-denver-broncos-to-win-supe-0ap2000000323056?utm_source=openai))
The quarterback for the Seattle Seahawks during Super Bowl XLVIII was Russell Wilson.
Russell Wilson, the quarterback for the Seattle Seahawks during Super Bowl XLVIII, was born on November 29, 1988,
in Cincinnati, Ohio. ([britannica.com](https://www.britannica.com/facts/Russell-Wilson?utm_source=openai))

[Final State]:
The Seattle Seahawks won Super Bowl XLVIII in 2014, defeating the Denver Broncos 43-8. The Seahawks' defense
dominated the game, forcing four turnovers and limiting the Broncos' high-scoring offense. Linebacker Malcolm
Smith was named Super Bowl MVP after returning an interception 69 yards for a touchdown.
([nfl.com](https://www.nfl.com/news/seattle-seahawks-d-dominates-manning-denver-broncos-to-win-supe-0ap2000000323056?utm_source=openai))
The quarterback for the Seattle Seahawks during Super Bowl XLVIII was Russell Wilson.
Russell Wilson, the quarterback for the Seattle Seahawks during Super Bowl XLVIII, was born on November 29, 1988,
in Cincinnati, Ohio. ([britannica.com](https://www.britannica.com/facts/Russell-Wilson?utm_source=openai))
"""


Expand Down
Loading