Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions submitqueue/extension/storage/build_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ type BuildStore interface {
// Get retrieves a build by ID. Returns ErrNotFound if the build is not found.
Get(ctx context.Context, id string) (entity.Build, error)

// Create creates a new build. The build must have a unique ID already assigned.
// Returns ErrAlreadyExists if a build with the same ID already exists.
// GetByBatchID retrieves the single build scheduled for the given batch
// (build.ID is the runner-assigned ID, so callers that only hold a batch ID
// cannot use Get).
// Returns ErrNotFound if no build exists for the batch.
GetByBatchID(ctx context.Context, batchID string) (entity.Build, error)

// Create creates a new build. The build must have a unique ID and batch ID.
// Returns ErrAlreadyExists if either uniqueness constraint is violated.
Create(ctx context.Context, build entity.Build) error

// UpdateStatus updates the status of a build.
Expand Down
15 changes: 15 additions & 0 deletions submitqueue/extension/storage/mock/build_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 26 additions & 8 deletions submitqueue/extension/storage/mysql/build_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,47 @@ func (s *buildStore) Get(ctx context.Context, id string) (ret entity.Build, retE
op := metrics.Begin(s.scope, "get")
defer func() { op.Complete(retErr) }()

return s.scanBuild(ctx,
"SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?",
"id="+id, id)
}

// GetByBatchID retrieves the single build scheduled for the given batch.
// Returns ErrNotFound if no build exists for the batch.
func (s *buildStore) GetByBatchID(ctx context.Context, batchID string) (ret entity.Build, retErr error) {
op := metrics.Begin(s.scope, "get_by_batch_id")
defer func() { op.Complete(retErr) }()

return s.scanBuild(ctx,
"SELECT id, batch_id, speculation_path, score, status FROM build WHERE batch_id = ?",
"batch_id="+batchID, batchID)
}

// scanBuild runs a single-row build query and decodes the result, including the
// speculation_path JSON column. label is used only for error context (e.g.
// "id=…" or "batch_id=…"). Returns ErrNotFound when the query matches no row.
func (s *buildStore) scanBuild(ctx context.Context, query, label string, args ...any) (entity.Build, error) {
var build entity.Build
var speculationPathJSON []byte

err := s.db.QueryRowContext(ctx,
"SELECT id, batch_id, speculation_path, score, status FROM build WHERE id = ?",
id,
).Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status)

err := s.db.QueryRowContext(ctx, query, args...).
Scan(&build.ID, &build.BatchID, &speculationPathJSON, &build.Score, &build.Status)
if errors.Is(err, sql.ErrNoRows) {
return entity.Build{}, storage.WrapNotFound(err)
}
if err != nil {
return entity.Build{}, fmt.Errorf("failed to get build entity id=%s from the database: %w", id, err)
return entity.Build{}, fmt.Errorf("failed to get build entity %s from the database: %w", label, err)
}

if err := json.Unmarshal(speculationPathJSON, &build.SpeculationPath); err != nil {
return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity id=%s from the database: %w", id, err)
return entity.Build{}, fmt.Errorf("failed to unmarshal speculation_path for build entity %s from the database: %w", label, err)
}

return build, nil
}

// Create creates a new build. The build must have a unique ID already assigned. Returns ErrAlreadyExists if the build ID already exists.
// Create creates a new build. The build must have a unique ID and batch ID.
// Returns ErrAlreadyExists if either uniqueness constraint is violated.
func (s *buildStore) Create(ctx context.Context, build entity.Build) (retErr error) {
op := metrics.Begin(s.scope, "create")
defer func() { op.Complete(retErr) }()
Expand Down
3 changes: 2 additions & 1 deletion submitqueue/extension/storage/mysql/schema/build.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ CREATE TABLE IF NOT EXISTS build (
speculation_path JSON NOT NULL,
score FLOAT NOT NULL,
status VARCHAR(64) NOT NULL,
PRIMARY KEY (id)
PRIMARY KEY (id),
UNIQUE KEY uniq_batch_id (batch_id)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not do enforcements or secondary indexes on storage layer

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
44 changes: 39 additions & 5 deletions submitqueue/orchestrator/controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
return nil
}

existing, err := c.store.GetBuildStore().GetByBatchID(ctx, batch.ID)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to get existing build for batch %s: %w", batch.ID, err)
}
} else {
metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1)
return c.publishExisting(ctx, existing)
}

// Assemble base (dependency batches in order) and head (this batch).
base, err := c.collectChanges(ctx, batch.Dependencies)
if err != nil {
Expand Down Expand Up @@ -148,11 +159,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
}

// Persist the initial Build snapshot so the buildsignal poll loop has a
// row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery
// of this message after a previous successful Create.
if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to persist build %s: %w", build.ID, err)
// row to UpdateStatus against. ErrAlreadyExists means either this exact
// build or another build for the same batch already won a redelivery race;
// publish the stored row so the poll loop follows the source of truth.
if err := c.store.GetBuildStore().Create(ctx, build); err != nil {
if !errors.Is(err, storage.ErrAlreadyExists) {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to persist build %s: %w", build.ID, err)
}
build, err = c.store.GetBuildStore().GetByBatchID(ctx, batch.ID)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to get existing build for batch %s after duplicate create: %w", batch.ID, err)
}
metrics.NamedCounter(c.metricsScope, opName, "build_already_exists", 1)
}

// Hand off to the buildsignal poll loop; it calls Status, updates the
Expand All @@ -173,6 +193,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
return nil // Success - message will be acked
}

func (c *Controller) publishExisting(ctx context.Context, build entity.Build) error {
if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil {
metrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1)
return fmt.Errorf("failed to publish existing build to buildsignal: %w", err)
}
c.logger.Infow("published existing build to buildsignal",
"batch_id", build.BatchID,
"build_id", build.ID,
"status", string(build.Status),
"topic_key", consumer.TopicKeyBuildSignal,
)
return nil
}

// collectChanges loads each batch by ID and concatenates the Change values
// from its contained requests in batch order. Used to build the base
// (dependency batches) and head (this batch) inputs to BuildRunner.Trigger.
Expand Down
76 changes: 66 additions & 10 deletions submitqueue/orchestrator/controller/build/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.Mo
mockRequestStore := storagemock.NewMockRequestStore(ctrl)

mockBuildStore := storagemock.NewMockBuildStore(ctrl)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound).AnyTimes()
mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

store := storagemock.NewMockStorage(ctrl)
Expand Down Expand Up @@ -169,6 +170,7 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) {

var created entity.Build
mockBuildStore := storagemock.NewMockBuildStore(ctrl)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), headBatch.ID).Return(entity.Build{}, storage.ErrNotFound)
mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, b entity.Build) error {
created = b
Expand Down Expand Up @@ -227,19 +229,22 @@ func TestController_Process_TriggersWithBaseAndHead(t *testing.T) {
assert.Equal(t, published.ID, created.ID)
}

// TestController_Process_BuildStoreAlreadyExistsIsSwallowed covers the
// redelivery case: Create returns ErrAlreadyExists, the controller proceeds
// to publish to buildsignal anyway. The polling loop will pick up the
// existing row via UpdateStatus.
func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) {
// TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild covers the
// race where another redelivery creates the batch's build after our initial
// GetByBatchID miss. The controller must publish the stored row's build ID,
// not the freshly-triggered duplicate runner ID.
func TestController_Process_BuildStoreAlreadyExistsPublishesStoredBuild(t *testing.T) {
ctrl := gomock.NewController(t)

batch := testBatch()
existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted}

mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes()
mockBuildStore := storagemock.NewMockBuildStore(ctrl)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound)
mockBuildStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(storage.ErrAlreadyExists)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil)

store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
Expand All @@ -249,11 +254,13 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) {
br := buildrunnermock.NewMockBuildRunner(ctrl)
br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(entity.BuildID{ID: "build-dup"}, nil)

publishCalled := false
var published entity.BuildID
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, _ entityqueue.Message) error {
publishCalled = true
func(_ context.Context, _ string, msg entityqueue.Message) error {
var err error
published, err = entity.BuildIDFromBytes(msg.Payload)
require.NoError(t, err)
return nil
},
).Times(1)
Expand All @@ -271,7 +278,53 @@ func TestController_Process_BuildStoreAlreadyExistsIsSwallowed(t *testing.T) {
delivery.EXPECT().Attempt().Return(1).AnyTimes()

require.NoError(t, controller.Process(context.Background(), delivery))
assert.True(t, publishCalled, "publish to buildsignal must run even when Create reports ErrAlreadyExists")
assert.Equal(t, existing.ID, published.ID)
}

func TestController_Process_ExistingBuildSkipsTriggerAndCreate(t *testing.T) {
ctrl := gomock.NewController(t)

batch := testBatch()
existing := entity.Build{ID: "build-existing", BatchID: batch.ID, Status: entity.BuildStatusAccepted}

mockBatchStore := storagemock.NewMockBatchStore(ctrl)
mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes()
mockBuildStore := storagemock.NewMockBuildStore(ctrl)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(existing, nil)

store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes()
// RequestStore is not touched because existing builds skip change assembly.

br := buildrunnermock.NewMockBuildRunner(ctrl)
// No Trigger expectation: redelivery must not launch another external build.

var published entity.BuildID
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), "buildsignal", gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, msg entityqueue.Message) error {
var err error
published, err = entity.BuildIDFromBytes(msg.Payload)
require.NoError(t, err)
return nil
},
).Times(1)
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}},
)
require.NoError(t, err)
controller := NewController(zaptest.NewLogger(t).Sugar(), tally.NoopScope, store, staticBuildRunnerFactory{r: br}, registry, consumer.TopicKeyBuild, "orchestrator-build")

msg := entityqueue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
delivery.EXPECT().Message().Return(msg).AnyTimes()
delivery.EXPECT().Attempt().Return(1).AnyTimes()

require.NoError(t, controller.Process(context.Background(), delivery))
assert.Equal(t, existing.ID, published.ID)
}

// TestController_Process_TriggerFailure verifies a build-runner failure is
Expand All @@ -285,7 +338,10 @@ func TestController_Process_TriggerFailure(t *testing.T) {
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
store.EXPECT().GetRequestStore().Return(storagemock.NewMockRequestStore(ctrl)).AnyTimes()
// No build store expectation: Trigger failure must short-circuit before Create.
mockBuildStore := storagemock.NewMockBuildStore(ctrl)
mockBuildStore.EXPECT().GetByBatchID(gomock.Any(), batch.ID).Return(entity.Build{}, storage.ErrNotFound)
store.EXPECT().GetBuildStore().Return(mockBuildStore).AnyTimes()
// No Create expectation: Trigger failure must short-circuit before persistence.

br := buildrunnermock.NewMockBuildRunner(ctrl)
br.EXPECT().Trigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand Down
Loading
Loading