From ea97e16ddebc0eb329ee1fa8ed9a51680017aaff Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Fri, 16 Jan 2026 13:30:35 -0800 Subject: [PATCH] refactor: Refactoring Runner to reduce deep nesting The goal is to have easier to read code for future improvements. Note: I also made a minor improvement for Invocation context. PiperOrigin-RevId: 857283632 --- .../java/com/google/adk/runner/Runner.java | 206 ++++++++---------- 1 file changed, 90 insertions(+), 116 deletions(-) diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index 7df90c51..155511bb 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -312,9 +312,8 @@ private Single appendNewMessageToSession( } if (this.artifactService != null && saveInputBlobsAsArtifacts) { - // The runner directly saves the artifacts (if applicable) in the - // user message and replaces the artifact data with a file name - // placeholder. + // The runner directly saves the artifacts (if applicable) in the user message and replaces + // the artifact data with a file name placeholder. for (int i = 0; i < newMessage.parts().get().size(); i++) { Part part = newMessage.parts().get().get(i); if (part.inlineData().isEmpty()) { @@ -426,12 +425,10 @@ public Flowable runAsync( // Create initial context InvocationContext initialContext = - newInvocationContextBuilder( - session, - Optional.of(newMessage), - /* liveRequestQueue= */ Optional.empty(), - runConfig) + newInvocationContextBuilder(session) .invocationId(invocationId) + .runConfig(runConfig) + .userContent(newMessage) .build(); return Telemetry.traceFlowable( @@ -455,6 +452,9 @@ public Flowable runAsync( : Single.just(null)) .flatMapPublisher( event -> { + if (event == null) { + return Flowable.empty(); + } // Get the updated session after the message and state delta are // applied return this.sessionService @@ -464,80 +464,14 @@ public Flowable runAsync( session.id(), Optional.empty()) .flatMapPublisher( - updatedSession -> { - // Create context with updated session for - // beforeRunCallback - InvocationContext contextWithUpdatedSession = - newInvocationContextBuilder( - updatedSession, - event.content(), - /* liveRequestQueue= */ Optional.empty(), - runConfig) - .invocationId(invocationId) - .agent( - this.findAgentToRun( - updatedSession, rootAgent)) - .build(); - - // Call beforeRunCallback with updated session - Maybe beforeRunEvent = - this.pluginManager - .beforeRunCallback(contextWithUpdatedSession) - .map( - content -> - Event.builder() - .id(Event.generateEventId()) - .invocationId( - contextWithUpdatedSession - .invocationId()) - .author("model") - .content(Optional.of(content)) - .build()); - - // Agent execution - Flowable agentEvents = - contextWithUpdatedSession - .agent() - .runAsync(contextWithUpdatedSession) - .flatMap( - agentEvent -> - this.sessionService - .appendEvent( - updatedSession, agentEvent) - .flatMap( - registeredEvent -> { - // TODO: remove this hack - // after - // deprecating runAsync with - // Session. - copySessionStates( - updatedSession, - session); - return contextWithUpdatedSession - .combinedPlugin() - .onEventCallback( - contextWithUpdatedSession, - registeredEvent) - .defaultIfEmpty( - registeredEvent); - }) - .toFlowable()); - - // If beforeRunCallback returns content, emit it and - // skip - // agent - return beforeRunEvent - .toFlowable() - .switchIfEmpty(agentEvents) - .concatWith( - Completable.defer( - () -> - pluginManager.runAfterRunCallback( - contextWithUpdatedSession))) - .concatWith( - Completable.defer( - () -> compactEvents(updatedSession))); - }); + updatedSession -> + runAgentWithFreshSession( + session, + updatedSession, + event, + invocationId, + runConfig, + rootAgent)); })) .doOnError( throwable -> { @@ -552,6 +486,64 @@ public Flowable runAsync( } } + private Flowable runAgentWithFreshSession( + Session session, + Session updatedSession, + Event event, + String invocationId, + RunConfig runConfig, + BaseAgent rootAgent) { + // Create context with updated session for beforeRunCallback + InvocationContext contextWithUpdatedSession = + newInvocationContextBuilder(updatedSession) + .invocationId(invocationId) + .agent(this.findAgentToRun(updatedSession, rootAgent)) + .runConfig(runConfig) + .userContent(event.content().orElseGet(Content::fromParts)) + .build(); + + // Call beforeRunCallback with updated session + Maybe beforeRunEvent = + this.pluginManager + .beforeRunCallback(contextWithUpdatedSession) + .map( + content -> + Event.builder() + .id(Event.generateEventId()) + .invocationId(contextWithUpdatedSession.invocationId()) + .author("model") + .content(Optional.of(content)) + .build()); + + // Agent execution + Flowable agentEvents = + contextWithUpdatedSession + .agent() + .runAsync(contextWithUpdatedSession) + .flatMap( + agentEvent -> + this.sessionService + .appendEvent(updatedSession, agentEvent) + .flatMap( + registeredEvent -> { + // TODO: remove this hack after deprecating runAsync with Session. + copySessionStates(updatedSession, session); + return contextWithUpdatedSession + .combinedPlugin() + .onEventCallback(contextWithUpdatedSession, registeredEvent) + .defaultIfEmpty(registeredEvent); + }) + .toFlowable()); + + // If beforeRunCallback returns content, emit it and skip agent + return beforeRunEvent + .toFlowable() + .switchIfEmpty(agentEvents) + .concatWith( + Completable.defer(() -> pluginManager.runAfterRunCallback(contextWithUpdatedSession))) + .concatWith(Completable.defer(() -> compactEvents(updatedSession))); + } + private Completable compactEvents(Session session) { return Optional.ofNullable(eventsCompactionConfig) .map(SlidingWindowEventCompactor::new) @@ -590,43 +582,25 @@ private InvocationContext newInvocationContextForLive( runConfigBuilder.setInputAudioTranscription(AudioTranscriptionConfig.builder().build()); } } - return newInvocationContext( - session, /* newMessage= */ Optional.empty(), liveRequestQueue, runConfigBuilder.build()); + InvocationContext.Builder builder = + newInvocationContextBuilder(session) + .runConfig(runConfigBuilder.build()) + .userContent(Content.fromParts()); + liveRequestQueue.ifPresent(builder::liveRequestQueue); + return builder.build(); } - /** - * Creates an {@link InvocationContext} for the given session, request queue, and config. - * - * @return a new {@link InvocationContext}. - */ - private InvocationContext newInvocationContext( - Session session, - Optional newMessage, - Optional liveRequestQueue, - RunConfig runConfig) { - return newInvocationContextBuilder(session, newMessage, liveRequestQueue, runConfig).build(); - } - - private InvocationContext.Builder newInvocationContextBuilder( - Session session, - Optional newMessage, - Optional liveRequestQueue, - RunConfig runConfig) { + private InvocationContext.Builder newInvocationContextBuilder(Session session) { BaseAgent rootAgent = this.agent; - var invocationContextBuilder = - InvocationContext.builder() - .sessionService(this.sessionService) - .artifactService(this.artifactService) - .memoryService(this.memoryService) - .pluginManager(this.pluginManager) - .agent(rootAgent) - .session(session) - .userContent(newMessage.orElseGet(() -> Content.fromParts())) - .runConfig(runConfig) - .resumabilityConfig(this.resumabilityConfig) - .agent(this.findAgentToRun(session, rootAgent)); - liveRequestQueue.ifPresent(invocationContextBuilder::liveRequestQueue); - return invocationContextBuilder; + return InvocationContext.builder() + .sessionService(this.sessionService) + .artifactService(this.artifactService) + .memoryService(this.memoryService) + .pluginManager(this.pluginManager) + .agent(rootAgent) + .session(session) + .resumabilityConfig(this.resumabilityConfig) + .agent(this.findAgentToRun(session, rootAgent)); } /**