From 29512ddf90f7da5cfc67154734c34378e4a17220 Mon Sep 17 00:00:00 2001 From: Jonathan Jamroga Date: Fri, 12 Jun 2026 13:04:44 -0400 Subject: [PATCH] Decouple actor lock TTL from workflow deadline via heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ActorWorkflow.ResumeActor, SuspendActor and PauseActor used to derive their workflow ctx from the Redis lock TTL via acquireActorLock(ctx, id, 30s, 2s) — the workflow deadline and the lock TTL were a single 28s knob. That meant image pulls / restores that legitimately need more than 28s death-looped forever, while raising the knob also raised how long peers wait to retry an actor after a crashed ateapi replica. Split the two concerns: - Lock TTL stays short (30s constant, internal). Bounds peer failover. - Workflow deadline is a separate operator-configurable knob via the new --actor-workflow-deadline pflag (default 5m). Bounds a single Resume/Suspend/Pause. - A heartbeat goroutine refreshes the lock every lockTTL/3 (~10s) for the full workflow duration. On RefreshLock=false or any Redis error (peer stole the lock, Redis blip), the workflow ctx is cancelled with errLostActorLock as the cause so in-flight steps unwind cleanly and the mutual-exclusion invariant is preserved. - The release function stops the heartbeat (waits for goroutine exit) before best-effort ReleaseLock. Adds store.Interface.RefreshLock with a Redis CAS Lua script mirroring the existing ReleaseLock script. 
--- .../internal/controlapi/functional_test.go | 2 +- cmd/ateapi/internal/controlapi/service.go | 8 +- cmd/ateapi/internal/controlapi/workflow.go | 100 +++++++++++--- .../internal/controlapi/workflow_test.go | 124 ++++++++++++++++++ .../internal/store/ateredis/ateredis.go | 20 +++ .../internal/store/ateredis/ateredis_test.go | 91 +++++++++++++ cmd/ateapi/internal/store/store.go | 6 + cmd/ateapi/main.go | 5 +- 8 files changed, 331 insertions(+), 25 deletions(-) create mode 100644 cmd/ateapi/internal/controlapi/workflow_test.go diff --git a/cmd/ateapi/internal/controlapi/functional_test.go b/cmd/ateapi/internal/controlapi/functional_test.go index 281c7a146..cc575638d 100644 --- a/cmd/ateapi/internal/controlapi/functional_test.go +++ b/cmd/ateapi/internal/controlapi/functional_test.go @@ -303,7 +303,7 @@ func setupTest(t *testing.T, ns string) *testContext { } dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer()) - service := NewService(persistence, wc, actorTemplateLister, workerPoolLister, sandboxConfigLister, dialer, k8sClient) + service := NewService(persistence, wc, actorTemplateLister, workerPoolLister, sandboxConfigLister, dialer, k8sClient, 30*time.Second) // 5. Start REAL gRPC Server for ATE API grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ateinterceptors.ServerUnaryInterceptor)) diff --git a/cmd/ateapi/internal/controlapi/service.go b/cmd/ateapi/internal/controlapi/service.go index 7ec695b00..a13a33b4b 100644 --- a/cmd/ateapi/internal/controlapi/service.go +++ b/cmd/ateapi/internal/controlapi/service.go @@ -15,6 +15,8 @@ package controlapi import ( + "time" + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" "github.com/agent-substrate/substrate/cmd/ateapi/internal/workercache" listersv1alpha1 "github.com/agent-substrate/substrate/pkg/client/listers/api/v1alpha1" @@ -34,7 +36,8 @@ type Service struct { var _ ateapipb.ControlServer = (*Service)(nil) -// NewService creates a service. 
+// NewService creates a service. actorWorkflowDeadline bounds how long a single +// Resume/Suspend workflow can run end-to-end. func NewService( persistence store.Interface, workerCache *workercache.Cache, @@ -43,13 +46,14 @@ func NewService( sandboxConfigLister listersv1alpha1.SandboxConfigLister, dialer *AteletDialer, kubeClient kubernetes.Interface, + actorWorkflowDeadline time.Duration, ) *Service { s := &Service{ persistence: persistence, actorTemplateLister: actorTemplateLister, workerPoolLister: workerPoolLister, dialer: dialer, - actorWorkflow: NewActorWorkflow(persistence, workerCache, dialer, actorTemplateLister, workerPoolLister, sandboxConfigLister, kubeClient), + actorWorkflow: NewActorWorkflow(persistence, workerCache, dialer, actorTemplateLister, workerPoolLister, sandboxConfigLister, kubeClient, actorWorkflowDeadline), } return s diff --git a/cmd/ateapi/internal/controlapi/workflow.go b/cmd/ateapi/internal/controlapi/workflow.go index 25c4f36ac..7465914ef 100644 --- a/cmd/ateapi/internal/controlapi/workflow.go +++ b/cmd/ateapi/internal/controlapi/workflow.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" @@ -33,6 +34,18 @@ import ( "k8s.io/client-go/kubernetes" ) +// actorLockTTL is the Redis TTL on the per-actor workflow lock. It bounds how +// long a peer must wait to retry an actor after this process crashes mid-workflow. +const actorLockTTL = 30 * time.Second + +// actorLockHeartbeatInterval is how often the heartbeat refreshes the lock. +// Chosen so we get ~3 attempts before the TTL would otherwise lapse. +const actorLockHeartbeatInterval = actorLockTTL / 3 + +// errLostActorLock is the context cause set when the heartbeat can no longer +// keep the actor lock alive (peer stole it, or Redis returned an error). 
+var errLostActorLock = errors.New("lost actor lock during workflow") + // WorkflowStep represents a single, idempotent operation in a workflow graph. // Params is the immutable parameters used to start the workflow. // Context is the mutable context fetched or modified during execution. @@ -123,9 +136,14 @@ type ActorWorkflow struct { sandboxConfigLister listersv1alpha1.SandboxConfigLister kubeClient kubernetes.Interface secretCache *envSecretCache + // workflowDeadline is the maximum duration of a single Resume/Suspend + // workflow. The lock is kept alive across this duration by a heartbeat, + // independent of actorLockTTL. + workflowDeadline time.Duration } -// NewActorWorkflow creates a new ActorWorkflow. +// NewActorWorkflow creates a new ActorWorkflow. workflowDeadline bounds how +// long a single Resume/Suspend can run end-to-end. func NewActorWorkflow( store store.Interface, workerCache *workercache.Cache, @@ -134,6 +152,7 @@ func NewActorWorkflow( workerPoolLister listersv1alpha1.WorkerPoolLister, sandboxConfigLister listersv1alpha1.SandboxConfigLister, kubeClient kubernetes.Interface, + workflowDeadline time.Duration, ) *ActorWorkflow { return &ActorWorkflow{ store: store, @@ -144,6 +163,7 @@ func NewActorWorkflow( sandboxConfigLister: sandboxConfigLister, kubeClient: kubeClient, secretCache: newEnvSecretCache(envSecretCacheTTL), + workflowDeadline: workflowDeadline, } } @@ -156,9 +176,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, atespace, id string, bo } state := &ResumeState{} - // Acquire lock and get the timeout context for the workflow - // Lock TTL is 30 seconds, with 2 seconds padding for workflow timeout - ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second) + ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval) if err != nil { return nil, err } @@ -186,9 +204,7 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, atespace, id string) ( } state := 
&SuspendState{} - // Acquire lock and get the timeout context for the workflow - // Lock TTL is 30 seconds, with 2 seconds padding for workflow timeout - ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second) + ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval) if err != nil { return nil, err } @@ -216,9 +232,7 @@ func (w *ActorWorkflow) PauseActor(ctx context.Context, atespace, id string) (*a } state := &PauseState{} - // Acquire lock and get the timeout context for the workflow - // Lock TTL is 30 seconds, with 2 seconds padding for workflow timeout - ctx, releaseLock, err := w.acquireActorLock(ctx, id, 30*time.Second, 2*time.Second) + ctx, releaseLock, err := w.acquireActorLock(ctx, id, actorLockTTL, actorLockHeartbeatInterval) if err != nil { return nil, err } @@ -238,27 +252,71 @@ func (w *ActorWorkflow) PauseActor(ctx context.Context, atespace, id string) (*a return state.Actor, nil } -func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, ttl time.Duration, padding time.Duration) (context.Context, func(), error) { +// acquireActorLock takes the per-actor workflow lock and returns a workflow +// context bounded by w.workflowDeadline. A background heartbeat keeps the lock +// alive — independent of lockTTL — for as long as the workflow runs. If the +// heartbeat fails (Redis error or another peer stole the lock) the returned +// context is cancelled with errLostActorLock as the cause, and in-flight steps +// will see ctx.Err() and unwind. The returned release function stops the +// heartbeat, waits for it to exit, then best-effort releases the lock. 
+func (w *ActorWorkflow) acquireActorLock(ctx context.Context, id string, lockTTL, heartbeatInterval time.Duration) (context.Context, func(), error) { lockKey := "lock:actor:" + id lockValue := uuid.New().String() - // Create a child context for the workflow that expires BEFORE the lock - workflowTimeout := ttl - padding - workflowCtx, cancel := context.WithTimeout(ctx, workflowTimeout) - - acquired, err := w.store.AcquireLock(workflowCtx, lockKey, lockValue, ttl) + acquired, err := w.store.AcquireLock(ctx, lockKey, lockValue, lockTTL) if err != nil { - cancel() return nil, nil, fmt.Errorf("while acquiring lock: %w", err) } if !acquired { - cancel() return nil, nil, status.Error(grpcCodes.Aborted, "another operation is in progress for this actor") } - return workflowCtx, func() { - cancel() + cancellableCtx, cancelCause := context.WithCancelCause(ctx) + workflowCtx, cancelDeadline := context.WithTimeout(cancellableCtx, w.workflowDeadline) + + heartbeatDone := make(chan struct{}) + go w.runLockHeartbeat(workflowCtx, lockKey, lockValue, id, lockTTL, heartbeatInterval, cancelCause, heartbeatDone) + + release := func() { + cancelDeadline() + cancelCause(context.Canceled) + <-heartbeatDone // Use context.Background() to ensure the lock is released even if the workflow context was canceled. w.store.ReleaseLock(context.Background(), lockKey, lockValue) //nolint:errcheck // best-effort release; the lock TTL is the safety net. - }, nil + } + return workflowCtx, release, nil +} + +// runLockHeartbeat refreshes the actor lock on a ticker until ctx is done. If +// a refresh fails or returns false (we no longer own the lock), it cancels the +// workflow context with errLostActorLock so workflow steps tear down promptly. 
+func (w *ActorWorkflow) runLockHeartbeat(ctx context.Context, lockKey, lockValue, actorID string, lockTTL, heartbeatInterval time.Duration, cancelCause context.CancelCauseFunc, done chan<- struct{}) {
+	defer close(done)
+	ticker := time.NewTicker(heartbeatInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			ok, err := w.store.RefreshLock(ctx, lockKey, lockValue, lockTTL)
+			if err != nil {
+				// Decide from ctx itself, not the error's type: a client-side
+				// timeout can match DeadlineExceeded while the workflow still runs.
+				if ctx.Err() == nil {
+					slog.WarnContext(ctx, "Lock heartbeat failed; cancelling workflow",
+						slog.String("actor_id", actorID),
+						slog.String("err", err.Error()))
+					cancelCause(fmt.Errorf("%w: %w", errLostActorLock, err))
+				}
+				return
+			}
+			if !ok {
+				slog.WarnContext(ctx, "Actor lock no longer owned; cancelling workflow",
+					slog.String("actor_id", actorID))
+				cancelCause(errLostActorLock)
+				return
+			}
+		}
+	}
+}
diff --git a/cmd/ateapi/internal/controlapi/workflow_test.go b/cmd/ateapi/internal/controlapi/workflow_test.go
new file mode 100644
index 000000000..57d99e711
--- /dev/null
+++ b/cmd/ateapi/internal/controlapi/workflow_test.go
@@ -0,0 +1,124 @@
+// Copyright 2026 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package controlapi + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store/ateredis" + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" +) + +func newLockTestWorkflow(t *testing.T) (*miniredis.Miniredis, *ActorWorkflow) { + t.Helper() + mr, err := miniredis.Run() + if err != nil { + t.Fatalf("miniredis.Run: %v", err) + } + t.Cleanup(mr.Close) + rdb := redis.NewClusterClient(&redis.ClusterOptions{Addrs: []string{mr.Addr()}}) + return mr, &ActorWorkflow{ + store: ateredis.NewPersistence(rdb), + workflowDeadline: 30 * time.Second, + } +} + +func TestAcquireActorLock_HeartbeatKeepsLockAlivePastTTL(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + lockTTL := 150 * time.Millisecond + heartbeat := 40 * time.Millisecond + + ctx, release, err := w.acquireActorLock(context.Background(), "actor-1", lockTTL, heartbeat) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + defer release() + + // Wait through multiple TTLs. If the heartbeat is working the lock key + // must still be in Redis — its TTL is being PEXPIRE'd back to `lockTTL` + // every `heartbeat`. 
+ time.Sleep(4 * lockTTL) + + if !mr.Exists("lock:actor:actor-1") { + t.Fatalf("lock key disappeared from Redis despite heartbeat; ctx err=%v cause=%v", ctx.Err(), context.Cause(ctx)) + } + if ctx.Err() != nil { + t.Fatalf("workflow ctx cancelled while heartbeat was healthy: err=%v cause=%v", ctx.Err(), context.Cause(ctx)) + } +} + +func TestAcquireActorLock_LostLockCancelsWorkflow(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + lockTTL := 200 * time.Millisecond + heartbeat := 30 * time.Millisecond + + ctx, release, err := w.acquireActorLock(context.Background(), "actor-2", lockTTL, heartbeat) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + defer release() + + // Simulate a peer stealing the lock (or the TTL lapsing and someone else + // re-acquiring): wipe our key so the next heartbeat refresh's CAS fails. + mr.Del("lock:actor:actor-2") + + select { + case <-ctx.Done(): + case <-time.After(2 * time.Second): + t.Fatalf("workflow ctx was not cancelled after lock was lost") + } + + if cause := context.Cause(ctx); !errors.Is(cause, errLostActorLock) { + t.Errorf("context.Cause = %v, want errLostActorLock", cause) + } +} + +func TestAcquireActorLock_ReleaseRemovesLock(t *testing.T) { + mr, w := newLockTestWorkflow(t) + + _, release, err := w.acquireActorLock(context.Background(), "actor-3", 200*time.Millisecond, 60*time.Millisecond) + if err != nil { + t.Fatalf("acquireActorLock: %v", err) + } + + if !mr.Exists("lock:actor:actor-3") { + t.Fatalf("lock key not in Redis after acquire") + } + release() + if mr.Exists("lock:actor:actor-3") { + t.Errorf("lock key still in Redis after release") + } +} + +func TestAcquireActorLock_ConflictReturnsAborted(t *testing.T) { + _, w := newLockTestWorkflow(t) + + _, release, err := w.acquireActorLock(context.Background(), "actor-4", 5*time.Second, 1*time.Second) + if err != nil { + t.Fatalf("first acquireActorLock: %v", err) + } + defer release() + + _, _, err = w.acquireActorLock(context.Background(), 
"actor-4", 5*time.Second, 1*time.Second)
+	if err == nil {
+		t.Fatalf("expected second acquireActorLock to fail")
+	}
+}
diff --git a/cmd/ateapi/internal/store/ateredis/ateredis.go b/cmd/ateapi/internal/store/ateredis/ateredis.go
index 2a27fa7c1..e4bad74dd 100644
--- a/cmd/ateapi/internal/store/ateredis/ateredis.go
+++ b/cmd/ateapi/internal/store/ateredis/ateredis.go
@@ -799,3 +799,37 @@ func (s *Persistence) ReleaseLock(ctx context.Context, key string, value string)
 	}
 	return nil
 }
+
+// refreshLockScript extends a lock's TTL only while the stored value still
+// matches the caller's token (compare-and-refresh). It returns PEXPIRE's
+// result when the value matches and 0 otherwise.
+var refreshLockScript = redis.NewScript(`
+	if redis.call("get", KEYS[1]) == ARGV[1] then
+		return redis.call("pexpire", KEYS[1], ARGV[2])
+	else
+		return 0
+	end
+`)
+
+// RefreshLock resets the TTL of the lock at key to ttl iff the lock still
+// holds value. It reports true when the TTL was extended and false when the
+// lock is absent or held under a different value.
+//
+// ttl must be at least one millisecond: the script passes whole milliseconds
+// to PEXPIRE, and Redis deletes a key given a non-positive timeout, which
+// would drop the lock while reporting success.
+func (s *Persistence) RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) {
+	if ttl < time.Millisecond {
+		return false, fmt.Errorf("while refreshing lock for %q: ttl %s is below the 1ms minimum", key, ttl)
+	}
+
+	res, err := refreshLockScript.Run(ctx, s.rdb, []string{key}, value, ttl.Milliseconds()).Result()
+	if err != nil {
+		return false, fmt.Errorf("while refreshing lock for %q with value %q: %w", key, value, err)
+	}
+	n, ok := res.(int64)
+	if !ok {
+		return false, fmt.Errorf("while refreshing lock for %q: unexpected result type %T", key, res)
+	}
+	return n == 1, nil
+}
diff --git a/cmd/ateapi/internal/store/ateredis/ateredis_test.go b/cmd/ateapi/internal/store/ateredis/ateredis_test.go
index 3d5355151..8d69e3488 100644
--- a/cmd/ateapi/internal/store/ateredis/ateredis_test.go
+++ b/cmd/ateapi/internal/store/ateredis/ateredis_test.go
@@ -807,6 +807,97 @@ func receiveEvent(t *testing.T, ch <-chan store.WorkerEvent) store.WorkerEvent {
 	}
 }
 
+func TestRefreshLock_ExtendsTTL(t *testing.T) {
+	mr, s, ctx := setupTest(t)
+	defer mr.Close()
+
+	key := "test-lock"
+	value := "token-1"
+	otherValue := "token-2"
+	ttl := 10 * time.Second
+
+	acquired, err := s.AcquireLock(ctx, key, value, ttl)
+	if err != nil {
+		t.Fatalf("AcquireLock failed: %v", err)
+	}
+	if !acquired {
+		t.Fatalf("expected lock to be acquired")
+	}
+
+	// Advance to just before expiry, refresh, and verify another holder still
+	// can't acquire after the
original TTL would have elapsed. + mr.FastForward(8 * time.Second) + refreshed, err := s.RefreshLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if !refreshed { + t.Fatalf("expected lock to be refreshed") + } + + mr.FastForward(5 * time.Second) + stolen, err := s.AcquireLock(ctx, key, otherValue, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if stolen { + t.Errorf("expected refreshed lock to still be held, but it was stolen") + } +} + +func TestRefreshLock_WrongValueReturnsFalse(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + key := "test-lock" + value := "token-1" + otherValue := "token-2" + ttl := 10 * time.Second + + acquired, err := s.AcquireLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if !acquired { + t.Fatalf("expected lock to be acquired") + } + + refreshed, err := s.RefreshLock(ctx, key, otherValue, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if refreshed { + t.Errorf("expected RefreshLock with wrong value to return false") + } +} + +func TestRefreshLock_AfterExpirationReturnsFalse(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + key := "test-lock" + value := "token-1" + ttl := 5 * time.Second + + acquired, err := s.AcquireLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("AcquireLock failed: %v", err) + } + if !acquired { + t.Fatalf("expected lock to be acquired") + } + + mr.FastForward(6 * time.Second) + + refreshed, err := s.RefreshLock(ctx, key, value, ttl) + if err != nil { + t.Fatalf("RefreshLock failed: %v", err) + } + if refreshed { + t.Errorf("expected RefreshLock after expiration to return false") + } +} + func TestAcquireLock_NonReentry(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() diff --git a/cmd/ateapi/internal/store/store.go b/cmd/ateapi/internal/store/store.go index 5de4bdc30..287df48a0 100644 --- a/cmd/ateapi/internal/store/store.go +++ 
b/cmd/ateapi/internal/store/store.go @@ -105,6 +105,12 @@ type Interface interface { // Returns an error only on database failure. ReleaseLock(ctx context.Context, key string, value string) error + // RefreshLock extends the TTL on a distributed lock iff the stored value still matches. + // Returns true if the TTL was extended. + // Returns false if we no longer own the lock (either expired and reclaimed, or never held). + // Returns an error only on database failure. + RefreshLock(ctx context.Context, key string, value string, ttl time.Duration) (bool, error) + // DebugClearAll drop all data from the database. Useful for debugging / local testing/ DebugClearAll(ctx context.Context) error } diff --git a/cmd/ateapi/main.go b/cmd/ateapi/main.go index e10cebb0c..7236c8819 100644 --- a/cmd/ateapi/main.go +++ b/cmd/ateapi/main.go @@ -72,6 +72,8 @@ var ( sessionIDCAPoolFile = pflag.String("session-id-ca-pool", "", "The file that contains the CA pool for signing session JWTs") workerpoolCACerts = pflag.String("workerpool-ca-certs", "", "The file that contains the CA for verifying workerpool client certificates.") + actorWorkflowDeadline = pflag.Duration("actor-workflow-deadline", 5*time.Minute, "Maximum wall-clock duration of a single Resume/Suspend workflow. A heartbeat keeps the per-actor lock alive across this duration; raise it for slow image registries.") + showVersion = pflag.Bool("version", false, "Print version and exit.") authMode = pflag.String("auth-mode", "mtls", "Auth mode for incoming gRPC: mtls|jwt. 'mtls' (default) relies on transport-level mTLS for client identity. 'jwt' additionally requires a Kubernetes ServiceAccount Bearer token on every RPC. 
Substrate will drop support for JWT auth mode once the Pod Certificates feature is enabled by default in the minimum supported Kubernetes version.") clientJWTCAFile = pflag.String("client-jwt-ca-cert", ateapiauth.DefaultServiceAccountCAFile, "CA cert file used to verify TLS when fetching the OIDC discovery document and JWKS for JWT authentication. Defaults to the in-cluster service account CA.") @@ -158,7 +160,7 @@ func main() { ateFactory.WaitForCacheSync(stopCh) dialer := controlapi.NewAteletDialer(workerPodInformer.GetIndexer(), ateletPodInformer.GetIndexer()) - sm := controlapi.NewService(redisPersistence, workerCache, actorTemplateLister, workerPoolLister, sandboxConfigLister, dialer, clientset) + sm := controlapi.NewService(redisPersistence, workerCache, actorTemplateLister, workerPoolLister, sandboxConfigLister, dialer, clientset, *actorWorkflowDeadline) jwtIssuerDiscoveryClient := buildK8sServiceAccountIssuerDiscoveryClient(ctx, *clientJWTCAFile, *clientJWTIssuer) if authModeParsed == ateapiauth.ModeJWT && jwtIssuerDiscoveryClient == nil { @@ -247,6 +249,7 @@ func logFlagValues(ctx context.Context) { slog.String("session-id-ca-pool", *sessionIDCAPoolFile), slog.String("workerpool-ca-certs", *workerpoolCACerts), slog.String("auth-mode", *authMode), + slog.Duration("actor-workflow-deadline", *actorWorkflowDeadline), ) }