From c1ce06f3130bbf62d4e3f6c4fd3290b61d8e3e64 Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Mon, 8 Jun 2026 12:47:18 -0700 Subject: [PATCH 1/2] fix(speculate): gate finalize on the batch's own build result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tryFinalize only consulted dependency batches, never the batch's own build, so a failed build still merged and the request reached "landed". It now fetches the batch's build first and fails the batch when that build failed, only falling through to dependency evaluation once the batch's own build has succeeded (waiting if the build is not yet persisted or is still in flight). Enabling change: add BuildStore.GetByBatchID. Build.ID is the runner-assigned ID (e.g. "fake-build-fail-..."), not batch.ID, so a caller holding only a batch ID could not fetch its build via Get. This also fixes cancelBuild, which called Get(ctx, batch.ID) and was a silent no-op (always ErrNotFound); it now resolves the build by batch ID and updates it by build ID. e2e: drive a request to terminal status — assert the happy path reaches "landed" and a "sq-fake=build-fail" request reaches "error" (the end-to-end regression guard for this fix). Make PersistsStartedLogViaGatewayConsumer deterministic by asserting on the durable, append-only request_log row instead of polling the transient current status via the Status RPC. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- submitqueue/extension/storage/build_store.go | 6 ++ .../storage/mock/build_store_mock.go | 15 +++ .../extension/storage/mysql/build_store.go | 32 +++++-- .../extension/storage/mysql/schema/build.sql | 4 +- .../controller/speculate/speculate.go | 77 ++++++++++++--- .../controller/speculate/speculate_test.go | 96 +++++++++++++++++-- test/e2e/submitqueue/suite_test.go | 82 ++++++++++++---- 7 files changed, 266 insertions(+), 46 deletions(-) diff --git a/submitqueue/extension/storage/build_store.go b/submitqueue/extension/storage/build_store.go index a2f89bc4..7ab8a5fe 100644 --- a/submitqueue/extension/storage/build_store.go +++ b/submitqueue/extension/storage/build_store.go @@ -27,6 +27,12 @@ type BuildStore interface { // Get retrieves a build by ID. Returns ErrNotFound if the build is not found. Get(ctx context.Context, id string) (entity.Build, error) + // GetByBatchID retrieves the build scheduled for the given batch. In the + // current model there is at most one build per batch (build.ID is the + // runner-assigned ID, so callers that only hold a batch ID cannot use Get). + // Returns ErrNotFound if no build exists for the batch. + GetByBatchID(ctx context.Context, batchID string) (entity.Build, error) + // Create creates a new build. The build must have a unique ID already assigned. // Returns ErrAlreadyExists if a build with the same ID already exists. 
Create(ctx context.Context, build entity.Build) error diff --git a/submitqueue/extension/storage/mock/build_store_mock.go b/submitqueue/extension/storage/mock/build_store_mock.go index 675a2f58..2c4bf171 100644 --- a/submitqueue/extension/storage/mock/build_store_mock.go +++ b/submitqueue/extension/storage/mock/build_store_mock.go @@ -70,6 +70,21 @@ func (mr *MockBuildStoreMockRecorder) Get(ctx, id any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBuildStore)(nil).Get), ctx, id) } +// GetByBatchID mocks base method. +func (m *MockBuildStore) GetByBatchID(ctx context.Context, batchID string) (entity.Build, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetByBatchID", ctx, batchID) + ret0, _ := ret[0].(entity.Build) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetByBatchID indicates an expected call of GetByBatchID. +func (mr *MockBuildStoreMockRecorder) GetByBatchID(ctx, batchID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByBatchID", reflect.TypeOf((*MockBuildStore)(nil).GetByBatchID), ctx, batchID) +} + // UpdateStatus mocks base method. func (m *MockBuildStore) UpdateStatus(ctx context.Context, id string, newStatus entity.BuildStatus) error { m.ctrl.T.Helper() diff --git a/submitqueue/extension/storage/mysql/build_store.go b/submitqueue/extension/storage/mysql/build_store.go index 92233e6b..44672a94 100644 --- a/submitqueue/extension/storage/mysql/build_store.go +++ b/submitqueue/extension/storage/mysql/build_store.go @@ -44,23 +44,41 @@ func (s *buildStore) Get(ctx context.Context, id string) (ret entity.Build, retE op := metrics.Begin(s.scope, "get") defer func() { op.Complete(retErr) }() + return s.scanBuild(ctx, + "SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?", + "id="+id, id) +} + +// GetByBatchID retrieves the build scheduled for the given batch. 
There is at +// most one build per batch in the current model, so it returns that single row +// (LIMIT 1). Returns ErrNotFound if no build exists for the batch. +func (s *buildStore) GetByBatchID(ctx context.Context, batchID string) (ret entity.Build, retErr error) { + op := metrics.Begin(s.scope, "get_by_batch_id") + defer func() { op.Complete(retErr) }() + + return s.scanBuild(ctx, + "SELECT id, batch_id, speculation_path, score, status FROM build WHERE batch_id = ? LIMIT 1", + "batch_id="+batchID, batchID) +} + +// scanBuild runs a single-row build query and decodes the result, including the +// speculation_path JSON column. label is used only for error context (e.g. +// "id=…" or "batch_id=…"). Returns ErrNotFound when the query matches no row. +func (s *buildStore) scanBuild(ctx context.Context, query, label string, args ...any) (entity.Build, error) { var build entity.Build var speculationPathJSON []byte - err := s.db.QueryRowContext(ctx, - "SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?", - id, - ).Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status) - + err := s.db.QueryRowContext(ctx, query, args...). 
+ Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status) if errors.Is(err, sql.ErrNoRows) { return entity.Build{}, storage.WrapNotFound(err) } if err != nil { - return entity.Build{}, fmt.Errorf("failed to get build entity id=%s from the database: %w", id, err) + return entity.Build{}, fmt.Errorf("failed to get build entity %s from the database: %w", label, err) } if err := json.Unmarshal(speculationPathJSON, &build.SpeculationPath); err != nil { - return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity id=%s from the database: %w", id, err) + return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity %s from the database: %w", label, err) } return build, nil diff --git a/submitqueue/extension/storage/mysql/schema/build.sql b/submitqueue/extension/storage/mysql/schema/build.sql index 93bc5744..0f0a192c 100644 --- a/submitqueue/extension/storage/mysql/schema/build.sql +++ b/submitqueue/extension/storage/mysql/schema/build.sql @@ -4,5 +4,7 @@ CREATE TABLE IF NOT EXISTS build ( speculation_path JSON NOT NULL, score FLOAT NOT NULL, status VARCHAR(64) NOT NULL, - PRIMARY KEY (id) + PRIMARY KEY (id), + -- Supports GetByBatchID: SELECT ... WHERE batch_id = ? + INDEX idx_batch_id (batch_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index 05471979..ae108b0d 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -170,8 +170,11 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e return nil } -// tryFinalize publishes to merge and transitions to Merging iff every -// dependency batch has reached Succeeded. 
Cancelled deps are treated as +// tryFinalize publishes to merge and transitions to Merging iff this batch's +// own build has Succeeded AND every dependency batch has reached Succeeded. +// The own-build gate comes first: a Failed build fails the batch, and a build +// still in flight (or not yet persisted) parks the batch until the next +// buildsignal re-triggers speculate. Cancelled deps are treated as // out-of-the-way: the cancelled batch will never land, so it can no longer // conflict — drop it from the chain and proceed. Failed deps still cascade // via failOnDependency. If some deps are still in flight, the call is a @@ -182,6 +185,37 @@ func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) e // from the chain and re-issue speculation for the surviving ordering(s) // — instead of cascading the failure into requests that could still land. func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error { + // Gate on the batch's own build before considering dependencies + build, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // The build controller has not persisted the Build yet (race with + // startSpeculation's publish to build). Wait for the next event. + metrics.NamedCounter(c.metricsScope, opName, "waiting_on_build", 1) + return nil + } + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get build for batch %s: %w", batch.ID, err) + } + + switch build.Status { + case entity.BuildStatusSucceeded: + // Own build passed; fall through to dependency evaluation. + case entity.BuildStatusFailed: + return c.failOnBuild(ctx, batch, build) + default: + // Accepted, Running, or any other non-terminal status: the build is + // still in flight. buildsignal re-triggers speculate on every poll, so + // simply wait. 
+ metrics.NamedCounter(c.metricsScope, opName, "waiting_on_build", 1) + c.logger.Debugw("own build not yet succeeded; waiting", + "batch_id", batch.ID, + "build_id", build.ID, + "build_status", string(build.Status), + ) + return nil + } + deps, err := c.fetchDependencies(ctx, batch) if err != nil { return err @@ -230,11 +264,21 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error return nil } +// failOnBuild transitions a Speculating batch to Failed when its own build has +// reached a non-succeeding terminal status, then reconciles via failBatch. +func (c *Controller) failOnBuild(ctx context.Context, batch entity.Batch, build entity.Build) error { + metrics.NamedCounter(c.metricsScope, opName, "build_failed", 1) + c.logger.Warnw("own build in non-succeeding terminal state; failing batch", + "batch_id", batch.ID, + "build_id", build.ID, + "build_status", string(build.Status), + ) + return c.failBatch(ctx, batch) +} + // failOnDependency transitions a Speculating batch to Failed when one of its -// dependencies has reached a non-succeeding terminal state, then publishes to -// the conclude queue so the request store and request log get reconciled. -// Without this transition the batch would sit in Speculating forever — no -// downstream event ever fires for it again. +// dependencies has reached a non-succeeding terminal state, then reconciles +// via failBatch. 
func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, dep entity.Batch) error { metrics.NamedCounter(c.metricsScope, opName, "dependency_failed", 1) c.logger.Warnw("dependency in non-succeeding terminal state; failing batch", @@ -242,7 +286,14 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d "dependency_id", dep.ID, "dependency_state", string(dep.State), ) + return c.failBatch(ctx, batch) +} +// failBatch CASes a Speculating batch to Failed and publishes to the conclude +// queue so the request store and request log get reconciled. Without this +// transition the batch would sit in Speculating forever — no downstream event +// ever fires for it again. +func (c *Controller) failBatch(ctx context.Context, batch entity.Batch) error { newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) @@ -326,17 +377,17 @@ func (c *Controller) cancelBatch(ctx context.Context, batch entity.Batch) error } // cancelBuild flips any in-flight Build entity for the batch to -// BuildStatusCancelled. Builds use build.ID == batch.ID, so a single Get -// covers every build scheduled for the batch. Tolerates ErrNotFound (no -// build was ever scheduled — the batch was cancelled before speculation -// started building) and skips already-terminal builds. +// BuildStatusCancelled. It looks the build up by batch ID (build.ID is the +// runner-assigned ID, not the batch ID). Tolerates ErrNotFound (no build was +// ever scheduled — the batch was cancelled before speculation started building) +// and skips already-terminal builds. // // This is the hook point for a future external CI integration: today the // system has no external runner, so the local state flip is the complete // cancellation. Once a runner exists, it must be invoked here before the // local UpdateStatus. 
func (c *Controller) cancelBuild(ctx context.Context, batch entity.Batch) error { - build, err := c.store.GetBuildStore().Get(ctx, batch.ID) + build, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) if err != nil { if errors.Is(err, storage.ErrNotFound) { metrics.NamedCounter(c.metricsScope, opName, "cancel_build_not_found", 1) @@ -351,9 +402,9 @@ func (c *Controller) cancelBuild(ctx context.Context, batch entity.Batch) error return nil } - if err := c.store.GetBuildStore().UpdateStatus(ctx, batch.ID, entity.BuildStatusCancelled); err != nil { + if err := c.store.GetBuildStore().UpdateStatus(ctx, build.ID, entity.BuildStatusCancelled); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to cancel build for batch %s: %w", batch.ID, err) + return fmt.Errorf("failed to cancel build %s for batch %s: %w", build.ID, batch.ID, err) } metrics.NamedCounter(c.metricsScope, opName, "cancel_build_done", 1) return nil diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 2342bd17..4c9e3f35 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -51,6 +51,16 @@ func testBatch(state entity.BatchState, deps ...string) entity.Batch { } } +// expectOwnBuildSucceeded returns a BuildStore mock whose GetByBatchID reports +// the batch's own build as Succeeded, satisfying the own-build gate in +// tryFinalize so the Speculating-path tests reach dependency evaluation. 
+func expectOwnBuildSucceeded(ctrl *gomock.Controller, batchID string) *storagemock.MockBuildStore { + bs := storagemock.NewMockBuildStore(ctrl) + bs.EXPECT().GetByBatchID(gomock.Any(), batchID).Return( + entity.Build{ID: "fake-build-" + batchID, BatchID: batchID, Status: entity.BuildStatusSucceeded}, nil) + return bs +} + // newTestController wires a controller with a registry covering all topics the // speculate controller may publish to. The publisher returns publishErr (or nil). func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { @@ -140,6 +150,7 @@ func TestController_Process_FinalizeNoDeps(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -160,6 +171,7 @@ func TestController_Process_FinalizeAllDepsSucceeded(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -178,6 +190,7 @@ func TestController_Process_WaitingOnDep(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -198,6 +211,7 @@ func TestController_Process_FailedDepFailsBatch(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + 
store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -220,6 +234,72 @@ func TestController_Process_CancelledDepSkipped(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: a failed own build must fail the batch (Speculating → Failed) +// and publish to conclude, independent of any dependencies. This is the core +// build-gating fix — previously a failed build still advanced to merge. +func TestController_Process_FailedOwnBuildFailsBatch(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateFailed).Return(nil) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-fail-1", BatchID: batch.ID, Status: entity.BuildStatusFailed}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: a build still running parks the batch — no merge, no state change. 
+func TestController_Process_WaitingOnOwnBuild(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected — the build has not finished. + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-run-1", BatchID: batch.ID, Status: entity.BuildStatusRunning}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// tryFinalize: when the Build row is not yet persisted (race with the build +// controller), the batch waits — no merge, no state change. +func TestController_Process_OwnBuildNotFoundWaits(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected — no build to gate on yet. 
+ + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -358,10 +438,10 @@ func TestController_Process_CancellingTerminalFlow(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{ - ID: batch.ID, BatchID: batch.ID, Status: entity.BuildStatusRunning, + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{ + ID: "fake-build-run-1", BatchID: batch.ID, Status: entity.BuildStatusRunning, }, nil) - buildStore.EXPECT().UpdateStatus(gomock.Any(), batch.ID, entity.BuildStatusCancelled).Return(nil) + buildStore.EXPECT().UpdateStatus(gomock.Any(), "fake-build-run-1", entity.BuildStatusCancelled).Return(nil) depStore := storagemock.NewMockBatchDependentStore(ctrl) depStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.BatchDependent{ @@ -423,8 +503,8 @@ func TestController_Process_CancellingBuildAlreadyTerminal(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{ - ID: batch.ID, BatchID: batch.ID, Status: entity.BuildStatusSucceeded, + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{ + ID: "fake-build-ok-1", BatchID: batch.ID, Status: entity.BuildStatusSucceeded, }, nil) // No UpdateStatus expected — the build is already terminal. 
@@ -454,7 +534,7 @@ func TestController_Process_CancellingNoBuildYet(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) // No UpdateStatus expected. depStore := storagemock.NewMockBatchDependentStore(ctrl) @@ -484,7 +564,7 @@ func TestController_Process_CancellingNoDependents(t *testing.T) { batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateCancelled).Return(nil) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) depStore := storagemock.NewMockBatchDependentStore(ctrl) depStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.BatchDependent{BatchID: batch.ID, Dependents: []string{}, Version: 1}, nil) @@ -528,7 +608,7 @@ func TestController_Process_CancellingTerminalCASVersionMismatch(t *testing.T) { Return(storage.ErrVersionMismatch) buildStore := storagemock.NewMockBuildStore(ctrl) - buildStore.EXPECT().Get(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 91fb0fb7..ce09efb7 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -167,18 +167,63 @@ func (s *E2EIntegrationSuite) TestPingOrchestrator() { s.log.Logf("Orchestrator ping: %s", resp.Message) } +// 
TestLandRequest_SinglePR drives a normal request through the orchestrator +// pipeline and asserts it lands. e2e-test-queue is wired with immediate fakes +// (every stage succeeds), so the happy path reaches the terminal "landed" status. func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { + s.landAndWait("github://uber/e2e-service/pull/123/abcdef0123456789abcdef0123456789abcdef01", "landed", 30*time.Second) +} + +// TestLandRequest_BuildFails verifies that a build failure (injected via the +// fake build runner's "sq-fake=build-fail" marker) drives the request to a +// terminal "error" rather than landing. This is the end-to-end guard for the +// speculate build-gating fix: before that fix the batch ignored its own failed +// build and still merged. +func (s *E2EIntegrationSuite) TestLandRequest_BuildFails() { + s.landAndWait("github://uber/e2e-service/pull/124/abcdef0123456789abcdef0123456789abcdef02?sq-fake=build-fail", "error", 30*time.Second) +} + +// landAndWait submits a single-URI land request to e2e-test-queue and asserts +// it reaches the expected terminal status. 
+func (s *E2EIntegrationSuite) landAndWait(uri, wantStatus string, timeout time.Duration) { + t := s.T() req := &gatewaypb.LandRequest{ Queue: "e2e-test-queue", - Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-service/pull/123/abcdef0123456789abcdef0123456789abcdef01"}}, + Change: &gatewaypb.Change{Uris: []string{uri}}, Strategy: gatewaypb.Strategy_REBASE, } - s.log.Logf("Sending Land request (single PR) for queue=%s", req.Queue) + s.log.Logf("Sending Land request for queue=%s uri=%s", req.Queue, uri) resp, err := s.gatewayClient.Land(s.ctx, req) - require.NoError(s.T(), err, "Land request failed") - require.NotEmpty(s.T(), resp.Sqid, "SQID should not be empty") - s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, resp.Sqid, "SQID should not be empty") + + final := s.waitForStatus(resp.Sqid, timeout) + assert.Equal(t, wantStatus, final) +} + +// waitForStatus polls the gateway Status RPC until the request reaches a +// terminal status (landed / error / cancelled) and returns it, or fails the +// test if no terminal status is observed within timeout. 
+func (s *E2EIntegrationSuite) waitForStatus(sqid string, timeout time.Duration) string { + t := s.T() + var final string + require.Eventually(t, func() bool { + resp, err := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if err != nil { + s.log.Logf("Status RPC error for sqid=%s: %v", sqid, err) + return false + } + switch resp.Status { + case "landed", "error", "cancelled": + final = resp.Status + s.log.Logf("sqid=%s reached terminal status=%s last_error=%q", sqid, resp.Status, resp.LastError) + return true + default: + return false + } + }, timeout, 500*time.Millisecond, "sqid=%s did not reach a terminal status within %s", sqid, timeout) + return final } // TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log @@ -186,12 +231,13 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { // entries to the log topic (it never writes the request log itself), and the // gateway's log consumer drains that topic and persists them to storage. // -// We observe this through the gateway Status RPC: immediately after Land the -// status is "accepted" (the gateway's synchronous direct write), and once the -// orchestrator's start controller publishes "started" to the log topic, the -// gateway consumer persists it and Status advances to "started". Seeing -// "started" therefore proves the publish→consume→persist path works across both -// services. +// We assert on the durable request_log table rather than the live Status RPC. +// request_log is append-only and the gateway consumer is its sole writer, so a +// "started" row proves the orchestrator-publish → gateway-consume → persist path +// ran. The Status RPC only returns the *current* status, and "started" is a +// transient intermediate state: on the fast happy path the request reaches a +// terminal status within a single poll interval, so polling Status for "started" +// loses the race. The persisted log row, by contrast, never disappears. 
func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { t := s.T() @@ -206,15 +252,17 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) require.Eventually(t, func() bool { - resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) - if statusErr != nil { - s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + var count int + queryErr := s.db.QueryRowContext(s.ctx, + "SELECT COUNT(*) FROM request_log WHERE request_id = ? AND status = ?", + sqid, string(entity.RequestStatusStarted)).Scan(&count) + if queryErr != nil { + s.log.Logf("request_log query for sqid=%s not ready yet: %v", sqid, queryErr) return false } - s.log.Logf("Status(%s) = %q", sqid, resp.Status) - return resp.Status == string(entity.RequestStatusStarted) + return count > 0 }, persistTimeout, persistPollInterval, - "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + "request %s should have a %q entry persisted by the gateway log consumer", sqid, entity.RequestStatusStarted) s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) } From d21c6a1c7e60be55c4f8c68fb9a18d687d2c11c0 Mon Sep 17 00:00:00 2001 From: Albert Wu Date: Mon, 8 Jun 2026 19:09:24 -0700 Subject: [PATCH 2/2] fix(speculate): unblock failed build dependents --- submitqueue/extension/storage/build_store.go | 10 +- .../extension/storage/mysql/build_store.go | 10 +- .../extension/storage/mysql/schema/build.sql | 3 +- .../orchestrator/controller/build/build.go | 44 ++++- .../controller/build/build_test.go | 76 ++++++-- .../controller/speculate/speculate.go | 30 ++-- .../controller/speculate/speculate_test.go | 170 ++++++++++++++---- 7 files changed, 274 insertions(+), 69 deletions(-) diff --git a/submitqueue/extension/storage/build_store.go 
b/submitqueue/extension/storage/build_store.go index 7ab8a5fe..429c3edd 100644 --- a/submitqueue/extension/storage/build_store.go +++ b/submitqueue/extension/storage/build_store.go @@ -27,14 +27,14 @@ type BuildStore interface { // Get retrieves a build by ID. Returns ErrNotFound if the build is not found. Get(ctx context.Context, id string) (entity.Build, error) - // GetByBatchID retrieves the build scheduled for the given batch. In the - // current model there is at most one build per batch (build.ID is the - // runner-assigned ID, so callers that only hold a batch ID cannot use Get). + // GetByBatchID retrieves the single build scheduled for the given batch + // (build.ID is the runner-assigned ID, so callers that only hold a batch ID + // cannot use Get). // Returns ErrNotFound if no build exists for the batch. GetByBatchID(ctx context.Context, batchID string) (entity.Build, error) - // Create creates a new build. The build must have a unique ID already assigned. - // Returns ErrAlreadyExists if a build with the same ID already exists. + // Create creates a new build. The build must have a unique ID and batch ID. + // Returns ErrAlreadyExists if either uniqueness constraint is violated. Create(ctx context.Context, build entity.Build) error // UpdateStatus updates the status of a build. diff --git a/submitqueue/extension/storage/mysql/build_store.go b/submitqueue/extension/storage/mysql/build_store.go index 44672a94..1826db98 100644 --- a/submitqueue/extension/storage/mysql/build_store.go +++ b/submitqueue/extension/storage/mysql/build_store.go @@ -49,15 +49,14 @@ func (s *buildStore) Get(ctx context.Context, id string) (ret entity.Build, retE "id="+id, id) } -// GetByBatchID retrieves the build scheduled for the given batch. There is at -// most one build per batch in the current model, so it returns that single row -// (LIMIT 1). Returns ErrNotFound if no build exists for the batch. +// GetByBatchID retrieves the single build scheduled for the given batch. 
+// Returns ErrNotFound if no build exists for the batch. func (s *buildStore) GetByBatchID(ctx context.Context, batchID string) (ret entity.Build, retErr error) { op := metrics.Begin(s.scope, "get_by_batch_id") defer func() { op.Complete(retErr) }() return s.scanBuild(ctx, - "SELECT id, batch_id, speculation_path, score, status FROM build WHERE batch_id = ? LIMIT 1", + "SELECT id, batch_id, speculation_path, score, status FROM build WHERE batch_id = ?", "batch_id="+batchID, batchID) } @@ -84,7 +83,8 @@ func (s *buildStore) scanBuild(ctx context.Context, query, label string, args .. return build, nil } -// Create creates a new build. The build must have a unique ID already assigned. Returns ErrAlreadyExists if the build ID already exists. +// Create creates a new build. The build must have a unique ID and batch ID. +// Returns ErrAlreadyExists if either uniqueness constraint is violated. func (s *buildStore) Create(ctx context.Context, build entity.Build) (retErr error) { op := metrics.Begin(s.scope, "create") defer func() { op.Complete(retErr) }() diff --git a/submitqueue/extension/storage/mysql/schema/build.sql b/submitqueue/extension/storage/mysql/schema/build.sql index 0f0a192c..d186022c 100644 --- a/submitqueue/extension/storage/mysql/schema/build.sql +++ b/submitqueue/extension/storage/mysql/schema/build.sql @@ -5,6 +5,5 @@ CREATE TABLE IF NOT EXISTS build ( score FLOAT NOT NULL, status VARCHAR(64) NOT NULL, PRIMARY KEY (id), - -- Supports GetByBatchID: SELECT ... WHERE batch_id = ? 
- INDEX idx_batch_id (batch_id) + UNIQUE KEY uniq_batch_id (batch_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/orchestrator/controller/build/build.go b/submitqueue/orchestrator/controller/build/build.go index 6a9ac930..1628f479 100644 --- a/submitqueue/orchestrator/controller/build/build.go +++ b/submitqueue/orchestrator/controller/build/build.go @@ -114,6 +114,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil } + existing, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get existing build for batch %s: %w", batch.ID, err) + } + } else { + metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1) + return c.publishExisting(ctx, existing) + } + // Assemble base (dependency batches in order) and head (this batch). base, err := c.collectChanges(ctx, batch.Dependencies) if err != nil { @@ -148,11 +159,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // Persist the initial Build snapshot so the buildsignal poll loop has a - // row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery - // of this message after a previous successful Create. - if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { - metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to persist build %s: %w", build.ID, err) + // row to UpdateStatus against. ErrAlreadyExists means either this exact + // build or another build for the same batch already won a redelivery race; + // publish the stored row so the poll loop follows the source of truth. 
+ if err := c.store.GetBuildStore().Create(ctx, build); err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to persist build %s: %w", build.ID, err) + } + build, err = c.store.GetBuildStore().GetByBatchID(ctx, batch.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) + return fmt.Errorf("failed to get existing build for batch %s after duplicate create: %w", batch.ID, err) + } + metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1) } // Hand off to the buildsignal poll loop; it calls Status, updates the @@ -173,6 +193,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } +func (c *Controller) publishExisting(ctx context.Context, build entity.Build) error { + if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { + metrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) + return fmt.Errorf("failed to publish existing build to buildsignal: %w", err) + } + c.logger.Infow("published existing build to buildsignal", + "batch_id", build.BatchID, + "build_id", build.ID, + "status", string(build.Status), + "topic_key", consumer.TopicKeyBuildSignal, + ) + return nil +} + // collectChanges loads each batch by ID and concatenates the Change values // from its contained requests in batch order. Used to build the base // (dependency batches) and head (this batch) inputs to BuildRunner.Trigger. 
diff --git a/submitqueue/orchestrator/controller/build/build_test.go b/submitqueue/orchestrator/controller/build/build_test.go index 212c21b7..bc8cd204 100644 --- a/submitqueue/orchestrator/controller/build/build_test.go +++ b/submitqueue/orchestrator/controller/build/build_test.go @@ -64,6 +64,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound).AnyTimes() mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) @@ -169,6 +170,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { var created entity.Build mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), headBatch.ID).Return(entity.Build{}, storage.ErrNotFound) mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, b entity.Build) error { created = b @@ -227,19 +229,22 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) { assert.Equal(t, published.ID, created.ID) } -// TestController_Process_BuildStoreAlreadyExistsIsSwallowed covers the -// redelivery case: Create returns ErrAlreadyExists, the controller proceeds -// to publish to buildsignal anyway. The polling loop will pick up the -// existing row via UpdateStatus. -func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { +// TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild covers the +// race where another redelivery creates the batch's build after our initial +// GetByBatchID miss. The controller must publish the stored row's build ID, +// not the freshly-triggered duplicate runner ID. 
+func TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild(t *testing.T) { ctrl := gomock.NewController(t) batch := testBatch() + existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted} mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(storage.ErrAlreadyExists) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -249,11 +254,13 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { br := buildrunnermock.NewMockBuildRunner(ctrl) br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil) - publishCalled := false + var published entity.BuildID mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn( - func(_ context.Context, _ string, _ entityqueue.Message) error { - publishCalled = true + func(_ context.Context, _ string, msg entityqueue.Message) error { + var err error + published, err = entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) return nil }, ).Times(1) @@ -271,7 +278,53 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) { delivery.EXPECT().Attempt().Return(1).AnyTimes() require.NoError(t, controller.Process(context.Background(), delivery)) - assert.True(t, publishCalled, "publish to buildsignal must run even when Create reports ErrAlreadyExists") + assert.Equal(t, existing.ID, published.ID) +} + +func 
TestController_Process_ExistingBuildSkipsTriggerAndCreate(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := testBatch() + existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted} + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + // RequestStore is not touched because existing builds skip change assembly. + + br := buildrunnermock.NewMockBuildRunner(ctrl) + // No Trigger expectation: redelivery must not launch another external build. + + var published entity.BuildID + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + var err error + published, err = entity.BuildIDFromBytes(msg.Payload) + require.NoError(t, err) + return nil + }, + ).Times(1) + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, + ) + require.NoError(t, err) + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build") + + msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, 
controller.Process(context.Background(), delivery)) + assert.Equal(t, existing.ID, published.ID) } // TestController_Process_TriggerFailure verifies a build-runner failure is @@ -285,7 +338,10 @@ func TestController_Process_TriggerFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes() - // No build store expectation: Trigger failure must short-circuit before Create. + mockBuildStore := storagemock.NewMockBuildStore(ctrl) + mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound) + store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes() + // No Create expectation: Trigger failure must short-circuit before persistence. br := buildrunnermock.NewMockBuildRunner(ctrl) br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). diff --git a/submitqueue/orchestrator/controller/speculate/speculate.go b/submitqueue/orchestrator/controller/speculate/speculate.go index ae108b0d..b6f9e566 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate.go +++ b/submitqueue/orchestrator/controller/speculate/speculate.go @@ -115,12 +115,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r // Terminal state: re-fan-out for self-healing in case a previous publish // was lost. Always re-publish to conclude (idempotent on the batch ID). - // For Cancelled specifically also re-publish to dependents — a crash - // between the terminal CAS and the dependent publish would otherwise - // leave them stuck waiting on a Cancelled dep. + // For terminal states that unblock downstream dependencies by being observed + // as terminal non-success, also re-publish to dependents. A crash between + // the terminal CAS and the dependent publish would otherwise leave them + // stuck waiting. 
if batch.State.IsTerminal() { metrics.NamedCounter(c.metricsScope, opName, "self_heal_terminal", 1) - if batch.State == entity.BatchStateCancelled { + if batch.State == entity.BatchStateCancelled || batch.State == entity.BatchStateFailed { if err := c.respeculateDependents(ctx, batch); err != nil { return err } @@ -198,10 +199,10 @@ func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error return fmt.Errorf("failed to get build for batch %s: %w", batch.ID, err) } - switch build.Status { - case entity.BuildStatusSucceeded: + switch { + case build.Status == entity.BuildStatusSucceeded: // Own build passed; fall through to dependency evaluation. - case entity.BuildStatusFailed: + case build.Status.IsTerminal(): return c.failOnBuild(ctx, batch, build) default: // Accepted, Running, or any other non-terminal status: the build is @@ -289,16 +290,23 @@ func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, d return c.failBatch(ctx, batch) } -// failBatch CASes a Speculating batch to Failed and publishes to the conclude -// queue so the request store and request log get reconciled. Without this -// transition the batch would sit in Speculating forever — no downstream event -// ever fires for it again. +// failBatch CASes a Speculating batch to Failed, wakes downstream dependents so +// they can observe the terminal dependency, and publishes to the conclude queue +// so the request store and request log get reconciled. Without this transition +// the batch would sit in Speculating forever — no downstream event ever fires +// for it again. 
func (c *Controller) failBatch(ctx context.Context, batch entity.Batch) error { newVersion := batch.Version + 1 if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) } + batch.Version = newVersion + batch.State = entity.BatchStateFailed + + if err := c.respeculateDependents(ctx, batch); err != nil { + return err + } if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) diff --git a/submitqueue/orchestrator/controller/speculate/speculate_test.go b/submitqueue/orchestrator/controller/speculate/speculate_test.go index 4c9e3f35..39d501e8 100644 --- a/submitqueue/orchestrator/controller/speculate/speculate_test.go +++ b/submitqueue/orchestrator/controller/speculate/speculate_test.go @@ -61,6 +61,16 @@ func expectOwnBuildSucceeded(ctrl *gomock.Controller, batchID string) *storagemo return bs } +func expectBatchDependents(ctrl *gomock.Controller, batchID string, dependents []string) *storagemock.MockBatchDependentStore { + ds := storagemock.NewMockBatchDependentStore(ctrl) + ds.EXPECT().Get(gomock.Any(), batchID).Return(entity.BatchDependent{ + BatchID: batchID, + Dependents: dependents, + Version: 1, + }, nil) + return ds +} + // newTestController wires a controller with a registry covering all topics the // speculate controller may publish to. The publisher returns publishErr (or nil). 
func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { @@ -82,6 +92,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, {Key: consumer.TopicKeyMerge, Name: "merge", Queue: mockQ}, {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) @@ -212,6 +223,7 @@ func TestController_Process_FailedDepFailsBatch(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(expectOwnBuildSucceeded(ctrl, batch.ID)).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(expectBatchDependents(ctrl, batch.ID, nil)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -254,10 +266,64 @@ func TestController_Process_FailedOwnBuildFailsBatch(t *testing.T) { buildStore := storagemock.NewMockBuildStore(ctrl) buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( entity.Build{ID: "fake-build-fail-1", BatchID: batch.ID, Status: entity.BuildStatusFailed}, nil) + depStore := expectBatchDependents(ctrl, batch.ID, []string{"test-queue/batch/2", "test-queue/batch/3"}) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + + type pubRec struct { + topic string + msgID string + } + var records []pubRec + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + records = append(records, 
pubRec{topic: topic, msgID: msg.ID}) + return nil + }).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + + assert.Equal(t, []pubRec{ + {topic: "speculate", msgID: "test-queue/batch/2"}, + {topic: "speculate", msgID: "test-queue/batch/3"}, + {topic: "conclude", msgID: batch.ID}, + }, records) +} + +// tryFinalize: cancelled is a terminal non-success build status. buildsignal +// emits only one speculate event for terminal statuses, so treating Cancelled as +// "still waiting" would strand the batch in Speculating forever. 
+func TestController_Process_CancelledOwnBuildFailsBatch(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateFailed).Return(nil) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return( + entity.Build{ID: "fake-build-cancel-1", BatchID: batch.ID, Status: entity.BuildStatusCancelled}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(expectBatchDependents(ctrl, batch.ID, nil)).AnyTimes() controller := newTestController(t, ctrl, store, nil) require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) @@ -321,46 +387,88 @@ func TestController_Process_MergingNoOp(t *testing.T) { require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } -// Terminal states re-fan-out to conclude for self-healing in case a previous -// publish was lost. State must not change (no UpdateState). The Cancelled -// terminal also re-fans-out dependents and is covered separately in -// TestController_Process_CancelledTerminalSelfHealsDependents. +// Terminal Succeeded re-fans-out to conclude for self-healing in case a +// previous publish was lost. State must not change (no UpdateState). 
func TestController_Process_TerminalSelfHeals(t *testing.T) { - for _, state := range []entity.BatchState{ - entity.BatchStateSucceeded, - entity.BatchStateFailed, - } { - t.Run(string(state), func(t *testing.T) { - ctrl := gomock.NewController(t) - batch := testBatch(state) + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSucceeded) - batchStore := storagemock.NewMockBatchStore(ctrl) - batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) - // No UpdateState expected. + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - // Require exactly one publish to the conclude topic for self-healing. - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) + // Require exactly one publish to the conclude topic for self-healing. 
+ mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - }, - ) - require.NoError(t, err) + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + }, + ) + require.NoError(t, err) - logger := zaptest.NewLogger(t).Sugar() - controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + logger := zaptest.NewLogger(t).Sugar() + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") - require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) - }) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// Failed is terminal: redelivery must re-fan-out dependents and re-publish to +// conclude so a crash after the Failed CAS cannot strand downstream batches. +func TestController_Process_FailedTerminalSelfHealsDependents(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateFailed) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. 
+ + depStore := expectBatchDependents(ctrl, batch.ID, []string{"test-queue/batch/2", "test-queue/batch/3"}) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetBatchDependentStore().Return(depStore).AnyTimes() + + type pubRec struct { + topic string + msgID string } + var records []pubRec + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + records = append(records, pubRec{topic: topic, msgID: msg.ID}) + return nil + }).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + logger := zaptest.NewLogger(t).Sugar() + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + + assert.Equal(t, []pubRec{ + {topic: "speculate", msgID: "test-queue/batch/2"}, + {topic: "speculate", msgID: "test-queue/batch/3"}, + {topic: "conclude", msgID: batch.ID}, + }, records) } // Cancelled is terminal: redelivery must re-fan-out dependents (so a crash