From 71d69c22a0d145308d8a169a8865fd8fd4857f5b Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Fri, 6 Feb 2026 16:54:46 +0200 Subject: [PATCH] feat: chat runtime support multiple triggers --- pyproject.toml | 4 +- src/uipath/runtime/chat/protocol.py | 6 +- src/uipath/runtime/chat/runtime.py | 40 +++-- tests/test_chat.py | 256 +++++++++++++++++++++++++++- uv.lock | 10 +- 5 files changed, 288 insertions(+), 28 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 01539aa..5d237f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,11 @@ [project] name = "uipath-runtime" -version = "0.6.3" +version = "0.7.0" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" dependencies = [ - "uipath-core>=0.2.0, <0.3.0", + "uipath-core>=0.3.0, <0.4.0", ] classifiers = [ "Intended Audience :: Developers", diff --git a/src/uipath/runtime/chat/protocol.py b/src/uipath/runtime/chat/protocol.py index cab41e4..f52294a 100644 --- a/src/uipath/runtime/chat/protocol.py +++ b/src/uipath/runtime/chat/protocol.py @@ -6,7 +6,7 @@ UiPathConversationMessageEvent, ) -from uipath.runtime.result import UiPathRuntimeResult +from uipath.runtime.resumable.trigger import UiPathResumeTrigger class UiPathChatProtocol(Protocol): @@ -35,12 +35,12 @@ async def emit_message_event( async def emit_interrupt_event( self, - interrupt_event: UiPathRuntimeResult, + resume_trigger: UiPathResumeTrigger, ) -> None: """Wrap and send an interrupt event. Args: - interrupt_event: UiPathConversationInterruptEvent to wrap and send + resume_trigger: UiPathResumeTrigger to wrap and send """ ... diff --git a/src/uipath/runtime/chat/runtime.py b/src/uipath/runtime/chat/runtime.py index 2e88c96..190a19d 100644 --- a/src/uipath/runtime/chat/runtime.py +++ b/src/uipath/runtime/chat/runtime.py @@ -86,25 +86,41 @@ async def stream( if ( runtime_result.status == UiPathRuntimeStatus.SUSPENDED - and runtime_result.trigger - and runtime_result.trigger.trigger_type - == UiPathResumeTriggerType.API + and runtime_result.triggers ): - await self.chat_bridge.emit_interrupt_event(runtime_result) - resume_data = await self.chat_bridge.wait_for_resume() - - # Continue with resumed execution - current_input = resume_data - current_options.resume = True - break + api_triggers = [ + t + for t in runtime_result.triggers + if t.trigger_type == UiPathResumeTriggerType.API + ] + + if api_triggers: + resume_map: dict[str, Any] = {} + + for trigger in api_triggers: + await self.chat_bridge.emit_interrupt_event(trigger) + + resume_data = await self.chat_bridge.wait_for_resume() + + assert trigger.interrupt_id is not None, ( + "Trigger interrupt_id cannot be None" + ) + resume_map[trigger.interrupt_id] = resume_data + + current_input = resume_map + current_options.resume = True + break + else: + # No API triggers - yield result and complete + yield event + execution_completed = True else: yield event execution_completed = True + await self.chat_bridge.emit_exchange_end_event() else: yield event - await self.chat_bridge.emit_exchange_end_event() - async def get_schema(self) -> UiPathRuntimeSchema: """Get schema from the delegate runtime.""" return await self.delegate.get_schema() diff --git a/tests/test_chat.py b/tests/test_chat.py index e9c1525..78db976 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -142,13 +142,15 @@ async def stream( if self.suspend_at_message is not None: # Suspend with API trigger + trigger = UiPathResumeTrigger( + interrupt_id="interrupt-1", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "confirm_tool_call"}, + ) yield UiPathRuntimeResult( status=UiPathRuntimeStatus.SUSPENDED, - trigger=UiPathResumeTrigger( - interrupt_id="interrupt-1", - trigger_type=UiPathResumeTriggerType.API, - payload={"action": "confirm_tool_call"}, - ), + trigger=trigger, + triggers=[trigger], ) return else: @@ -322,7 +324,10 @@ async def test_chat_runtime_handles_api_trigger_suspension(): # Result should be SUCCESSFUL assert isinstance(result, UiPathRuntimeResult) assert result.status == UiPathRuntimeStatus.SUCCESSFUL - assert result.output == {"resumed": True, "input": {"approved": True}} + assert result.output == { + "resumed": True, + "input": {"interrupt-1": {"approved": True}}, + } cast(AsyncMock, bridge.connect).assert_awaited_once() cast(AsyncMock, bridge.disconnect).assert_awaited_once() @@ -369,3 +374,242 @@ async def test_chat_runtime_yields_events_during_suspension_flow(): for event in events: if isinstance(event, UiPathRuntimeResult): assert event.status != UiPathRuntimeStatus.SUSPENDED + + +class MultiTriggerMockRuntime: + """Mock runtime that suspends with multiple API triggers.""" + + def __init__(self) -> None: + self.execution_count = 0 + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + """Execute with multiple trigger suspension.""" + result: UiPathRuntimeResult | None = None + async for event in self.stream(input, cast(UiPathStreamOptions, options)): + if isinstance(event, UiPathRuntimeResult): + result = event + return ( + result + if result + else UiPathRuntimeResult(status=UiPathRuntimeStatus.SUCCESSFUL) + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream with multiple trigger suspension.""" + self.execution_count += 1 + is_resume = options and options.resume + + if not is_resume: + # Initial execution - suspend with 3 API triggers + trigger_a = UiPathResumeTrigger( + interrupt_id="email-confirm", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "send_email", "to": "user@example.com"}, + ) + trigger_b = UiPathResumeTrigger( + interrupt_id="file-delete", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "delete_file", "path": "/logs/old.txt"}, + ) + trigger_c = UiPathResumeTrigger( + interrupt_id="api-call", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "call_api", "endpoint": "/users"}, + ) + + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + trigger=trigger_a, + triggers=[trigger_a, trigger_b, trigger_c], + ) + else: + # Resumed - verify all triggers resolved + assert input is not None + assert "email-confirm" in input + assert "file-delete" in input + assert "api-call" in input + + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"resumed": True, "input": input}, + ) + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + +class MixedTriggerMockRuntime: + """Mock runtime that suspends with mixed trigger types (API + non-API).""" + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + """Execute with mixed triggers.""" + result: UiPathRuntimeResult | None = None + async for event in self.stream(input, cast(UiPathStreamOptions, options)): + if isinstance(event, UiPathRuntimeResult): + result = event + return ( + result + if result + else UiPathRuntimeResult(status=UiPathRuntimeStatus.SUCCESSFUL) + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream with mixed trigger types.""" + is_resume = options and options.resume + + if not is_resume: + # Initial execution - 2 API + 1 QUEUE trigger + trigger_a = UiPathResumeTrigger( + interrupt_id="email-confirm", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "send_email"}, + ) + trigger_b = UiPathResumeTrigger( + interrupt_id="file-delete", + trigger_type=UiPathResumeTriggerType.API, + payload={"action": "delete_file"}, + ) + trigger_c = UiPathResumeTrigger( + interrupt_id="queue-item", + trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, + payload={"queue": "inbox", "item": "123"}, + ) + + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + trigger=trigger_a, + triggers=[trigger_a, trigger_b, trigger_c], + ) + else: + # Resumed - verify only API triggers resolved + assert input is not None + assert "email-confirm" in input + assert "file-delete" in input + # QUEUE trigger should NOT be in input (handled externally) + + # Suspend again with only QUEUE trigger + trigger_c = UiPathResumeTrigger( + interrupt_id="queue-item", + trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, + payload={"queue": "inbox", "item": "123"}, + ) + + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + trigger=trigger_c, + triggers=[trigger_c], + ) + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + +@pytest.mark.asyncio +async def test_chat_runtime_handles_multiple_api_triggers(): + """ChatRuntime should resolve all API triggers before resuming.""" + + runtime_impl = MultiTriggerMockRuntime() + bridge = make_chat_bridge_mock() + + # Bridge returns approval for each trigger + cast(AsyncMock, bridge.wait_for_resume).side_effect = [ + {"approved": True}, # email-confirm + {"approved": True}, # file-delete + {"approved": True}, # api-call + ] + + chat_runtime = UiPathChatRuntime( + delegate=runtime_impl, + chat_bridge=bridge, + ) + + result = await chat_runtime.execute({}) + + await chat_runtime.dispose() + + # Result should be SUCCESSFUL + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert isinstance(result.output, dict) + assert result.output["resumed"] is True + + # Verify all 3 triggers were wrapped with interrupt_ids + resume_input = cast(dict[str, Any], result.output["input"]) + assert "email-confirm" in resume_input + assert "file-delete" in resume_input + assert "api-call" in resume_input + assert resume_input["email-confirm"] == {"approved": True} + assert resume_input["file-delete"] == {"approved": True} + assert resume_input["api-call"] == {"approved": True} + + # Bridge should have been called 3 times (once per trigger) + assert cast(AsyncMock, bridge.emit_interrupt_event).await_count == 3 + assert cast(AsyncMock, bridge.wait_for_resume).await_count == 3 + + # Verify each emit_interrupt_event received a trigger + emit_calls = cast(AsyncMock, bridge.emit_interrupt_event).await_args_list + assert emit_calls[0][0][0].interrupt_id == "email-confirm" + assert emit_calls[1][0][0].interrupt_id == "file-delete" + assert emit_calls[2][0][0].interrupt_id == "api-call" + + +@pytest.mark.asyncio +async def test_chat_runtime_filters_non_api_triggers(): + """ChatRuntime should only handle API triggers, pass through non-API triggers.""" + + runtime_impl = MixedTriggerMockRuntime() + bridge = make_chat_bridge_mock() + + # Bridge returns approval for API triggers only + cast(AsyncMock, bridge.wait_for_resume).side_effect = [ + {"approved": True}, # email-confirm + {"approved": True}, # file-delete + ] + + chat_runtime = UiPathChatRuntime( + delegate=runtime_impl, + chat_bridge=bridge, + ) + + result = await chat_runtime.execute({}) + + await chat_runtime.dispose() + + # Result should be SUSPENDED with QUEUE trigger (non-API) + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 1 + assert result.triggers[0].interrupt_id == "queue-item" + assert result.triggers[0].trigger_type == UiPathResumeTriggerType.QUEUE_ITEM + + # Bridge should have been called only 2 times (for 2 API triggers) + assert cast(AsyncMock, bridge.emit_interrupt_event).await_count == 2 + assert cast(AsyncMock, bridge.wait_for_resume).await_count == 2 + + # Verify only API triggers were emitted + emit_calls = cast(AsyncMock, bridge.emit_interrupt_event).await_args_list + assert emit_calls[0][0][0].interrupt_id == "email-confirm" + assert emit_calls[0][0][0].trigger_type == UiPathResumeTriggerType.API + assert emit_calls[1][0][0].interrupt_id == "file-delete" + assert emit_calls[1][0][0].trigger_type == UiPathResumeTriggerType.API diff --git a/uv.lock b/uv.lock index 5977e4e..70621ad 100644 --- a/uv.lock +++ b/uv.lock @@ -991,21 +991,21 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.2.0" +version = "0.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-sdk" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9a/4f/9bf150a21b6af8b56edf7fbca46827806570eab5b37f90c2b76180cf1e79/uipath_core-0.2.0.tar.gz", hash = "sha256:950427fe7921a67468416856faf63192cf717d8adce092d706b070c487f0c076", size = 103072, upload-time = "2026-01-25T11:59:10.871Z" } +sdist = { url = "https://files.pythonhosted.org/packages/bb/81/9be0cb9d8ad2ebc9501a1a061adf6cd29ddfadd8c36bb8e7585848d955e4/uipath_core-0.3.1.tar.gz", hash = "sha256:312b7939083ba10c282cbd9fac713f22ffc1c0a884b19a1215685a9bdd861272", size = 108467, upload-time = "2026-02-06T13:29:57.201Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e8/43/f61f6aace058d61dfa11e3c2116b06f0bc15c45d9d201bf432902f54018f/uipath_core-0.2.0-py3-none-any.whl", hash = "sha256:bb5366bfca7ec4611f91a0035df194a56eef11f447313491557e131e6090f5e6", size = 32826, upload-time = "2026-01-25T11:59:09.203Z" }, + { url = "https://files.pythonhosted.org/packages/d0/46/0ce5b89c8b92f077350a298006b3b3950c7655fb2b51bdd46a3a4a108834/uipath_core-0.3.1-py3-none-any.whl", hash = "sha256:7449ebf6a159083bf28e11975c1e1ed5ad2631706ae635cab89ffdd01859ecc0", size = 34936, upload-time = "2026-02-06T13:29:56.111Z" }, ] [[package]] name = "uipath-runtime" -version = "0.6.3" +version = "0.7.0" source = { editable = "." } dependencies = [ { name = "uipath-core" }, @@ -1027,7 +1027,7 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "uipath-core", specifier = ">=0.2.0,<0.3.0" }] +requires-dist = [{ name = "uipath-core", specifier = ">=0.3.0,<0.4.0" }] [package.metadata.requires-dev] dev = [