From 065bbca51f81258c7891991fed0a2cbecdf1531b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:16:56 -0700 Subject: [PATCH 1/8] Prevent task loss on shutdown when server is capable --- .../internal/worker/ActivityPollTask.java | 2 + .../internal/worker/ActivityWorker.java | 7 +- .../worker/AsyncActivityPollTask.java | 2 + .../internal/worker/AsyncNexusPollTask.java | 3 + .../temporal/internal/worker/AsyncPoller.java | 26 ++-- .../worker/AsyncWorkflowPollTask.java | 3 + .../temporal/internal/worker/BasePoller.java | 33 +++-- .../internal/worker/MultiThreadedPoller.java | 5 +- .../worker/NamespaceCapabilities.java | 10 ++ .../internal/worker/NexusPollTask.java | 2 + .../temporal/internal/worker/NexusWorker.java | 7 +- .../internal/worker/SingleWorkerOptions.java | 19 ++- .../internal/worker/WorkflowPollTask.java | 2 + .../internal/worker/WorkflowWorker.java | 67 ++++------ .../main/java/io/temporal/worker/Worker.java | 115 ++++++++++++++---- .../io/temporal/worker/WorkerFactory.java | 6 + .../internal/worker/AsyncPollerTest.java | 2 +- .../internal/worker/SlotSupplierTest.java | 2 + .../worker/StickyQueueBacklogTest.java | 1 + 19 files changed, 213 insertions(+), 101 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index b015039566..1dceb67fb0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -37,6 +37,7 @@ public ActivityPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @@ -53,6 +54,7 @@ public ActivityPollTask( .setNamespace(namespace) .setIdentity(identity) 
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (activitiesPerSecond > 0) { pollRequest.setTaskQueueMetadata( TaskQueueMetadata.newBuilder() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 520ce7a373..d2fddde3f3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -105,6 +105,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), taskQueueActivitiesPerSecond, this.slotSupplier, @@ -113,7 +114,7 @@ public boolean start() { pollerTracker), this.pollTaskExecutor, pollerOptions, - namespaceCapabilities.isPollerAutoscaling(), + namespaceCapabilities, workerMetricsScope); } else { @@ -125,6 +126,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), taskQueueActivitiesPerSecond, this.slotSupplier, @@ -133,7 +135,8 @@ public boolean start() { pollerTracker), this.pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java index 60ebcbf654..b23d161845 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java @@ -43,6 +43,7 @@ public AsyncActivityPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String 
workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @@ -59,6 +60,7 @@ public AsyncActivityPollTask( .setNamespace(namespace) .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (activitiesPerSecond > 0) { pollRequest.setTaskQueueMetadata( TaskQueueMetadata.newBuilder() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java index efc4dc8077..1ba3b84d15 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java @@ -41,6 +41,7 @@ public AsyncNexusPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull Scope metricsScope, @Nonnull Supplier serverCapabilities, @@ -57,6 +58,8 @@ public AsyncNexusPollTask( .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); + if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequest.setDeploymentOptions( WorkerVersioningProtoUtils.deploymentOptionsToProto( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java index 5106343792..7859484bbe 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java @@ -29,7 +29,6 @@ final class AsyncPoller extends BasePoller { private final List> asyncTaskPollers; private final PollerOptions pollerOptions; private final PollerBehaviorAutoscaling pollerBehavior; - 
private final boolean serverSupportsAutoscaling; private final Scope workerMetricsScope; private Throttler pollRateThrottler; private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = @@ -43,7 +42,7 @@ final class AsyncPoller extends BasePoller { PollTaskAsync asyncTaskPoller, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - boolean serverSupportsAutoscaling, + NamespaceCapabilities namespaceCapabilities, Scope workerMetricsScope) { this( slotSupplier, @@ -51,7 +50,7 @@ final class AsyncPoller extends BasePoller { Collections.singletonList(asyncTaskPoller), taskExecutor, pollerOptions, - serverSupportsAutoscaling, + namespaceCapabilities, workerMetricsScope); } @@ -61,9 +60,9 @@ final class AsyncPoller extends BasePoller { List> asyncTaskPollers, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - boolean serverSupportsAutoscaling, + NamespaceCapabilities namespaceCapabilities, Scope workerMetricsScope) { - super(taskExecutor); + super(taskExecutor, namespaceCapabilities); Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null"); Objects.requireNonNull(slotReservationData, "slotReservation data should not be null"); Objects.requireNonNull(asyncTaskPollers, "asyncTaskPollers should not be null"); @@ -82,7 +81,6 @@ final class AsyncPoller extends BasePoller { + " is not supported for AsyncPoller. 
Only PollerBehaviorAutoscaling is supported."); } this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior(); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; this.pollerOptions = pollerOptions; this.workerMetricsScope = workerMetricsScope; } @@ -114,7 +112,7 @@ public boolean start() { pollerBehavior.getMinConcurrentTaskPollers(), pollerBehavior.getMaxConcurrentTaskPollers(), pollerBehavior.getInitialConcurrentTaskPollers(), - serverSupportsAutoscaling, + namespaceCapabilities.isPollerAutoscaling(), (newTarget) -> { log.debug( "Updating maximum number of pollers for {} to: {}", @@ -136,12 +134,14 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return super.shutdown(shutdownManager, interruptTasks) .thenApply( (f) -> { - for (PollTaskAsync asyncTaskPoller : asyncTaskPollers) { - try { - log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel()); - asyncTaskPoller.cancel(new RuntimeException("Shutting down poller")); - } catch (Throwable e) { - log.error("Error while cancelling poll task", e); + if (!namespaceCapabilities.isGracefulPollShutdown()) { + for (PollTaskAsync asyncTaskPoller : asyncTaskPollers) { + try { + log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel()); + asyncTaskPoller.cancel(new RuntimeException("Shutting down poller")); + } catch (Throwable e) { + log.error("Error while cancelling poll task", e); + } } } return null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java index c30dbc9e18..3bfa796a30 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java @@ -52,6 +52,7 @@ public AsyncWorkflowPollTask( @Nonnull String taskQueue, @Nullable String stickyTaskQueue, @Nonnull String identity, + @Nonnull 
String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, @@ -67,6 +68,8 @@ public AsyncWorkflowPollTask( .setNamespace(Objects.requireNonNull(namespace)) .setIdentity(Objects.requireNonNull(identity)); + pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey); + if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequestBuilder.setDeploymentOptions( WorkerVersioningProtoUtils.deploymentOptionsToProto( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java index 9b8141fc02..9e685bbcae 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java @@ -27,9 +27,14 @@ abstract class BasePoller implements SuspendableWorker { protected ExecutorService pollExecutor; - protected BasePoller(ShutdownableTaskExecutor taskExecutor) { + protected final NamespaceCapabilities namespaceCapabilities; + + protected BasePoller( + ShutdownableTaskExecutor taskExecutor, NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(taskExecutor, "taskExecutor should not be null"); this.taskExecutor = taskExecutor; + this.namespaceCapabilities = + Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); } @Override @@ -55,15 +60,23 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return CompletableFuture.completedFuture(null); } - return shutdownManager - // it's ok to forcefully shutdown pollers, because they are stuck in a long poll call - // so we don't risk loosing any progress doing that. 
- .shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1)) - .exceptionally( - e -> { - log.error("Unexpected exception during shutdown", e); - return null; - }); + CompletableFuture pollExecutorShutdown; + if (namespaceCapabilities.isGracefulPollShutdown()) { + // When graceful poll shutdown is enabled, the server will complete outstanding polls with + // empty responses after ShutdownWorker is called. We simply wait for polls to return. + pollExecutorShutdown = + shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor"); + } else { + // Old behaviour forcibly stops outstanding polls. + pollExecutorShutdown = + shutdownManager.shutdownExecutorNow( + pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1)); + } + return pollExecutorShutdown.exceptionally( + e -> { + log.error("Unexpected exception during shutdown", e); + return null; + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java index 8dcaa6f33a..7fe0335b15 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java @@ -52,8 +52,9 @@ public MultiThreadedPoller( PollTask pollTask, ShutdownableTaskExecutor taskExecutor, PollerOptions pollerOptions, - Scope workerMetricsScope) { - super(taskExecutor); + Scope workerMetricsScope, + NamespaceCapabilities namespaceCapabilities) { + super(taskExecutor, namespaceCapabilities); Objects.requireNonNull(identity, "identity cannot be null"); Objects.requireNonNull(pollTask, "poll service should not be null"); Objects.requireNonNull(pollerOptions, "pollerOptions should not be null"); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java index 
4fa9d09a56..96586259b3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java @@ -9,8 +9,10 @@ */ public final class NamespaceCapabilities { private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false); private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false); + public boolean isPollerAutoscaling() { return pollerAutoscaling.get(); } @@ -19,6 +21,14 @@ public void setPollerAutoscaling(boolean value) { pollerAutoscaling.set(value); } + public boolean isGracefulPollShutdown() { + return gracefulPollShutdown.get(); + } + + public void setGracefulPollShutdown(boolean value) { + gracefulPollShutdown.set(value); + } + public boolean isWorkerHeartbeats() { return workerHeartbeats.get(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java index 4116825b9e..0ccab59443 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java @@ -34,6 +34,7 @@ public NexusPollTask( @Nonnull String namespace, @Nonnull String taskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, @@ -49,6 +50,7 @@ public NexusPollTask( .setNamespace(namespace) .setIdentity(identity) .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue)); + pollRequest.setWorkerInstanceKey(workerInstanceKey); if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequest.setDeploymentOptions( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java 
b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index d826e55437..ac364a747e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -111,6 +111,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), workerMetricsScope, service.getServerCapabilities(), @@ -118,7 +119,7 @@ public boolean start() { pollerTracker), this.pollTaskExecutor, pollerOptions, - namespaceCapabilities.isPollerAutoscaling(), + namespaceCapabilities, workerMetricsScope); } else { poller = @@ -129,6 +130,7 @@ public boolean start() { namespace, taskQueue, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), this.slotSupplier, workerMetricsScope, @@ -136,7 +138,8 @@ public boolean start() { pollerTracker), this.pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java index f8baba01db..5593707720 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java @@ -40,6 +40,7 @@ public static final class Builder { private Duration drainStickyTaskQueueTimeout; private boolean usingVirtualThreads; private WorkerDeploymentOptions deploymentOptions; + private String workerInstanceKey; private Builder() {} @@ -64,6 +65,7 @@ private Builder(SingleWorkerOptions options) { this.drainStickyTaskQueueTimeout = options.getDrainStickyTaskQueueTimeout(); this.usingVirtualThreads = options.isUsingVirtualThreads(); 
this.deploymentOptions = options.getDeploymentOptions(); + this.workerInstanceKey = options.getWorkerInstanceKey(); } public Builder setIdentity(String identity) { @@ -155,6 +157,11 @@ public Builder setDeploymentOptions(WorkerDeploymentOptions deploymentOptions) { return this; } + public Builder setWorkerInstanceKey(String workerInstanceKey) { + this.workerInstanceKey = workerInstanceKey; + return this; + } + public SingleWorkerOptions build() { PollerOptions pollerOptions = this.pollerOptions; if (pollerOptions == null) { @@ -193,7 +200,8 @@ public SingleWorkerOptions build() { this.defaultHeartbeatThrottleInterval, drainStickyTaskQueueTimeout, usingVirtualThreads, - this.deploymentOptions); + this.deploymentOptions, + this.workerInstanceKey); } } @@ -214,6 +222,7 @@ public SingleWorkerOptions build() { private final Duration drainStickyTaskQueueTimeout; private final boolean usingVirtualThreads; private final WorkerDeploymentOptions deploymentOptions; + private final String workerInstanceKey; private SingleWorkerOptions( String identity, @@ -232,7 +241,8 @@ private SingleWorkerOptions( Duration defaultHeartbeatThrottleInterval, Duration drainStickyTaskQueueTimeout, boolean usingVirtualThreads, - WorkerDeploymentOptions deploymentOptions) { + WorkerDeploymentOptions deploymentOptions, + String workerInstanceKey) { this.identity = identity; this.binaryChecksum = binaryChecksum; this.buildId = buildId; @@ -250,6 +260,7 @@ private SingleWorkerOptions( this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout; this.usingVirtualThreads = usingVirtualThreads; this.deploymentOptions = deploymentOptions; + this.workerInstanceKey = workerInstanceKey; } public String getIdentity() { @@ -331,6 +342,10 @@ public WorkerDeploymentOptions getDeploymentOptions() { return deploymentOptions; } + public String getWorkerInstanceKey() { + return workerInstanceKey; + } + public WorkerVersioningOptions getWorkerVersioningOptions() { return new WorkerVersioningOptions( 
this.getBuildId(), this.isUsingBuildIdForVersioning(), this.getDeploymentOptions()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index cdb5e51639..18607b5d1e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -47,6 +47,7 @@ public WorkflowPollTask( @Nonnull String taskQueue, @Nullable String stickyTaskQueue, @Nonnull String identity, + @Nonnull String workerInstanceKey, @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull StickyQueueBalancer stickyQueueBalancer, @@ -73,6 +74,7 @@ public WorkflowPollTask( PollWorkflowTaskQueueRequest.newBuilder() .setNamespace(Objects.requireNonNull(namespace)) .setIdentity(Objects.requireNonNull(identity)); + pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey); if (versioningOptions.getWorkerDeploymentOptions() != null) { pollRequestBuilder.setDeploymentOptions( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index f98316d5d8..1c36f5c521 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -41,7 +41,6 @@ import org.slf4j.MDC; final class WorkflowWorker implements SuspendableWorker { - private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown"; private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class); private final WorkflowRunLockManager runLocks; @@ -133,6 +132,7 @@ public boolean start() { taskQueue, null, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -146,6 +146,7 @@ 
public boolean start() { taskQueue, stickyTaskQueueName, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -162,6 +163,7 @@ public boolean start() { taskQueue, null, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, @@ -175,7 +177,7 @@ public boolean start() { pollers, this.pollTaskExecutor, pollerOptions, - namespaceCapabilities.isPollerAutoscaling(), + namespaceCapabilities, workerMetricsScope); } else { PollerBehaviorSimpleMaximum pollerBehavior = @@ -193,6 +195,7 @@ public boolean start() { taskQueue, stickyTaskQueueName, options.getIdentity(), + options.getWorkerInstanceKey(), options.getWorkerVersioningOptions(), slotSupplier, stickyQueueBalancer, @@ -202,7 +205,8 @@ public boolean start() { stickyPollerTracker), pollTaskExecutor, pollerOptions, - workerMetricsScope); + workerMetricsScope, + namespaceCapabilities); } poller.start(); workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1); @@ -232,46 +236,23 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout()) : CompletableFuture.completedFuture(null)) .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks)); - return CompletableFuture.allOf( - pollerShutdown.thenCompose( - ignore -> { - ShutdownWorkerRequest.Builder shutdownReq = - ShutdownWorkerRequest.newBuilder() - .setIdentity(options.getIdentity()) - .setNamespace(namespace) - .setTaskQueue(taskQueue) - .setWorkerInstanceKey(workerInstanceKey) - .setReason(GRACEFUL_SHUTDOWN_MESSAGE) - .addAllTaskQueueTypes(activeTaskQueueTypesSupplier.get()); - if (stickyTaskQueueName != null) { - shutdownReq.setStickyTaskQueue(stickyTaskQueueName); - } - if (heartbeatSupplier != null) { - shutdownReq.setWorkerHeartbeat( - heartbeatSupplier.get().toBuilder() - 
.setStatus(WorkerStatus.WORKER_STATUS_SHUTTING_DOWN) - .build()); - } - return shutdownManager.waitOnWorkerShutdownRequest( - service.futureStub().shutdownWorker(shutdownReq.build())); - }), - pollerShutdown - .thenCompose( - ignore -> - !interruptTasks - ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( - slotSupplier, supplierName) - : CompletableFuture.completedFuture(null)) - .thenCompose( - ignore -> - pollTaskExecutor != null - ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) - : CompletableFuture.completedFuture(null)) - .exceptionally( - e -> { - log.error("Unexpected exception during shutdown", e); - return null; - })); + return pollerShutdown + .thenCompose( + ignore -> + !interruptTasks + ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( + slotSupplier, supplierName) + : CompletableFuture.completedFuture(null)) + .thenCompose( + ignore -> + pollTaskExecutor != null + ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) + : CompletableFuture.completedFuture(null)) + .exceptionally( + e -> { + log.error("Unexpected exception during shutdown", e); + return null; + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index ce599c6ad7..1415fda39e 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -13,6 +13,7 @@ import io.temporal.api.worker.v1.WorkerHostInfo; import io.temporal.api.worker.v1.WorkerPollerInfo; import io.temporal.api.worker.v1.WorkerSlotsInfo; +import io.temporal.api.workflowservice.v1.ShutdownWorkerRequest; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.Experimental; @@ -26,6 +27,7 @@ import io.temporal.internal.worker.*; import io.temporal.internal.worker.TaskCounter; import io.temporal.serviceclient.MetricsTag; +import 
io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.Version; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions; @@ -59,7 +61,13 @@ public final class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); private final WorkerOptions options; private final String taskQueue; + private final String workerInstanceKey = UUID.randomUUID().toString(); private final List plugins; + private final WorkflowServiceStubs service; + private final String namespace; + private final String identity; + private final String stickyTaskQueueName; + private final NamespaceCapabilities namespaceCapabilities; final SyncWorkflowWorker workflowWorker; final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; @@ -106,22 +114,31 @@ private static final class TaskSnapshot { @Nonnull NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(client, "client should not be null"); + this.namespaceCapabilities = + Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); Preconditions.checkArgument( !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); this.taskQueue = taskQueue; + this.service = client.getWorkflowServiceStubs(); this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); this.clientOptions = client.getOptions(); this.cache = cache; factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); + this.namespace = namespace; Map tags = new ImmutableMap.Builder(1).put(MetricsTag.TASK_QUEUE, taskQueue).build(); Scope taggedScope = metricsScope.tagged(tags); SingleWorkerOptions activityOptions = toActivityOptions( - factoryOptions, this.options, clientOptions, contextPropagators, 
taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); if (this.options.isLocalActivityWorkerOnly()) { activityWorker = null; } else { @@ -149,7 +166,12 @@ private static final class TaskSnapshot { SingleWorkerOptions nexusOptions = toNexusOptions( - factoryOptions, this.options, clientOptions, contextPropagators, taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); SlotSupplier nexusSlotSupplier = this.options.getWorkerTuner() == null ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentNexusExecutionSize()) @@ -167,10 +189,16 @@ private static final class TaskSnapshot { clientOptions, taskQueue, contextPropagators, - taggedScope); + taggedScope, + workerInstanceKey); SingleWorkerOptions localActivityOptions = toLocalActivityOptions( - factoryOptions, this.options, clientOptions, contextPropagators, taggedScope); + factoryOptions, + this.options, + clientOptions, + contextPropagators, + taggedScope, + workerInstanceKey); SlotSupplier workflowSlotSupplier = this.options.getWorkerTuner() == null @@ -183,6 +211,10 @@ private static final class TaskSnapshot { : this.options.getWorkerTuner().getLocalActivitySlotSupplier(); attachMetricsToResourceController(taggedScope, localActivitySlotSupplier); + this.identity = singleWorkerOptions.getIdentity(); + this.stickyTaskQueueName = + useStickyTaskQueue ? getStickyTaskQueueName(client.getOptions().getIdentity()) : null; + workflowWorker = new SyncWorkflowWorker( client, @@ -194,7 +226,7 @@ private static final class TaskSnapshot { localActivityOptions, runLocks, cache, - useStickyTaskQueue ? 
getStickyTaskQueueName(client.getOptions().getIdentity()) : null, + stickyTaskQueueName, workflowThreadExecutor, eagerActivityDispatcher, workflowSlotSupplier, @@ -454,19 +486,40 @@ void start() { } CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) { - shuttingDown = true; - CompletableFuture workflowWorkerShutdownFuture = - workflowWorker.shutdown(shutdownManager, interruptUserTasks); - CompletableFuture nexusWorkerShutdownFuture = - nexusWorker.shutdown(shutdownManager, interruptUserTasks); + ShutdownWorkerRequest.Builder requestBuilder = + ShutdownWorkerRequest.newBuilder() + .setNamespace(namespace) + .setIdentity(identity) + .setWorkerInstanceKey(workerInstanceKey) + .setTaskQueue(taskQueue) + .setReason("graceful shutdown") + .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) + .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); if (activityWorker != null) { - return CompletableFuture.allOf( - activityWorker.shutdown(shutdownManager, interruptUserTasks), - workflowWorkerShutdownFuture, - nexusWorkerShutdownFuture); - } else { - return CompletableFuture.allOf(workflowWorkerShutdownFuture, nexusWorkerShutdownFuture); + requestBuilder.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY); + } + if (stickyTaskQueueName != null) { + requestBuilder.setStickyTaskQueue(stickyTaskQueueName); } + CompletableFuture shutdownWorkerRpc = + shutdownManager.waitOnWorkerShutdownRequest( + service.futureStub().shutdownWorker(requestBuilder.build())); + + return shutdownWorkerRpc.thenCompose( + ignore -> { + CompletableFuture workflowWorkerShutdownFuture = + workflowWorker.shutdown(shutdownManager, interruptUserTasks); + CompletableFuture nexusWorkerShutdownFuture = + nexusWorker.shutdown(shutdownManager, interruptUserTasks); + if (activityWorker != null) { + return CompletableFuture.allOf( + activityWorker.shutdown(shutdownManager, interruptUserTasks), + workflowWorkerShutdownFuture, + nexusWorkerShutdownFuture); + } else 
{ + return CompletableFuture.allOf(workflowWorkerShutdownFuture, nexusWorkerShutdownFuture); + } + }); } boolean isTerminated() { @@ -826,8 +879,10 @@ private static SingleWorkerOptions toActivityOptions( WorkerOptions options, WorkflowClientOptions clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker()) .setPollerOptions( PollerOptions.newBuilder() @@ -848,8 +903,10 @@ private static SingleWorkerOptions toNexusOptions( WorkerOptions options, WorkflowClientOptions clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior( @@ -870,7 +927,8 @@ private static SingleWorkerOptions toWorkflowWorkerOptions( WorkflowClientOptions clientOptions, String taskQueue, List contextPropagators, - Scope metricsScope) { + Scope metricsScope, + String workerInstanceKey) { Map tags = new ImmutableMap.Builder(1).put(MetricsTag.TASK_QUEUE, taskQueue).build(); @@ -899,7 +957,8 @@ private static SingleWorkerOptions toWorkflowWorkerOptions( } } - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior( @@ -921,8 +980,10 @@ private static SingleWorkerOptions toLocalActivityOptions( WorkerOptions options, WorkflowClientOptions 
clientOptions, List contextPropagators, - Scope metricsScope) { - return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators) + Scope metricsScope, + String workerInstanceKey) { + return toSingleWorkerOptions( + factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) @@ -939,7 +1000,8 @@ private static SingleWorkerOptions.Builder toSingleWorkerOptions( WorkerFactoryOptions factoryOptions, WorkerOptions options, WorkflowClientOptions clientOptions, - List contextPropagators) { + List contextPropagators, + String workerInstanceKey) { String buildId = null; if (options.getBuildId() != null) { buildId = options.getBuildId(); @@ -962,7 +1024,8 @@ private static SingleWorkerOptions.Builder toSingleWorkerOptions( .setWorkerInterceptors(factoryOptions.getWorkerInterceptors()) .setMaxHeartbeatThrottleInterval(options.getMaxHeartbeatThrottleInterval()) .setDefaultHeartbeatThrottleInterval(options.getDefaultHeartbeatThrottleInterval()) - .setDeploymentOptions(options.getDeploymentOptions()); + .setDeploymentOptions(options.getDeploymentOptions()) + .setWorkerInstanceKey(workerInstanceKey); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 741990624d..08d2683ee3 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -279,6 +279,12 @@ public synchronized void start() { if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { namespaceCapabilities.setPollerAutoscaling(true); } + if (describeNamespaceResponse + .getNamespaceInfo() + .getCapabilities() + .getWorkerPollCompleteOnShutdown()) { + namespaceCapabilities.setGracefulPollShutdown(true); + } // Build plugin execution chain (reverse order 
for proper nesting) Consumer startChain = WorkerFactory::doStart; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java index 2ade977627..5faa34ca7c 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/AsyncPollerTest.java @@ -133,7 +133,7 @@ private AsyncPoller newPoller( pollTask, taskExecutor, options, - false, + new NamespaceCapabilities(), new NoopScope()); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java index e4223c0b54..c6f11a61a1 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java @@ -80,6 +80,7 @@ public void supplierIsCalledAppropriately() { TASK_QUEUE, "stickytaskqueue", "", + "test-instance-key", new WorkerVersioningOptions("", false, null), trackingSS, stickyQueueBalancer, @@ -172,6 +173,7 @@ public void asyncPollerSupplierIsCalledAppropriately() throws Exception { TASK_QUEUE, null, "", + "test-instance-key", new WorkerVersioningOptions("", false, null), trackingSS, metricsScope, diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 59538ac8b2..3bd9e1f909 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -68,6 +68,7 @@ public void stickyQueueBacklogResetTest() { "taskqueue", "stickytaskqueue", "", + "test-instance-key", new WorkerVersioningOptions("", false, null), slotSupplier, stickyQueueBalancer, From 
594756c69f4ff49a34b75f473cdcc055713e6256 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:45:15 -0700 Subject: [PATCH 2/8] Don't block on shutdown RPC in interrupt mode --- temporal-sdk/src/main/java/io/temporal/worker/Worker.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index 1415fda39e..fad5c899b8 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -505,7 +505,13 @@ CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interr shutdownManager.waitOnWorkerShutdownRequest( service.futureStub().shutdownWorker(requestBuilder.build())); - return shutdownWorkerRpc.thenCompose( + // When interrupting tasks (shutdownNow), fire the RPC but don't block on it — proceed to + // shut down pollers immediately. For graceful shutdown, wait for the RPC so the server can + // complete outstanding polls with empty responses before we start waiting on them. + CompletableFuture preShutdown = + interruptUserTasks ? 
CompletableFuture.completedFuture(null) : shutdownWorkerRpc; + + return preShutdown.thenCompose( ignore -> { CompletableFuture workflowWorkerShutdownFuture = workflowWorker.shutdown(shutdownManager, interruptUserTasks); From 7c69c28a225c3725296c965e3b974142e24d9ed0 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 17:54:51 -0700 Subject: [PATCH 3/8] Add test --- .../worker/GracefulPollShutdownTest.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java new file mode 100644 index 0000000000..edd8968695 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -0,0 +1,149 @@ +package io.temporal.internal.worker; + +import com.uber.m3.tally.NoopScope; +import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +/** + * Tests that an in-flight poll survives shutdown when graceful poll shutdown is enabled, and is + * killed promptly when it is not. 
+ */ +@RunWith(Parameterized.class) +public class GracefulPollShutdownTest { + + @Parameterized.Parameter public boolean graceful; + + @Parameterized.Parameters(name = "graceful={0}") + public static Object[] data() { + return new Object[] {true, false}; + } + + @Test(timeout = 10_000) + public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception { + NamespaceCapabilities capabilities = new NamespaceCapabilities(); + capabilities.setGracefulPollShutdown(graceful); + + AtomicReference processedTask = new AtomicReference<>(); + CountDownLatch taskProcessedLatch = new CountDownLatch(1); + ShutdownableTaskExecutor taskExecutor = + new ShutdownableTaskExecutor() { + @Override + public void process(@NonNull String task) { + processedTask.set(task); + taskProcessedLatch.countDown(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public CompletableFuture shutdown( + ShutdownManager shutdownManager, boolean interruptTasks) { + return CompletableFuture.completedFuture(null); + } + + @Override + public void awaitTermination(long timeout, TimeUnit unit) {} + }; + + // -- poll task: first call returns immediately, second blocks until released -- + CountDownLatch secondPollStarted = new CountDownLatch(1); + CountDownLatch releasePoll = new CountDownLatch(1); + + MultiThreadedPoller.PollTask pollTask = + new MultiThreadedPoller.PollTask() { + private int callCount = 0; + + @Override + public synchronized String poll() { + callCount++; + if (callCount == 1) { + return "task-1"; + } else if (callCount == 2) { + secondPollStarted.countDown(); + try { + releasePoll.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + return "task-2"; + } + // Subsequent calls just block until interrupted (simulates long poll) + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + } + return null; + } + }; + + // -- create poller with 1 thread so polls are sequential -- + MultiThreadedPoller poller = + new MultiThreadedPoller<>( + "test-identity", + pollTask, + taskExecutor, + PollerOptions.newBuilder() + .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) + .setPollThreadNamePrefix("test-poller") + .build(), + new NoopScope(), + capabilities); + + poller.start(); + + // Wait for the first task to be processed (proves poller is running) + assertTrue("first task should be processed", taskProcessedLatch.await(5, TimeUnit.SECONDS)); + assertEquals("task-1", processedTask.get()); + + // Wait for the second poll to be in-flight + assertTrue("second poll should start", secondPollStarted.await(5, TimeUnit.SECONDS)); + + // Trigger shutdown (don't interrupt tasks) + ShutdownManager shutdownManager = new ShutdownManager(); + CompletableFuture shutdownFuture = poller.shutdown(shutdownManager, false); + + if (graceful) { + // In graceful mode the poller waits for the in-flight poll to complete. + // The shutdown should NOT have completed yet since the poll is still blocked. + assertFalse("shutdown should not complete while poll is in-flight", shutdownFuture.isDone()); + + // Simulate the server returning the poll response (as it would after ShutdownWorker RPC) + releasePoll.countDown(); + + // Wait for shutdown to complete - the poll should return "task-2" and be processed + shutdownFuture.get(5, TimeUnit.SECONDS); + + assertEquals("task-2", processedTask.get()); + } else { + // In legacy mode the poller forcefully interrupts in-flight polls. + // Shutdown should complete quickly without releasing the blocked poll. + shutdownFuture.get(5, TimeUnit.SECONDS); + + // The second task should NOT have been processed since the poll was killed. 
+ assertNotEquals( + "task-2 should not be processed in legacy mode", "task-2", processedTask.get()); + } + + shutdownManager.close(); + } +} From b6140f54d14d0b633477b6816c46da08991906ca Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 27 Mar 2026 18:18:11 -0700 Subject: [PATCH 4/8] Format fix --- .../internal/worker/GracefulPollShutdownTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java index edd8968695..45c2d1f7f7 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -1,18 +1,17 @@ package io.temporal.internal.worker; +import static org.junit.Assert.*; + import com.uber.m3.tally.NoopScope; import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Tests that an in-flight poll survives shutdown when graceful poll shutdown is enabled, and is From 9960fa86580aaf39397fde258623e61556c0f924 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Sat, 28 Mar 2026 10:24:14 -0700 Subject: [PATCH 5/8] Test fixes --- .../io/temporal/internal/worker/StickyQueueBacklogTest.java | 1 + .../java/io/temporal/internal/worker/WorkflowWorkerTest.java | 3 +++ 2 files changed, 4 insertions(+) diff --git 
a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 3bd9e1f909..ab806c960b 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -98,6 +98,7 @@ public void stickyQueueBacklogResetTest() { .setKind(TaskQueueKind.TASK_QUEUE_KIND_STICKY) .build()) .setNamespace("default") + .setWorkerInstanceKey("test-instance-key") .build()))) .thenReturn(pollResponse); if (throwOnPoll) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index d4e6e947ca..7cbf5f4ea5 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -80,6 +80,7 @@ public void concurrentPollRequestLockTest() throws Exception { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(3)) @@ -252,6 +253,7 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) @@ -397,6 +399,7 @@ public boolean isAnyTypeSupported() { SingleWorkerOptions.newBuilder() .setIdentity("test_identity") .setBuildId(UUID.randomUUID().toString()) + .setWorkerInstanceKey(UUID.randomUUID().toString()) .setPollerOptions( PollerOptions.newBuilder() .setPollerBehavior(new 
PollerBehaviorSimpleMaximum(1)) From 67a160388981e74347aabdbe2f18f9ffa1126f3c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 2 Apr 2026 10:17:46 -0700 Subject: [PATCH 6/8] Review comments --- .../temporal/internal/worker/BasePoller.java | 3 ++- .../worker/NamespaceCapabilities.java | 14 ++++++++++---- .../io/temporal/worker/WorkerFactory.java | 19 ++----------------- .../worker/GracefulPollShutdownTest.java | 4 +++- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java index 9e685bbcae..febd6241af 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java @@ -65,7 +65,8 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean // When graceful poll shutdown is enabled, the server will complete outstanding polls with // empty responses after ShutdownWorker is called. We simply wait for polls to return. pollExecutorShutdown = - shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor"); + shutdownManager.shutdownExecutor( + pollExecutor, this + "#pollExecutor", Duration.ofSeconds(80)); } else { // Old behaviour forcibly stops outstanding polls. 
pollExecutorShutdown = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java index 96586259b3..7c98310514 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java @@ -1,5 +1,6 @@ package io.temporal.internal.worker; +import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -13,12 +14,17 @@ public final class NamespaceCapabilities { private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false); - public boolean isPollerAutoscaling() { - return pollerAutoscaling.get(); + public void setFromCapabilities(Capabilities capabilities) { + if (capabilities.getPollerAutoscaling()) { + pollerAutoscaling.set(true); + } + if (capabilities.getWorkerPollCompleteOnShutdown()) { + gracefulPollShutdown.set(true); + } } - public void setPollerAutoscaling(boolean value) { - pollerAutoscaling.set(value); + public boolean isPollerAutoscaling() { + return pollerAutoscaling.get(); } public boolean isGracefulPollShutdown() { diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 08d2683ee3..392930369e 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -268,23 +268,8 @@ public synchronized void start() { DescribeNamespaceRequest.newBuilder() .setNamespace(workflowClient.getOptions().getNamespace()) .build()); - if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats()) { - namespaceCapabilities.setWorkerHeartbeats(true); - } else { - log.debug( - "Server does not support worker heartbeats for namespace {}", - 
workflowClient.getOptions().getNamespace()); - } - - if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { - namespaceCapabilities.setPollerAutoscaling(true); - } - if (describeNamespaceResponse - .getNamespaceInfo() - .getCapabilities() - .getWorkerPollCompleteOnShutdown()) { - namespaceCapabilities.setGracefulPollShutdown(true); - } + namespaceCapabilities.setFromCapabilities( + describeNamespaceResponse.getNamespaceInfo().getCapabilities()); // Build plugin execution chain (reverse order for proper nesting) Consumer startChain = WorkerFactory::doStart; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java index 45c2d1f7f7..3497ffca48 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -3,6 +3,7 @@ import static org.junit.Assert.*; import com.uber.m3.tally.NoopScope; +import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities; import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -30,7 +31,8 @@ public static Object[] data() { @Test(timeout = 10_000) public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception { NamespaceCapabilities capabilities = new NamespaceCapabilities(); - capabilities.setGracefulPollShutdown(graceful); + capabilities.setFromCapabilities( + Capabilities.newBuilder().setWorkerPollCompleteOnShutdown(true).build()); AtomicReference processedTask = new AtomicReference<>(); CountDownLatch taskProcessedLatch = new CountDownLatch(1); From 6db60f3eeada1af3750de2c256c7f69cc023e66c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 30 Apr 2026 11:47:58 -0700 Subject: [PATCH 7/8] Fix merge issues --- 
.../worker/NamespaceCapabilities.java | 3 +- .../internal/worker/SyncWorkflowWorker.java | 12 -- .../internal/worker/WorkflowWorker.java | 15 -- .../main/java/io/temporal/worker/Worker.java | 26 ++-- .../io/temporal/worker/WorkerFactory.java | 2 +- .../worker/GracefulPollShutdownTest.java | 6 +- .../internal/worker/WorkflowWorkerTest.java | 85 ----------- .../temporal/worker/WorkerShutdownTest.java | 141 ++++++++++++++++++ 8 files changed, 160 insertions(+), 130 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/WorkerShutdownTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java index 7c98310514..a3410fa258 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java @@ -13,7 +13,6 @@ public final class NamespaceCapabilities { private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false); private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false); - public void setFromCapabilities(Capabilities capabilities) { if (capabilities.getPollerAutoscaling()) { pollerAutoscaling.set(true); @@ -34,7 +33,7 @@ public boolean isGracefulPollShutdown() { public void setGracefulPollShutdown(boolean value) { gracefulPollShutdown.set(value); } - + public boolean isWorkerHeartbeats() { return workerHeartbeats.get(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index 51ab7a7009..18cf7fd4a5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -3,9 +3,7 @@ import static 
io.temporal.internal.common.InternalUtils.createStickyTaskQueue; import io.temporal.api.common.v1.Payloads; -import io.temporal.api.enums.v1.TaskQueueType; import io.temporal.api.taskqueue.v1.TaskQueue; -import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; @@ -24,11 +22,9 @@ import io.temporal.workflow.Functions.Func1; import java.lang.reflect.Type; import java.time.Duration; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; -import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -64,8 +60,6 @@ public SyncWorkflowWorker( @Nonnull WorkflowClient client, @Nonnull String namespace, @Nonnull String taskQueue, - @Nonnull String workerInstanceKey, - @Nonnull Supplier> activeTaskQueueTypesSupplier, @Nonnull SingleWorkerOptions singleWorkerOptions, @Nonnull SingleWorkerOptions localActivityOptions, @Nonnull WorkflowRunLockManager runLocks, @@ -123,8 +117,6 @@ public SyncWorkflowWorker( client.getWorkflowServiceStubs(), namespace, taskQueue, - workerInstanceKey, - activeTaskQueueTypesSupplier, stickyTaskQueueName, singleWorkerOptions, runLocks, @@ -250,10 +242,6 @@ public TrackingSlotSupplier getLocalActivitySlotSupplier( return laWorker.getSlotSupplier(); } - public void setHeartbeatSupplier(Supplier supplier) { - workflowWorker.setHeartbeatSupplier(supplier); - } - public boolean hasStickyQueue() { return workflowWorker.hasStickyQueue(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 1c36f5c521..a128c7b75b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ 
-13,11 +13,8 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.QueryResultType; import io.temporal.api.enums.v1.TaskQueueKind; -import io.temporal.api.enums.v1.TaskQueueType; -import io.temporal.api.enums.v1.WorkerStatus; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.failure.v1.Failure; -import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.api.workflowservice.v1.*; import io.temporal.failure.ApplicationFailure; import io.temporal.internal.logging.LoggerTag; @@ -33,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -57,9 +53,6 @@ final class WorkflowWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final EagerActivityDispatcher eagerActivityDispatcher; private final TrackingSlotSupplier slotSupplier; - private volatile Supplier heartbeatSupplier; - private final String workerInstanceKey; - private final Supplier> activeTaskQueueTypesSupplier; private final TaskCounter taskCounter = new TaskCounter(); private final PollerTracker pollerTracker = new PollerTracker(); @@ -78,8 +71,6 @@ public WorkflowWorker( @Nonnull WorkflowServiceStubs service, @Nonnull String namespace, @Nonnull String taskQueue, - @Nonnull String workerInstanceKey, - @Nonnull Supplier> activeTaskQueueTypesSupplier, @Nullable String stickyTaskQueueName, @Nonnull SingleWorkerOptions options, @Nonnull WorkflowRunLockManager runLocks, @@ -91,8 +82,6 @@ public WorkflowWorker( this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); - this.workerInstanceKey = Objects.requireNonNull(workerInstanceKey); - this.activeTaskQueueTypesSupplier = 
Objects.requireNonNull(activeTaskQueueTypesSupplier); this.options = Objects.requireNonNull(options); this.stickyTaskQueueName = stickyTaskQueueName; this.pollerOptions = getPollerOptions(options); @@ -344,10 +333,6 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() { .orElse(null); } - public void setHeartbeatSupplier(Supplier supplier) { - this.heartbeatSupplier = supplier; - } - public TrackingSlotSupplier getSlotSupplier() { return slotSupplier; } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index fad5c899b8..19a52c65e1 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -27,8 +27,8 @@ import io.temporal.internal.worker.*; import io.temporal.internal.worker.TaskCounter; import io.temporal.serviceclient.MetricsTag; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.Version; +import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; @@ -67,17 +67,16 @@ public final class Worker { private final String namespace; private final String identity; private final String stickyTaskQueueName; - private final NamespaceCapabilities namespaceCapabilities; final SyncWorkflowWorker workflowWorker; final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; private final AtomicBoolean started = new AtomicBoolean(); private volatile boolean shuttingDown = false; - private final String workerInstanceKey = UUID.randomUUID().toString(); private volatile Instant startTime; private final WorkflowClientOptions clientOptions; private final @Nonnull WorkflowExecutorCache cache; private final Map previousHeartbeatSnapshots = new ConcurrentHashMap<>(); + private volatile Supplier heartbeatSupplier; private static final class TaskSnapshot { 
final int processed; @@ -114,8 +113,7 @@ private static final class TaskSnapshot { @Nonnull NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(client, "client should not be null"); - this.namespaceCapabilities = - Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); + Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null"); this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); Preconditions.checkArgument( !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); @@ -220,8 +218,6 @@ private static final class TaskSnapshot { client, namespace, taskQueue, - workerInstanceKey, - this::getActiveTaskQueueTypes, singleWorkerOptions, localActivityOptions, runLocks, @@ -493,14 +489,16 @@ CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interr .setWorkerInstanceKey(workerInstanceKey) .setTaskQueue(taskQueue) .setReason("graceful shutdown") - .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) - .addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); - if (activityWorker != null) { - requestBuilder.addTaskQueueTypes(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY); - } + .addAllTaskQueueTypes(getActiveTaskQueueTypes()); if (stickyTaskQueueName != null) { requestBuilder.setStickyTaskQueue(stickyTaskQueueName); } + if (heartbeatSupplier != null) { + requestBuilder.setWorkerHeartbeat( + heartbeatSupplier.get().toBuilder() + .setStatus(WorkerStatus.WORKER_STATUS_SHUTTING_DOWN) + .build()); + } CompletableFuture shutdownWorkerRpc = shutdownManager.waitOnWorkerShutdownRequest( service.futureStub().shutdownWorker(requestBuilder.build())); @@ -550,6 +548,10 @@ String getWorkerInstanceKey() { return workerInstanceKey; } + void setHeartbeatSupplier(Supplier supplier) { + this.heartbeatSupplier = supplier; + } + List getActiveTaskQueueTypes() { List types = new ArrayList<>(); if (workflowWorker.isAnyTypeSupported()) { diff --git 
a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 392930369e..c9bb8eb210 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -312,7 +312,7 @@ private void doStart() { Supplier heartbeatSupplier = worker.buildHeartbeatCallback(workerGroupingKey); hbManager.registerWorker(namespace, worker.getWorkerInstanceKey(), heartbeatSupplier); - worker.workflowWorker.setHeartbeatSupplier(heartbeatSupplier); + worker.setHeartbeatSupplier(heartbeatSupplier); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java index 3497ffca48..ef0e93495a 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java @@ -9,7 +9,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.checkerframework.checker.nullness.qual.NonNull; +import javax.annotation.Nonnull; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -32,14 +32,14 @@ public static Object[] data() { public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception { NamespaceCapabilities capabilities = new NamespaceCapabilities(); capabilities.setFromCapabilities( - Capabilities.newBuilder().setWorkerPollCompleteOnShutdown(true).build()); + Capabilities.newBuilder().setWorkerPollCompleteOnShutdown(graceful).build()); AtomicReference processedTask = new AtomicReference<>(); CountDownLatch taskProcessedLatch = new CountDownLatch(1); ShutdownableTaskExecutor taskExecutor = new ShutdownableTaskExecutor() { @Override - public void 
process(@NonNull String task) { + public void process(@Nonnull String task) { processedTask.set(task); taskProcessedLatch.countDown(); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index 7cbf5f4ea5..d4f1824c26 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -14,7 +14,6 @@ import com.uber.m3.util.ImmutableMap; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; -import io.temporal.api.enums.v1.TaskQueueType; import io.temporal.api.workflowservice.v1.*; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.internal.common.InternalUtils; @@ -30,12 +29,8 @@ import io.temporal.worker.tuning.SlotSupplier; import io.temporal.worker.tuning.WorkflowSlotInfo; import java.time.Duration; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import org.junit.Test; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -74,8 +69,6 @@ public void concurrentPollRequestLockTest() throws Exception { client, "default", "task_queue", - "test-worker-instance-key", - java.util.Collections::emptyList, "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -247,8 +240,6 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { client, "default", "task_queue", - "test-worker-instance-key", - java.util.Collections::emptyList, "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -393,8 +384,6 @@ public boolean isAnyTypeSupported() { client, "default", "taskQueue", - "test-worker-instance-key", - java.util.Collections::emptyList, "sticky", 
SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -447,80 +436,6 @@ public boolean isAnyTypeSupported() { worker.shutdown(new ShutdownManager(), true).get(); } - @Test - public void activeTaskQueueTypesEvaluatedAtShutdownTime() throws Exception { - WorkflowServiceStubs client = mock(WorkflowServiceStubs.class); - when(client.getServerCapabilities()) - .thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build()); - - WorkflowRunLockManager runLockManager = new WorkflowRunLockManager(); - Scope metricsScope = new NoopScope(); - WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope); - SlotSupplier<WorkflowSlotInfo> slotSupplier = new FixedSizeSlotSupplier<>(10); - - WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class); - when(taskHandler.isAnyTypeSupported()).thenReturn(true); - - // Supplier that starts with WORKFLOW only, then adds NEXUS later - AtomicReference<List<TaskQueueType>> typesRef = - new AtomicReference<>(Arrays.asList(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); - Supplier<List<TaskQueueType>> supplier = typesRef::get; - - EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class); - WorkflowWorker worker = - new WorkflowWorker( - client, - "default", - "task_queue", - "test-worker-instance-key", - supplier, - null, - SingleWorkerOptions.newBuilder() - .setIdentity("test_identity") - .setBuildId(UUID.randomUUID().toString()) - .setPollerOptions( - PollerOptions.newBuilder() - .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) - .build()) - .setMetricsScope(metricsScope) - .build(), - runLockManager, - cache, - taskHandler, - eagerActivityDispatcher, - slotSupplier, - new NamespaceCapabilities()); - - // Simulate registering Nexus after construction - typesRef.set( - Arrays.asList( - TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW, - TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY, - TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); - - WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = -
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); - when(client.futureStub()).thenReturn(futureStub); - when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) - .thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); - - worker.shutdown(new ShutdownManager(), true).get(5, TimeUnit.SECONDS); - - org.mockito.ArgumentCaptor<ShutdownWorkerRequest> captor = - org.mockito.ArgumentCaptor.forClass(ShutdownWorkerRequest.class); - verify(futureStub).shutdownWorker(captor.capture()); - List<TaskQueueType> shutdownTypes = captor.getValue().getTaskQueueTypesList(); - assertTrue( - "ShutdownWorkerRequest should include NEXUS type added after construction", - shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); - assertTrue( - "ShutdownWorkerRequest should include WORKFLOW type", - shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); - assertTrue( - "ShutdownWorkerRequest should include ACTIVITY type", - shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)); - } - private ReplayWorkflowFactory setUpMockWorkflowFactory() throws Throwable { ReplayWorkflow mockWorkflow = mock(ReplayWorkflow.class); ReplayWorkflowFactory mockFactory = mock(ReplayWorkflowFactory.class); diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerShutdownTest.java new file mode 100644 index 0000000000..d48e39725f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerShutdownTest.java @@ -0,0 +1,141 @@ +package io.temporal.worker; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.google.common.util.concurrent.Futures; +import com.uber.m3.tally.NoopScope; +import com.uber.m3.tally.Scope; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.activity.ActivityInterface; +import
io.temporal.activity.ActivityMethod; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.api.workflowservice.v1.ShutdownWorkerRequest; +import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.internal.sync.WorkflowThreadExecutor; +import io.temporal.internal.worker.NamespaceCapabilities; +import io.temporal.internal.worker.ShutdownManager; +import io.temporal.internal.worker.WorkflowExecutorCache; +import io.temporal.internal.worker.WorkflowRunLockManager; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.shared.TestNexusServices; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class WorkerShutdownTest { + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + void run(); + } + + public static class TestWorkflowImpl implements TestWorkflow { + @Override + public void run() {} + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + void doThing(); + } + + public static class TestActivityImpl implements TestActivity { + @Override + public void doThing() {} + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler<String, String> operation() { + return OperationHandler.sync((ctx, details, now) -> "Hello " + now); + } + } + + /** + * Verifies that the active task queue types in the ShutdownWorkerRequest are evaluated at + * shutdown time, not at Worker construction time.
Types registered after construction must be + * reflected in the request. + */ + @Test + public void activeTaskQueueTypesEvaluatedAtShutdownTime() throws Exception { + WorkflowServiceStubs service = mock(WorkflowServiceStubs.class); + when(service.getServerCapabilities()) + .thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + + WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = + mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); + when(service.futureStub()).thenReturn(futureStub); + when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) + .thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); + + WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = + mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(service.blockingStub()).thenReturn(blockingStub); + when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); + + WorkflowClient client = mock(WorkflowClient.class); + when(client.getWorkflowServiceStubs()).thenReturn(service); + when(client.getOptions()) + .thenReturn( + WorkflowClientOptions.newBuilder() + .setNamespace("test-ns") + .validateAndBuildWithDefaults()); + + Scope metricsScope = new NoopScope(); + WorkflowRunLockManager runLocks = new WorkflowRunLockManager(); + WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLocks, metricsScope); + WorkflowThreadExecutor wfThreadExecutor = mock(WorkflowThreadExecutor.class); + + Worker worker = + new Worker( + client, + "test-task-queue", + WorkerFactoryOptions.newBuilder().build(), + WorkerOptions.newBuilder().build(), + metricsScope, + runLocks, + cache, + false, + wfThreadExecutor, + Collections.emptyList(), + Collections.emptyList(), + new NamespaceCapabilities()); + + // Register types AFTER worker construction. The request built by shutdown should reflect + // these registrations, proving that getActiveTaskQueueTypes() is evaluated lazily. 
+ worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker.registerActivitiesImplementations(new TestActivityImpl()); + worker.registerNexusServiceImplementation(new TestNexusServiceImpl()); + + worker.shutdown(new ShutdownManager(), true).get(5, TimeUnit.SECONDS); + + ArgumentCaptor<ShutdownWorkerRequest> captor = + ArgumentCaptor.forClass(ShutdownWorkerRequest.class); + verify(futureStub).shutdownWorker(captor.capture()); + List<TaskQueueType> shutdownTypes = captor.getValue().getTaskQueueTypesList(); + assertTrue( + "ShutdownWorkerRequest should include WORKFLOW type registered after construction", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); + assertTrue( + "ShutdownWorkerRequest should include ACTIVITY type registered after construction", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)); + assertTrue( + "ShutdownWorkerRequest should include NEXUS type registered after construction", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); + } +} From 2598773a7dddbf2c38022b4b5837b3ca24a28996 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 30 Apr 2026 16:03:29 -0700 Subject: [PATCH 8/8] Use latest CLI --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c5c1339b81..775324b5d2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,7 @@ jobs: report_paths: "**/build/test-results/test/TEST-*.xml" unit_test_jdk8: - name: Unit test with docker service [JDK8] + name: Unit test with CLI runs-on: ubuntu-latest-16-cores timeout-minutes: 30 steps: @@ -82,9 +82,9 @@ jobs: - name: Set up Gradle uses: gradle/actions/setup-gradle@ac396bf1a80af16236baf54bd7330ae21dc6ece5 # v6 - - name: Start containerized server and dependencies + - name: Start CLI server env: - TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0 + TEMPORAL_CLI_VERSION: 1.7.0 run: | wget -O temporal_cli.tar.gz
https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz tar -xzf temporal_cli.tar.gz