diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index c23a47c5..66414d4b 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -54,6 +54,7 @@ type GatewayServer struct { landController *controller.LandController cancelController *controller.CancelController statusController *controller.StatusController + listController *controller.ListController } // Ping delegates to the controller @@ -76,6 +77,11 @@ func (s *GatewayServer) Status(ctx context.Context, req *pb.StatusRequest) (*pb. return s.statusController.Status(ctx, req) } +// List delegates to the controller +func (s *GatewayServer) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) { + return s.listController.List(ctx, req) +} + func main() { code := 0 if err := run(); err != nil { @@ -236,6 +242,7 @@ func run() error { return fmt.Errorf("failed to create storage: %w", err) } requestLogStore := store.GetRequestLogStore() + requestSummaryStore := store.GetRequestSummaryStore() // Load queue configurations from YAML. Path is required so the gateway // can reject requests for unknown queues at the edge. @@ -251,13 +258,15 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) landController := controller.NewLandController(logger.Sugar(), scope, cnt, store, queueConfigs, registry) - cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry) + cancelController := controller.NewCancelController(logger.Sugar(), scope, store, registry) statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore) + listController := controller.NewListController(logger.Sugar(), scope, requestSummaryStore, queueConfigs) gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, cancelController: cancelController, statusController: statusController, + listController: listController, } pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer) diff --git a/submitqueue/core/request/BUILD.bazel b/submitqueue/core/request/BUILD.bazel index c61803b3..83af3226 100644 --- a/submitqueue/core/request/BUILD.bazel +++ b/submitqueue/core/request/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "log.go", "request.go", + "store.go", ], importpath = "github.com/uber/submitqueue/submitqueue/core/request", visibility = ["//visibility:public"], diff --git a/submitqueue/core/request/store.go b/submitqueue/core/request/store.go new file mode 100644 index 00000000..ad6d2c5e --- /dev/null +++ b/submitqueue/core/request/store.go @@ -0,0 +1,34 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package request + +import ( + "context" + "fmt" + + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" +) + +// PersistLog inserts the immutable request log entry and updates the request summary read model. +func PersistLog(ctx context.Context, store storage.Storage, log entity.RequestLog) error { + if err := store.GetRequestLogStore().Insert(ctx, log); err != nil { + return fmt.Errorf("failed to insert request log for request_id=%s: %w", log.RequestID, err) + } + if err := store.GetRequestSummaryStore().UpsertFromLog(ctx, log); err != nil { + return fmt.Errorf("failed to upsert request summary for request_id=%s: %w", log.RequestID, err) + } + return nil +} diff --git a/submitqueue/entity/BUILD.bazel b/submitqueue/entity/BUILD.bazel index 21a57602..f86f5bf0 100644 --- a/submitqueue/entity/BUILD.bazel +++ b/submitqueue/entity/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "queue_config.go", "request.go", "request_log.go", + "request_summary.go", "speculation_tree.go", ], importpath = "github.com/uber/submitqueue/submitqueue/entity", diff --git a/submitqueue/entity/request_log.go b/submitqueue/entity/request_log.go index 6dee8839..88d63d62 100644 --- a/submitqueue/entity/request_log.go +++ b/submitqueue/entity/request_log.go @@ -16,6 +16,7 @@ package entity import ( "encoding/json" + "strings" "time" ) @@ -96,6 +97,10 @@ const ( type RequestLog struct { // RequestID is the ID of the request this log entry belongs to. References entity.Request.ID. RequestID string `json:"request_id"` + // Queue is the queue this request belongs to. New log producers should set it explicitly. + Queue string `json:"queue"` + // ChangeURIs are the original change URIs submitted with the request. They are populated by gateway-originated accepted logs. + ChangeURIs []string `json:"change_uris"` // TimestampMs is the time this log entry was created, in milliseconds since Unix epoch. TimestampMs int64 `json:"timestamp_ms"` // Status is the request status at the time this log entry was created. It may contain requests states from the state machine and also display-friendly intermediate statuses. @@ -117,11 +122,24 @@ type RequestLog struct { // lastError is the last error message associated with the status at the time of this log entry, empty string if no error. // metadata is a set of key-value pairs providing additional context for this log entry. Not constrained to any specific format or schema, used for display or debugging purposes. func NewRequestLog(requestID string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog { + return NewRequestLogWithDetails(requestID, QueueFromRequestID(requestID), nil, status, requestVersion, lastError, metadata) +} + +// NewRequestLogWithDetails creates a new RequestLog with queue and change information for list summaries. +func NewRequestLogWithDetails(requestID, queue string, changeURIs []string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog { if metadata == nil { metadata = make(map[string]string) } + if queue == "" { + queue = QueueFromRequestID(requestID) + } + if changeURIs == nil { + changeURIs = []string{} + } return RequestLog{ RequestID: requestID, + Queue: queue, + ChangeURIs: changeURIs, TimestampMs: time.Now().UnixMilli(), Status: status, RequestVersion: requestVersion, @@ -146,5 +164,53 @@ func RequestLogFromBytes(data []byte) (RequestLog, error) { if log.Metadata == nil { log.Metadata = make(map[string]string) } + if log.ChangeURIs == nil { + log.ChangeURIs = []string{} + } + if log.Queue == "" { + log.Queue = QueueFromRequestID(log.RequestID) + } return log, nil } + +// QueueFromRequestID extracts the queue from the current "/" request ID format. +// It strips only a trailing numeric path segment so queue names may contain slashes. +func QueueFromRequestID(requestID string) string { + idx := strings.LastIndex(requestID, "/") + if idx <= 0 || idx == len(requestID)-1 { + return "" + } + for _, r := range requestID[idx+1:] { + if r < '0' || r > '9' { + return "" + } + } + return requestID[:idx] +} + +// IsKnownRequestStatus returns true if status is a public request status emitted by SubmitQueue. +func IsKnownRequestStatus(status RequestStatus) bool { + switch status { + case RequestStatusAccepted, + RequestStatusStarted, + RequestStatusValidating, + RequestStatusValidated, + RequestStatusBatching, + RequestStatusBatched, + RequestStatusScored, + RequestStatusSpeculating, + RequestStatusSpeculated, + RequestStatusBuilding, + RequestStatusBuilt, + RequestStatusWaitingPath, + RequestStatusLanding, + RequestStatusProcessing, + RequestStatusLanded, + RequestStatusError, + RequestStatusCancelling, + RequestStatusCancelled: + return true + default: + return false + } +} diff --git a/submitqueue/entity/request_log_test.go b/submitqueue/entity/request_log_test.go index 602bf7e2..3ab04ca7 100644 --- a/submitqueue/entity/request_log_test.go +++ b/submitqueue/entity/request_log_test.go @@ -51,6 +51,8 @@ func TestRequestLog_ToBytes(t *testing.T) { func TestRequestLogFromBytes(t *testing.T) { original := RequestLog{ RequestID: "my-queue/999", + Queue: "my-queue", + ChangeURIs: []string{"github://uber/repo/pull/1/abcdef"}, TimestampMs: 1709568000000, Status: RequestStatusProcessing, RequestVersion: 3, @@ -65,6 +67,8 @@ func TestRequestLogFromBytes(t *testing.T) { require.NoError(t, err) assert.Equal(t, original.RequestID, deserialized.RequestID) + assert.Equal(t, original.Queue, deserialized.Queue) + assert.Equal(t, original.ChangeURIs, deserialized.ChangeURIs) assert.Equal(t, original.TimestampMs, deserialized.TimestampMs) assert.Equal(t, original.Status, deserialized.Status) assert.Equal(t, original.RequestVersion, deserialized.RequestVersion) @@ -103,6 +107,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { name: "with all fields populated", log: RequestLog{ RequestID: "queue1/100", + Queue: "queue1", + ChangeURIs: []string{}, TimestampMs: 1709568000000, Status: RequestStatusLanded, RequestVersion: 5, @@ -114,6 +120,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { name: "with error", log: RequestLog{ RequestID: "queue2/200", + Queue: "queue2", + ChangeURIs: []string{}, TimestampMs: 1709568001000, Status: RequestStatusError, RequestVersion: 2, @@ -125,6 +133,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { name: "with zero version", log: RequestLog{ RequestID: "queue3/300", + Queue: "queue3", + ChangeURIs: []string{}, TimestampMs: 1709568002000, Status: RequestStatusStarted, RequestVersion: 0, @@ -146,3 +156,10 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { }) } } + +func TestQueueFromRequestID(t *testing.T) { + assert.Equal(t, "queue", QueueFromRequestID("queue/100")) + assert.Equal(t, "org/queue", QueueFromRequestID("org/queue/100")) + assert.Empty(t, QueueFromRequestID("queue/not-a-number")) + assert.Empty(t, QueueFromRequestID("queue")) +} diff --git a/submitqueue/entity/request_summary.go b/submitqueue/entity/request_summary.go new file mode 100644 index 00000000..71e282e7 --- /dev/null +++ b/submitqueue/entity/request_summary.go @@ -0,0 +1,29 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// RequestSummary is the gateway-owned current view used for queue/time-window listing. +type RequestSummary struct { + RequestID string + Queue string + ChangeURIs []string + Status RequestStatus + LastError string + Metadata map[string]string + StartedAtMs int64 + UpdatedAtMs int64 + CompletedAtMs int64 + Terminal bool +} diff --git a/submitqueue/extension/storage/BUILD.bazel b/submitqueue/extension/storage/BUILD.bazel index 867eef0f..57d775c5 100644 --- a/submitqueue/extension/storage/BUILD.bazel +++ b/submitqueue/extension/storage/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "change_store.go", "request_log_store.go", "request_store.go", + "request_summary_store.go", "speculation_tree_store.go", "storage.go", ], diff --git a/submitqueue/extension/storage/mock/BUILD.bazel b/submitqueue/extension/storage/mock/BUILD.bazel index 55c5d808..d4b246cc 100644 --- a/submitqueue/extension/storage/mock/BUILD.bazel +++ b/submitqueue/extension/storage/mock/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "change_store_mock.go", "request_log_store_mock.go", "request_store_mock.go", + "request_summary_store_mock.go", "speculation_tree_store_mock.go", "storage_mock.go", ], diff --git a/submitqueue/extension/storage/mock/request_summary_store_mock.go b/submitqueue/extension/storage/mock/request_summary_store_mock.go new file mode 100644 index 00000000..eaa60a59 --- /dev/null +++ b/submitqueue/extension/storage/mock/request_summary_store_mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: request_summary_store.go +// +// Generated by this command: +// +// mockgen -source=request_summary_store.go -destination=mock/request_summary_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/submitqueue/entity" + storage "github.com/uber/submitqueue/submitqueue/extension/storage" + gomock "go.uber.org/mock/gomock" +) + +// MockRequestSummaryStore is a mock of RequestSummaryStore interface. +type MockRequestSummaryStore struct { + ctrl *gomock.Controller + recorder *MockRequestSummaryStoreMockRecorder + isgomock struct{} +} + +// MockRequestSummaryStoreMockRecorder is the mock recorder for MockRequestSummaryStore. +type MockRequestSummaryStoreMockRecorder struct { + mock *MockRequestSummaryStore +} + +// NewMockRequestSummaryStore creates a new mock instance. +func NewMockRequestSummaryStore(ctrl *gomock.Controller) *MockRequestSummaryStore { + mock := &MockRequestSummaryStore{ctrl: ctrl} + mock.recorder = &MockRequestSummaryStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRequestSummaryStore) EXPECT() *MockRequestSummaryStoreMockRecorder { + return m.recorder +} + +// List mocks base method. +func (m *MockRequestSummaryStore) List(ctx context.Context, opts storage.RequestSummaryListOptions) (storage.RequestSummaryListResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List", ctx, opts) + ret0, _ := ret[0].(storage.RequestSummaryListResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// List indicates an expected call of List. +func (mr *MockRequestSummaryStoreMockRecorder) List(ctx, opts any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockRequestSummaryStore)(nil).List), ctx, opts) +} + +// UpsertFromLog mocks base method. +func (m *MockRequestSummaryStore) UpsertFromLog(ctx context.Context, log entity.RequestLog) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpsertFromLog", ctx, log) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpsertFromLog indicates an expected call of UpsertFromLog. +func (mr *MockRequestSummaryStoreMockRecorder) UpsertFromLog(ctx, log any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertFromLog", reflect.TypeOf((*MockRequestSummaryStore)(nil).UpsertFromLog), ctx, log) +} diff --git a/submitqueue/extension/storage/mock/storage_mock.go b/submitqueue/extension/storage/mock/storage_mock.go index 4133bc2a..7b87157d 100644 --- a/submitqueue/extension/storage/mock/storage_mock.go +++ b/submitqueue/extension/storage/mock/storage_mock.go @@ -138,6 +138,20 @@ func (mr *MockStorageMockRecorder) GetRequestStore() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestStore", reflect.TypeOf((*MockStorage)(nil).GetRequestStore)) } +// GetRequestSummaryStore mocks base method. +func (m *MockStorage) GetRequestSummaryStore() storage.RequestSummaryStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRequestSummaryStore") + ret0, _ := ret[0].(storage.RequestSummaryStore) + return ret0 +} + +// GetRequestSummaryStore indicates an expected call of GetRequestSummaryStore. +func (mr *MockStorageMockRecorder) GetRequestSummaryStore() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestSummaryStore", reflect.TypeOf((*MockStorage)(nil).GetRequestSummaryStore)) +} + // GetSpeculationTreeStore mocks base method. func (m *MockStorage) GetSpeculationTreeStore() storage.SpeculationTreeStore { m.ctrl.T.Helper() diff --git a/submitqueue/extension/storage/mysql/BUILD.bazel b/submitqueue/extension/storage/mysql/BUILD.bazel index 671f83a6..603f86d3 100644 --- a/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/submitqueue/extension/storage/mysql/BUILD.bazel @@ -1,4 +1,4 @@ -load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "mysql", @@ -9,6 +9,7 @@ go_library( "change_store.go", "request_log_store.go", "request_store.go", + "request_summary_store.go", "speculation_tree_store.go", "storage.go", ], @@ -22,3 +23,13 @@ go_library( "@com_github_uber_go_tally_v4//:tally", ], ) + +go_test( + name = "mysql_test", + srcs = ["request_summary_store_test.go"], + embed = [":mysql"], + deps = [ + "//submitqueue/entity", + "@com_github_stretchr_testify//assert", + ], +) diff --git a/submitqueue/extension/storage/mysql/request_log_store.go b/submitqueue/extension/storage/mysql/request_log_store.go index 42c98190..66607a9b 100644 --- a/submitqueue/extension/storage/mysql/request_log_store.go +++ b/submitqueue/extension/storage/mysql/request_log_store.go @@ -46,6 +46,18 @@ func (r *requestLogStore) Insert(ctx context.Context, log entity.RequestLog) (re op := metrics.Begin(r.scope, "insert") defer func() { op.Complete(retErr) }() + if log.Queue == "" { + log.Queue = entity.QueueFromRequestID(log.RequestID) + } + if log.ChangeURIs == nil { + log.ChangeURIs = []string{} + } + + changeURIsJSON, err := json.Marshal(log.ChangeURIs) + if err != nil { + return fmt.Errorf("failed to marshal change URIs for request log request_id=%s: %w", log.RequestID, err) + } + metadataJSON, err := json.Marshal(log.Metadata) if err != nil { return fmt.Errorf("failed to marshal metadata for request log request_id=%s: %w", log.RequestID, err) @@ -57,8 +69,8 @@ func (r *requestLogStore) Insert(ctx context.Context, log entity.RequestLog) (re salt := rand.Int64() _, err = r.db.ExecContext(ctx, - "INSERT INTO request_log (request_id, timestamp_ms, salt, status, request_version, last_error, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)", - log.RequestID, log.TimestampMs, salt, log.Status, log.RequestVersion, log.LastError, metadataJSON, + "INSERT INTO request_log (request_id, queue, change_uri, timestamp_ms, salt, status, request_version, last_error, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + log.RequestID, log.Queue, changeURIsJSON, log.TimestampMs, salt, log.Status, log.RequestVersion, log.LastError, metadataJSON, ) if err != nil { return fmt.Errorf("failed to insert request log for request_id=%s timestamp_ms=%d: %w", log.RequestID, log.TimestampMs, err) @@ -75,7 +87,7 @@ func (r *requestLogStore) List(ctx context.Context, requestID string) (ret []ent defer func() { op.Complete(retErr) }() rows, err := r.db.QueryContext(ctx, - "SELECT request_id, timestamp_ms, status, request_version, last_error, metadata FROM request_log WHERE request_id = ? ORDER BY timestamp_ms ASC, salt ASC", + "SELECT request_id, queue, change_uri, timestamp_ms, status, request_version, last_error, metadata FROM request_log WHERE request_id = ? ORDER BY timestamp_ms ASC, salt ASC", requestID, ) if err != nil { @@ -86,13 +98,18 @@ func (r *requestLogStore) List(ctx context.Context, requestID string) (ret []ent var logs []entity.RequestLog for rows.Next() { var log entity.RequestLog + var changeURIsJSON []byte var metadataJSON []byte - err := rows.Scan(&log.RequestID, &log.TimestampMs, &log.Status, &log.RequestVersion, &log.LastError, &metadataJSON) + err := rows.Scan(&log.RequestID, &log.Queue, &changeURIsJSON, &log.TimestampMs, &log.Status, &log.RequestVersion, &log.LastError, &metadataJSON) if err != nil { return nil, fmt.Errorf("failed to scan request log row for request_id=%s: %w", requestID, err) } + if err := json.Unmarshal(changeURIsJSON, &log.ChangeURIs); err != nil { + return nil, fmt.Errorf("failed to unmarshal change URIs for request log request_id=%s: %w", requestID, err) + } + if err := json.Unmarshal(metadataJSON, &log.Metadata); err != nil { return nil, fmt.Errorf("failed to unmarshal metadata for request log request_id=%s: %w", requestID, err) } diff --git a/submitqueue/extension/storage/mysql/request_summary_store.go b/submitqueue/extension/storage/mysql/request_summary_store.go new file mode 100644 index 00000000..41803d6d --- /dev/null +++ b/submitqueue/extension/storage/mysql/request_summary_store.go @@ -0,0 +1,396 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" +) + +const activeCompletedAtMs = int64(1<<63 - 1) + +// maxSummaryUpsertAttempts bounds the optimistic-concurrency retry loop in UpsertFromLog. Writes +// to a single request's summary are almost always serialized through the pipeline, so contention +// is rare and a small bound is sufficient; exceeding it surfaces as storage.ErrVersionMismatch. +const maxSummaryUpsertAttempts = 8 + +// initialSummaryVersion is the optimistic-lock version a freshly inserted summary row starts at. +const initialSummaryVersion = int64(1) + +type requestSummaryStore struct { + db *sql.DB + scope tally.Scope +} + +type requestSummaryRow struct { + summary entity.RequestSummary + requestVersion int32 + statusTimestampMs int64 + winnerTerminalVersion bool + dbCompletedAtMs int64 + // version is the optimistic-lock version of the summary row itself, distinct from + // requestVersion (the reconciliation version sourced from the Request entity). It is an + // internal read-model concern, populated on read and never exposed through the + // RequestSummaryStore interface; insert owns the initial value. + version int64 +} + +// NewRequestSummaryStore creates a new MySQL-backed RequestSummaryStore. +func NewRequestSummaryStore(db *sql.DB, scope tally.Scope) storage.RequestSummaryStore { + return &requestSummaryStore{db: db, scope: scope} +} + +// UpsertFromLog incrementally merges one request-log event into the summary read model. +// +// This repo forbids database transactions, so the merge uses optimistic concurrency instead of a +// SELECT ... FOR UPDATE: read the current summary without a lock, merge the incoming log in memory, +// then write back with a conditional update guarded by the row version (the same pattern as +// requestStore.UpdateState). A concurrent writer — another gateway write path or a redelivered log +// event — invalidates the read, in which case we re-read and re-merge. Merges are monotonic (the +// winner only advances by request version or timestamp), so the loop converges; re-applying a log +// that has already been merged is a no-op. +func (s *requestSummaryStore) UpsertFromLog(ctx context.Context, log entity.RequestLog) (retErr error) { + op := metrics.Begin(s.scope, "upsert_from_log") + defer func() { op.Complete(retErr) }() + + if log.Queue == "" { + log.Queue = entity.QueueFromRequestID(log.RequestID) + } + + for attempt := 0; attempt < maxSummaryUpsertAttempts; attempt++ { + existing, found, err := s.get(ctx, log.RequestID) + if err != nil { + return err + } + + if !found { + if log.Queue == "" { + return fmt.Errorf("request summary upsert requires queue for request_id=%s", log.RequestID) + } + inserted, err := s.insert(ctx, rowFromLog(log)) + if err != nil { + return err + } + if inserted { + return nil + } + // Lost the insert race; another writer created the row first. Retry into the merge path. + continue + } + + // Version arithmetic is owned here, not in the conditional write: compute the next version + // and only assign it on a successful write (newVersion = oldVersion + 1). + next := mergeSummary(existing, log) + updated, err := s.update(ctx, existing.version, existing.version+1, next) + if err != nil { + return err + } + if updated { + return nil + } + // The row version moved under us; re-read and merge against the new winner. + } + + return fmt.Errorf("request summary upsert for request_id=%s did not converge after %d attempts: %w", log.RequestID, maxSummaryUpsertAttempts, storage.ErrVersionMismatch) +} + +// List returns a page of request summaries matching the queue, time window, and optional statuses. +func (s *requestSummaryStore) List(ctx context.Context, opts storage.RequestSummaryListOptions) (ret storage.RequestSummaryListResult, retErr error) { + op := metrics.Begin(s.scope, "list") + defer func() { op.Complete(retErr) }() + + if opts.Limit <= 0 { + return storage.RequestSummaryListResult{}, fmt.Errorf("request summary list requires a positive limit") + } + + args := []any{opts.Queue, opts.EndTimeMs, opts.StartTimeMs} + clauses := []string{ + "queue = ?", + "started_at_ms < ?", + "completed_at_ms >= ?", + } + + if len(opts.Statuses) > 0 { + placeholders := make([]string, len(opts.Statuses)) + for i, status := range opts.Statuses { + placeholders[i] = "?" + args = append(args, status) + } + clauses = append(clauses, "status IN ("+strings.Join(placeholders, ", ")+")") + } + + if opts.Cursor != nil { + clauses = append(clauses, "(started_at_ms < ? OR (started_at_ms = ? AND request_id < ?))") + args = append(args, opts.Cursor.StartedAtMs, opts.Cursor.StartedAtMs, opts.Cursor.RequestID) + } + + args = append(args, opts.Limit+1) + query := "SELECT request_id, queue, change_uri, status, request_version, status_timestamp_ms, winner_terminal_version, last_error, metadata, started_at_ms, updated_at_ms, completed_at_ms, terminal, version FROM request_summary WHERE " + + strings.Join(clauses, " AND ") + + " ORDER BY started_at_ms DESC, request_id DESC LIMIT ?" + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return storage.RequestSummaryListResult{}, fmt.Errorf("failed to list request summaries for queue=%s: %w", opts.Queue, err) + } + defer rows.Close() + + var summaries []entity.RequestSummary + for rows.Next() { + row, err := scanRequestSummary(rows) + if err != nil { + return storage.RequestSummaryListResult{}, err + } + summaries = append(summaries, row.summary) + } + if err := rows.Err(); err != nil { + return storage.RequestSummaryListResult{}, fmt.Errorf("failed to iterate request summaries for queue=%s: %w", opts.Queue, err) + } + + var nextCursor *storage.RequestSummaryCursor + if len(summaries) > opts.Limit { + summaries = summaries[:opts.Limit] + last := summaries[len(summaries)-1] + nextCursor = &storage.RequestSummaryCursor{StartedAtMs: last.StartedAtMs, RequestID: last.RequestID} + } + + return storage.RequestSummaryListResult{Requests: summaries, NextCursor: nextCursor}, nil +} + +// get reads the current summary row without locking. Returns found=false when no row exists. +func (s *requestSummaryStore) get(ctx context.Context, requestID string) (requestSummaryRow, bool, error) { + row := s.db.QueryRowContext(ctx, + "SELECT request_id, queue, change_uri, status, request_version, status_timestamp_ms, winner_terminal_version, last_error, metadata, started_at_ms, updated_at_ms, completed_at_ms, terminal, version FROM request_summary WHERE request_id = ?", + requestID, + ) + summary, err := scanRequestSummary(row) + if errors.Is(err, sql.ErrNoRows) { + return requestSummaryRow{}, false, nil + } + if err != nil { + return requestSummaryRow{}, false, fmt.Errorf("failed to get request summary for request_id=%s: %w", requestID, err) + } + return summary, true, nil +} + +// insert creates a fresh summary row at version 1. Returns inserted=false (no error) when a +// concurrent writer already created the row (duplicate primary key), so the caller can re-read +// and merge instead. +func (s *requestSummaryStore) insert(ctx context.Context, row requestSummaryRow) (bool, error) { + changeURIsJSON, metadataJSON, err := marshalSummaryJSON(row.summary) + if err != nil { + return false, err + } + _, err = s.db.ExecContext(ctx, + "INSERT INTO request_summary (request_id, queue, change_uri, status, request_version, status_timestamp_ms, winner_terminal_version, last_error, metadata, started_at_ms, updated_at_ms, completed_at_ms, terminal, version) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + row.summary.RequestID, row.summary.Queue, changeURIsJSON, row.summary.Status, row.requestVersion, row.statusTimestampMs, row.winnerTerminalVersion, row.summary.LastError, metadataJSON, row.summary.StartedAtMs, row.summary.UpdatedAtMs, row.dbCompletedAtMs, row.summary.Terminal, initialSummaryVersion, + ) + if err != nil { + var mysqlErr *mysql.MySQLError + // MySQL error 1062 is "Duplicate entry": another writer inserted this request_id first. + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { + return false, nil + } + return false, fmt.Errorf("failed to insert request summary request_id=%s: %w", row.summary.RequestID, err) + } + return true, nil +} + +// update is a pure conditional write guarded by the row version: it writes newVersion only if the +// persisted version still matches oldVersion. Returns updated=false (no error) on a version +// mismatch so the caller can re-read and retry. Version arithmetic is owned by the caller. +func (s *requestSummaryStore) update(ctx context.Context, oldVersion, newVersion int64, row requestSummaryRow) (bool, error) { + changeURIsJSON, metadataJSON, err := marshalSummaryJSON(row.summary) + if err != nil { + return false, err + } + result, err := s.db.ExecContext(ctx, + "UPDATE request_summary SET queue = ?, change_uri = ?, status = ?, request_version = ?, status_timestamp_ms = ?, winner_terminal_version = ?, last_error = ?, metadata = ?, started_at_ms = ?, updated_at_ms = ?, completed_at_ms = ?, terminal = ?, version = ? WHERE request_id = ? AND version = ?", + row.summary.Queue, changeURIsJSON, row.summary.Status, row.requestVersion, row.statusTimestampMs, row.winnerTerminalVersion, row.summary.LastError, metadataJSON, row.summary.StartedAtMs, row.summary.UpdatedAtMs, row.dbCompletedAtMs, row.summary.Terminal, newVersion, row.summary.RequestID, oldVersion, + ) + if err != nil { + return false, fmt.Errorf("failed to update request summary request_id=%s: %w", row.summary.RequestID, err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return false, fmt.Errorf("failed to read rows affected for request summary request_id=%s: %w", row.summary.RequestID, err) + } + return rowsAffected == 1, nil +} + +type summaryScanner interface { + Scan(dest ...any) error +} + +func scanRequestSummary(scanner summaryScanner) (requestSummaryRow, error) { + var row requestSummaryRow + var changeURIsJSON []byte + var metadataJSON []byte + err := scanner.Scan( + &row.summary.RequestID, + &row.summary.Queue, + &changeURIsJSON, + &row.summary.Status, + &row.requestVersion, + &row.statusTimestampMs, + &row.winnerTerminalVersion, + &row.summary.LastError, + &metadataJSON, + &row.summary.StartedAtMs, + &row.summary.UpdatedAtMs, + &row.dbCompletedAtMs, + &row.summary.Terminal, + &row.version, + ) + if err != nil { + return requestSummaryRow{}, err + } + if err := json.Unmarshal(changeURIsJSON, &row.summary.ChangeURIs); err != nil { + return requestSummaryRow{}, fmt.Errorf("failed to unmarshal request summary change URIs for request_id=%s: %w", row.summary.RequestID, err) + } + if err := json.Unmarshal(metadataJSON, &row.summary.Metadata); err != nil { + return requestSummaryRow{}, fmt.Errorf("failed to unmarshal request summary metadata for request_id=%s: %w", row.summary.RequestID, err) + } + if row.summary.ChangeURIs == nil { + row.summary.ChangeURIs = []string{} + } + if row.summary.Metadata == nil { + row.summary.Metadata = map[string]string{} + } + if row.dbCompletedAtMs == activeCompletedAtMs { + row.summary.CompletedAtMs = 0 + } else { + row.summary.CompletedAtMs = row.dbCompletedAtMs + } + return row, nil +} + +func rowFromLog(log entity.RequestLog) requestSummaryRow { + terminal := entity.IsRequestStateTerminal(entity.RequestState(string(log.Status))) + completedAtMs := activeCompletedAtMs + if terminal { + completedAtMs = log.TimestampMs + } + return requestSummaryRow{ + summary: entity.RequestSummary{ + RequestID: log.RequestID, + Queue: log.Queue, + ChangeURIs: append([]string(nil), log.ChangeURIs...), + Status: log.Status, + LastError: log.LastError, + Metadata: cloneMetadata(log.Metadata), + StartedAtMs: log.TimestampMs, + UpdatedAtMs: log.TimestampMs, + CompletedAtMs: completedAtMsForEntity(completedAtMs), + Terminal: terminal, + }, + requestVersion: log.RequestVersion, + statusTimestampMs: log.TimestampMs, + winnerTerminalVersion: isTerminalVersion(log), + dbCompletedAtMs: completedAtMs, + } +} + +func mergeSummary(existing requestSummaryRow, log entity.RequestLog) requestSummaryRow { + next := existing + if log.Queue != "" { + next.summary.Queue = log.Queue + } + if len(next.summary.ChangeURIs) == 0 && len(log.ChangeURIs) > 0 { + next.summary.ChangeURIs = append([]string(nil), log.ChangeURIs...) + } + if log.TimestampMs > 0 && (next.summary.StartedAtMs == 0 || log.TimestampMs < next.summary.StartedAtMs) { + next.summary.StartedAtMs = log.TimestampMs + } + + if shouldReplaceWinner(existing, log) { + incoming := rowFromLog(log) + next.summary.Status = incoming.summary.Status + next.summary.LastError = incoming.summary.LastError + next.summary.Metadata = incoming.summary.Metadata + next.summary.UpdatedAtMs = incoming.summary.UpdatedAtMs + next.summary.CompletedAtMs = incoming.summary.CompletedAtMs + next.summary.Terminal = incoming.summary.Terminal + next.requestVersion = incoming.requestVersion + next.statusTimestampMs = incoming.statusTimestampMs + next.winnerTerminalVersion = incoming.winnerTerminalVersion + next.dbCompletedAtMs = incoming.dbCompletedAtMs + } + return next +} + +func shouldReplaceWinner(existing requestSummaryRow, log entity.RequestLog) bool { + incomingTerminalVersion := isTerminalVersion(log) + if incomingTerminalVersion { + if !existing.winnerTerminalVersion { + return true + } + return log.RequestVersion > existing.requestVersion || + (log.RequestVersion == existing.requestVersion && log.TimestampMs > existing.statusTimestampMs) + } + if existing.winnerTerminalVersion { + return false + } + return log.TimestampMs > existing.statusTimestampMs +} + +func isTerminalVersion(log entity.RequestLog) bool { + return log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(string(log.Status))) +} + +func completedAtMsForEntity(dbValue int64) int64 { + if dbValue == activeCompletedAtMs { + return 0 + } + return dbValue +} + +func marshalSummaryJSON(summary entity.RequestSummary) ([]byte, []byte, error) { + changeURIs := summary.ChangeURIs + if changeURIs == nil { + changeURIs = []string{} + } + changeURIsJSON, err := json.Marshal(changeURIs) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal request summary change URIs for request_id=%s: %w", summary.RequestID, err) + } + metadataJSON, err := json.Marshal(summary.Metadata) + if err != nil { + return nil, nil, fmt.Errorf("failed to marshal request summary metadata for request_id=%s: %w", summary.RequestID, err) + } + return changeURIsJSON, metadataJSON, nil +} + +func cloneMetadata(metadata map[string]string) map[string]string { + if metadata == nil { + return map[string]string{} + } + clone := make(map[string]string, len(metadata)) + for k, v := range metadata { + clone[k] = v + } + return clone +} diff --git a/submitqueue/extension/storage/mysql/request_summary_store_test.go b/submitqueue/extension/storage/mysql/request_summary_store_test.go new file mode 100644 index 00000000..9a006a56 --- /dev/null +++ b/submitqueue/extension/storage/mysql/request_summary_store_test.go @@ -0,0 +1,85 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber/submitqueue/submitqueue/entity" +) + +func TestMergeSummary_GuardsWinner(t *testing.T) { + existing := rowFromLog(entity.RequestLog{ + RequestID: "q/1", + Queue: "q", + ChangeURIs: []string{"change"}, + TimestampMs: 200, + Status: entity.RequestStatusBuilding, + Metadata: map[string]string{}, + }) + + older := mergeSummary(existing, entity.RequestLog{ + RequestID: "q/1", + Queue: "q", + TimestampMs: 100, + Status: entity.RequestStatusAccepted, + Metadata: map[string]string{"ignored": "true"}, + }) + assert.Equal(t, entity.RequestStatusBuilding, older.summary.Status) + assert.Equal(t, int64(100), older.summary.StartedAtMs) + assert.Equal(t, int64(200), older.summary.UpdatedAtMs) + + terminal := mergeSummary(older, entity.RequestLog{ + RequestID: "q/1", + Queue: "q", + TimestampMs: 150, + Status: entity.RequestStatusLanded, + RequestVersion: 1, + LastError: "", + Metadata: map[string]string{"terminal": "true"}, + }) + assert.Equal(t, entity.RequestStatusLanded, terminal.summary.Status) + assert.Equal(t, int32(1), terminal.requestVersion) + assert.True(t, terminal.winnerTerminalVersion) + assert.True(t, terminal.summary.Terminal) + assert.Equal(t, int64(150), terminal.summary.CompletedAtMs) + + laterNonTerminal := mergeSummary(terminal, entity.RequestLog{ + RequestID: "q/1", + Queue: "q", + TimestampMs: 300, + Status: entity.RequestStatusBuilding, + Metadata: map[string]string{"ignored": "true"}, + }) + assert.Equal(t, entity.RequestStatusLanded, laterNonTerminal.summary.Status) + assert.Equal(t, int64(150), laterNonTerminal.summary.UpdatedAtMs) + + higherTerminal := mergeSummary(laterNonTerminal, entity.RequestLog{ + RequestID: "q/1", + Queue: "q", + TimestampMs: 250, + Status: entity.RequestStatusError, + RequestVersion: 2, + LastError: "boom", + Metadata: map[string]string{"winner": "true"}, + }) + assert.Equal(t, entity.RequestStatusError, higherTerminal.summary.Status) + assert.Equal(t, int32(2), higherTerminal.requestVersion) + assert.Equal(t, int64(250), higherTerminal.summary.UpdatedAtMs) + assert.Equal(t, "boom", higherTerminal.summary.LastError) + assert.Equal(t, map[string]string{"winner": "true"}, higherTerminal.summary.Metadata) + assert.Equal(t, []string{"change"}, higherTerminal.summary.ChangeURIs) +} diff --git a/submitqueue/extension/storage/mysql/schema/request_log.sql b/submitqueue/extension/storage/mysql/schema/request_log.sql index 8866eeac..2f6057b0 100644 --- a/submitqueue/extension/storage/mysql/schema/request_log.sql +++ b/submitqueue/extension/storage/mysql/schema/request_log.sql @@ -1,5 +1,7 @@ CREATE TABLE IF NOT EXISTS request_log ( request_id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + change_uri JSON NOT NULL, timestamp_ms BIGINT NOT NULL, salt BIGINT NOT NULL, status VARCHAR(64) NOT NULL, diff --git a/submitqueue/extension/storage/mysql/schema/request_summary.sql b/submitqueue/extension/storage/mysql/schema/request_summary.sql new file mode 100644 index 00000000..39ee5093 --- /dev/null +++ b/submitqueue/extension/storage/mysql/schema/request_summary.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS request_summary ( + request_id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + change_uri JSON NOT NULL, + status VARCHAR(64) NOT NULL, + request_version INT NOT NULL, + status_timestamp_ms BIGINT NOT NULL, + winner_terminal_version BOOLEAN NOT NULL, + last_error TEXT NOT NULL, + metadata JSON NOT NULL, + started_at_ms BIGINT NOT NULL, + updated_at_ms BIGINT NOT NULL, + completed_at_ms BIGINT NOT NULL, + terminal BOOLEAN NOT NULL, + version BIGINT NOT NULL, + PRIMARY KEY (request_id), + KEY idx_request_summary_queue_started (queue, started_at_ms DESC, request_id DESC), + KEY idx_request_summary_queue_completed (queue, completed_at_ms) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/storage.go b/submitqueue/extension/storage/mysql/storage.go index 661fac9c..13363142 100644 --- a/submitqueue/extension/storage/mysql/storage.go +++ b/submitqueue/extension/storage/mysql/storage.go @@ -32,6 +32,7 @@ type mysqlStorage struct { buildStore storage.BuildStore speculationTreeStore storage.SpeculationTreeStore requestLogStore storage.RequestLogStore + requestSummaryStore storage.RequestSummaryStore } // NewStorage creates a new MySQL storage. @@ -45,6 +46,7 @@ func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { buildStore: NewBuildStore(db, scope.SubScope("build_store")), speculationTreeStore: NewSpeculationTreeStore(db, scope.SubScope("speculation_tree_store")), requestLogStore: NewRequestLogStore(db, scope.SubScope("request_log_store")), + requestSummaryStore: NewRequestSummaryStore(db, scope.SubScope("request_summary_store")), }, nil } @@ -83,6 +85,11 @@ func (f *mysqlStorage) GetRequestLogStore() storage.RequestLogStore { return f.requestLogStore } +// GetRequestSummaryStore returns the MySQL-backed RequestSummaryStore. +func (f *mysqlStorage) GetRequestSummaryStore() storage.RequestSummaryStore { + return f.requestSummaryStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/submitqueue/extension/storage/request_summary_store.go b/submitqueue/extension/storage/request_summary_store.go new file mode 100644 index 00000000..6c231129 --- /dev/null +++ b/submitqueue/extension/storage/request_summary_store.go @@ -0,0 +1,54 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=request_summary_store.go -destination=mock/request_summary_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/submitqueue/entity" +) + +// RequestSummaryStore maintains the gateway-owned read model for queue/time-window listing. +type RequestSummaryStore interface { + // UpsertFromLog incrementally merges one request-log event into the summary read model. + UpsertFromLog(ctx context.Context, log entity.RequestLog) error + + // List returns a page of request summaries matching the queue, time window, and optional statuses. + List(ctx context.Context, opts RequestSummaryListOptions) (RequestSummaryListResult, error) +} + +// RequestSummaryListOptions defines a page-in request for RequestSummaryStore. +type RequestSummaryListOptions struct { + Queue string + StartTimeMs int64 + EndTimeMs int64 + Statuses []entity.RequestStatus + Cursor *RequestSummaryCursor + Limit int +} + +// RequestSummaryCursor is the stable cursor position under newest-started-first ordering. +type RequestSummaryCursor struct { + StartedAtMs int64 + RequestID string +} + +// RequestSummaryListResult is one page of request summaries. +type RequestSummaryListResult struct { + Requests []entity.RequestSummary + NextCursor *RequestSummaryCursor +} diff --git a/submitqueue/extension/storage/storage.go b/submitqueue/extension/storage/storage.go index a02bef73..699c97b4 100644 --- a/submitqueue/extension/storage/storage.go +++ b/submitqueue/extension/storage/storage.go @@ -65,6 +65,9 @@ type Storage interface { // GetRequestLogStore returns the RequestLogStore instance. GetRequestLogStore() RequestLogStore + // GetRequestSummaryStore returns the RequestSummaryStore instance. + GetRequestSummaryStore() RequestSummaryStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } diff --git a/submitqueue/gateway/controller/BUILD.bazel b/submitqueue/gateway/controller/BUILD.bazel index 57d06701..3dc1e4ad 100644 --- a/submitqueue/gateway/controller/BUILD.bazel +++ b/submitqueue/gateway/controller/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "cancel.go", "land.go", + "list.go", "ping.go", "status.go", ], @@ -31,6 +32,7 @@ go_test( srcs = [ "cancel_test.go", "land_test.go", + "list_test.go", "ping_test.go", "status_test.go", ], diff --git a/submitqueue/gateway/controller/cancel.go b/submitqueue/gateway/controller/cancel.go index cc4e136d..5f8e05b9 100644 --- a/submitqueue/gateway/controller/cancel.go +++ b/submitqueue/gateway/controller/cancel.go @@ -23,6 +23,7 @@ import ( "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" @@ -35,21 +36,21 @@ import ( // and returns a response. The orchestrator-side cancel controller performs the actual // state transitions and emits the terminal RequestStatusCancelled log entry. type CancelController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - requestLogStore storage.RequestLogStore - registry consumer.TopicRegistry + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + registry consumer.TopicRegistry } // NewCancelController creates a new instance of the gateway cancel controller. // The controller writes a RequestStatusCancelling log entry through requestLogStore and // publishes cancel requests to the topic registered under consumer.TopicKeyCancel. -func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *CancelController { +func NewCancelController(logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry) *CancelController { return &CancelController{ - logger: logger, - metricsScope: scope, - requestLogStore: requestLogStore, - registry: registry, + logger: logger, + metricsScope: scope, + store: store, + registry: registry, } } @@ -89,7 +90,7 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (* // controller writes its "accepted" log entry synchronously to the same store, so // a NotFound here reliably means "this sqid was never accepted by the gateway" // rather than "in flight" — there is no false-negative race window. - if _, err := c.requestLogStore.List(ctx, cancelRequest.ID); err != nil { + if _, err := c.store.GetRequestLogStore().List(ctx, cancelRequest.ID); err != nil { if storage.IsNotFound(err) { c.metricsScope.Counter("cancel_request_not_found").Inc(1) return nil, errs.NewUserError(&RequestNotFoundError{Sqid: cancelRequest.ID}) @@ -105,8 +106,8 @@ func (c *CancelController) Cancel(ctx context.Context, req *pb.CancelRequest) (* metadata["reason"] = cancelRequest.Reason } logEntry := entity.NewRequestLog(cancelRequest.ID, entity.RequestStatusCancelling, 0, "", metadata) - if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { - return nil, fmt.Errorf("CancelController failed to insert cancelling log for sqid=%s: %w", cancelRequest.ID, err) + if err := request.PersistLog(ctx, c.store, logEntry); err != nil { + return nil, fmt.Errorf("CancelController failed to persist cancelling log for sqid=%s: %w", cancelRequest.ID, err) } if err := c.publishToQueue(ctx, cancelRequest); err != nil { diff --git a/submitqueue/gateway/controller/cancel_test.go b/submitqueue/gateway/controller/cancel_test.go index 43526797..cb78f1af 100644 --- a/submitqueue/gateway/controller/cancel_test.go +++ b/submitqueue/gateway/controller/cancel_test.go @@ -57,27 +57,40 @@ func newCancelTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controlle return registry } -// newRequestLogStoreNoop returns a RequestLogStore mock whose List returns a single -// dummy entry (so existence check passes) and whose Insert silently succeeds for any input. -func newRequestLogStoreNoop(t *testing.T, ctrl *gomock.Controller) *storagemock.MockRequestLogStore { +// newCancelStorageNoop returns a storage mock whose request log existence check, +// log insert, and summary upsert all succeed. +func newCancelStorageNoop(t *testing.T, ctrl *gomock.Controller) storage.Storage { t.Helper() - store := storagemock.NewMockRequestLogStore(ctrl) - store.EXPECT().List(gomock.Any(), gomock.Any()).Return([]entity.RequestLog{{}}, nil).AnyTimes() - store.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().List(gomock.Any(), gomock.Any()).Return([]entity.RequestLog{{}}, nil).AnyTimes() + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + summaryStore.EXPECT().UpsertFromLog(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + store.EXPECT().GetRequestSummaryStore().Return(summaryStore).AnyTimes() + return store +} + +func newCancelStorage(t *testing.T, ctrl *gomock.Controller, logStore storage.RequestLogStore, summaryStore storage.RequestSummaryStore) storage.Storage { + t.Helper() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + store.EXPECT().GetRequestSummaryStore().Return(summaryStore).AnyTimes() return store } func TestNewCancelController(t *testing.T) { ctrl := gomock.NewController(t) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newRequestLogStoreNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorageNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) require.NotNil(t, controller) } func TestCancel_HappyPath(t *testing.T) { ctrl := gomock.NewController(t) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newRequestLogStoreNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorageNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.CancelRequest{Sqid: "test-queue/42", Reason: "user changed their mind"} @@ -90,7 +103,7 @@ func TestCancel_HappyPath(t *testing.T) { func TestCancel_ReturnsErrorOnEmptySqid(t *testing.T) { ctrl := gomock.NewController(t) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newRequestLogStoreNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorageNoop(t, ctrl), newCancelTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.CancelRequest{Sqid: "", Reason: "anything"} @@ -115,7 +128,7 @@ func TestCancel_PublishesToQueue(t *testing.T) { }, ) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newRequestLogStoreNoop(t, ctrl), registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorageNoop(t, ctrl), registry) ctx := context.Background() req := &pb.CancelRequest{Sqid: "my-queue/7", Reason: "obsolete change"} @@ -147,6 +160,8 @@ func TestCancel_InsertsCancellingLog(t *testing.T) { return nil }, ).Times(1) + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + summaryStore.EXPECT().UpsertFromLog(gomock.Any(), gomock.Any()).Return(nil) registry, publisher := newCancelTestRegistry(t, ctrl) insertedBeforePublish := false @@ -157,7 +172,7 @@ func TestCancel_InsertsCancellingLog(t *testing.T) { }, ) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, logStore, registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorage(t, ctrl, logStore, summaryStore), registry) req := &pb.CancelRequest{Sqid: "my-queue/42", Reason: "obsolete change"} _, err := controller.Cancel(context.Background(), req) @@ -177,12 +192,13 @@ func TestCancel_LogInsertFailure(t *testing.T) { logStore := storagemock.NewMockRequestLogStore(ctrl) logStore.EXPECT().List(gomock.Any(), "q/1").Return([]entity.RequestLog{{}}, nil) logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("db unavailable")) + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) registry, publisher := newCancelTestRegistry(t, ctrl) // No Publish expectation: log insert must fail before publish runs. _ = publisher - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, logStore, registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorage(t, ctrl, logStore, summaryStore), registry) _, err := controller.Cancel(context.Background(), &pb.CancelRequest{Sqid: "q/1"}) require.Error(t, err) } @@ -193,7 +209,7 @@ func TestCancel_ReturnsErrorOnPublishFailure(t *testing.T) { registry, publisher := newCancelTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newRequestLogStoreNoop(t, ctrl), registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorageNoop(t, ctrl), registry) ctx := context.Background() req := &pb.CancelRequest{Sqid: "test-queue/1"} @@ -211,12 +227,13 @@ func TestCancel_UnknownSqidIsUserError(t *testing.T) { logStore := storagemock.NewMockRequestLogStore(ctrl) logStore.EXPECT().List(gomock.Any(), "ghost/1").Return(nil, storage.ErrNotFound) // No Insert expectation: existence check must short-circuit before Insert. + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) registry, publisher := newCancelTestRegistry(t, ctrl) // No Publish expectation: existence check must short-circuit before Publish. _ = publisher - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, logStore, registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorage(t, ctrl, logStore, summaryStore), registry) _, err := controller.Cancel(context.Background(), &pb.CancelRequest{Sqid: "ghost/1"}) require.Error(t, err) assert.True(t, IsRequestNotFound(err)) @@ -236,11 +253,12 @@ func TestCancel_RequestLogLookupFailure(t *testing.T) { logStore := storagemock.NewMockRequestLogStore(ctrl) logStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, fmt.Errorf("log backend down")) + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) registry, publisher := newCancelTestRegistry(t, ctrl) _ = publisher - controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, logStore, registry) + controller := NewCancelController(zap.NewNop().Sugar(), tally.NoopScope, newCancelStorage(t, ctrl, logStore, summaryStore), registry) _, err := controller.Cancel(context.Background(), &pb.CancelRequest{Sqid: "q/1"}) require.Error(t, err) assert.False(t, errs.IsUserError(err)) diff --git a/submitqueue/gateway/controller/land.go b/submitqueue/gateway/controller/land.go index 2567a049..b27a1852 100644 --- a/submitqueue/gateway/controller/land.go +++ b/submitqueue/gateway/controller/land.go @@ -25,6 +25,7 @@ import ( entityqueue "github.com/uber/submitqueue/entity/messagequeue" "github.com/uber/submitqueue/extension/counter" "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/queueconfig" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -132,9 +133,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p // Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new". // It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a entityqueue. // Gateway has to stay consistent with the request log. - logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil) - if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { - return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err) + logEntry := entity.NewRequestLogWithDetails(landRequest.ID, queue, change.URIs, entity.RequestStatusAccepted, 0, "", nil) + if err := request.PersistLog(ctx, c.store, logEntry); err != nil { + return nil, fmt.Errorf("LandController failed to persist request log for sqid=%s: %w", landRequest.ID, err) } c.logger.Debugw("land request created", diff --git a/submitqueue/gateway/controller/land_test.go b/submitqueue/gateway/controller/land_test.go index a3ac0707..44cbb291 100644 --- a/submitqueue/gateway/controller/land_test.go +++ b/submitqueue/gateway/controller/land_test.go @@ -67,8 +67,11 @@ func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) con func noopStorage(ctrl *gomock.Controller) storage.Storage { logStore := storagemock.NewMockRequestLogStore(ctrl) logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + summaryStore.EXPECT().UpsertFromLog(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + store.EXPECT().GetRequestSummaryStore().Return(summaryStore).AnyTimes() return store } diff --git a/submitqueue/gateway/controller/list.go b/submitqueue/gateway/controller/list.go new file mode 100644 index 00000000..654da52c --- /dev/null +++ b/submitqueue/gateway/controller/list.go @@ -0,0 +1,243 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "sort" + "time" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/queueconfig" + "github.com/uber/submitqueue/submitqueue/extension/storage" + pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" + "go.uber.org/zap" +) + +const ( + defaultListPageSize = 50 + maxListPageSize = 200 +) + +// ListController handles queue request listing for the gateway. +type ListController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + requestSummaryStore storage.RequestSummaryStore + queueConfigs queueconfig.Store +} + +// NewListController creates a new instance of the gateway list controller. +func NewListController(logger *zap.SugaredLogger, scope tally.Scope, requestSummaryStore storage.RequestSummaryStore, queueConfigs queueconfig.Store) *ListController { + return &ListController{ + logger: logger, + metricsScope: scope.SubScope("list_controller"), + requestSummaryStore: requestSummaryStore, + queueConfigs: queueConfigs, + } +} + +// List returns request summaries for one queue whose lifecycles overlap a time window. +func (c *ListController) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) { + start := time.Now() + defer func() { + c.metricsScope.Timer("list_latency").Record(time.Since(start)) + }() + c.metricsScope.Counter("list_count").Inc(1) + + if req.Queue == "" { + return nil, fmt.Errorf("ListController requires the request to have a queue name specified: %w", ErrInvalidRequest) + } + if req.StartTimeMs <= 0 || req.EndTimeMs <= 0 || req.StartTimeMs >= req.EndTimeMs { + return nil, fmt.Errorf("ListController requires a valid non-empty time window: %w", ErrInvalidRequest) + } + if req.PageSize < 0 { + return nil, fmt.Errorf("ListController page_size must be non-negative: %w", ErrInvalidRequest) + } + + statuses, err := canonicalStatuses(req.Statuses) + if err != nil { + return nil, fmt.Errorf("ListController invalid status filter: %w", err) + } + + if _, err := c.queueConfigs.Get(ctx, req.Queue); err != nil { + if errors.Is(err, queueconfig.ErrNotFound) { + return nil, errs.NewUserError(&UnrecognizedQueueError{Queue: req.Queue}) + } + return nil, fmt.Errorf("ListController failed to look up queue %q: %w", req.Queue, err) + } + + limit := int(req.PageSize) + if limit == 0 { + limit = defaultListPageSize + } + if limit > maxListPageSize { + limit = maxListPageSize + } + + cursor, err := decodeListPageToken(req.PageToken) + if err != nil { + return nil, fmt.Errorf("ListController invalid page token: %w", err) + } + if cursor != nil && !cursor.matches(req.Queue, req.StartTimeMs, req.EndTimeMs, statuses) { + return nil, fmt.Errorf("ListController page token does not match query: %w", ErrInvalidRequest) + } + + result, err := c.requestSummaryStore.List(ctx, storage.RequestSummaryListOptions{ + Queue: req.Queue, + StartTimeMs: req.StartTimeMs, + EndTimeMs: req.EndTimeMs, + Statuses: statuses, + Cursor: cursorStorage(cursor), + Limit: limit, + }) + if err != nil { + return nil, fmt.Errorf("ListController failed to list request summaries for queue=%s: %w", req.Queue, err) + } + + resp := &pb.ListResponse{ + Requests: make([]*pb.RequestSummary, 0, len(result.Requests)), + } + for _, summary := range result.Requests { + resp.Requests = append(resp.Requests, protoRequestSummary(summary)) + } + if result.NextCursor != nil { + token, err := encodeListPageToken(listPageToken{ + Queue: req.Queue, + StartTimeMs: req.StartTimeMs, + EndTimeMs: req.EndTimeMs, + Statuses: statusesToStrings(statuses), + StartedAtMs: result.NextCursor.StartedAtMs, + RequestID: result.NextCursor.RequestID, + }) + if err != nil { + return nil, fmt.Errorf("ListController failed to encode next page token: %w", err) + } + resp.NextPageToken = token + } + + c.logger.Debugw("request summaries listed", + "queue", req.Queue, + "count", len(resp.Requests), + "has_next_page", resp.NextPageToken != "", + ) + + return resp, nil +} + +type listPageToken struct { + Queue string `json:"queue"` + StartTimeMs int64 `json:"start_time_ms"` + EndTimeMs int64 `json:"end_time_ms"` + Statuses []string `json:"statuses"` + StartedAtMs int64 `json:"started_at_ms"` + RequestID string `json:"request_id"` +} + +func (t listPageToken) matches(queue string, startTimeMs, endTimeMs int64, statuses []entity.RequestStatus) bool { + return t.Queue == queue && + t.StartTimeMs == startTimeMs && + t.EndTimeMs == endTimeMs && + equalStrings(t.Statuses, statusesToStrings(statuses)) +} + +func encodeListPageToken(token listPageToken) (string, error) { + data, err := json.Marshal(token) + if err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(data), nil +} + +func decodeListPageToken(raw string) (*listPageToken, error) { + if raw == "" { + return nil, nil + } + data, err := base64.RawURLEncoding.DecodeString(raw) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidRequest, err) + } + var token listPageToken + if err := json.Unmarshal(data, &token); err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidRequest, err) + } + return &token, nil +} + +func cursorStorage(token *listPageToken) *storage.RequestSummaryCursor { + if token == nil { + return nil + } + return &storage.RequestSummaryCursor{StartedAtMs: token.StartedAtMs, RequestID: token.RequestID} +} + +func canonicalStatuses(raw []string) ([]entity.RequestStatus, error) { + seen := make(map[entity.RequestStatus]struct{}, len(raw)) + var statuses []entity.RequestStatus + for _, statusRaw := range raw { + status := entity.RequestStatus(statusRaw) + if !entity.IsKnownRequestStatus(status) { + return nil, fmt.Errorf("unknown status %q: %w", statusRaw, ErrInvalidRequest) + } + if _, ok := seen[status]; ok { + continue + } + seen[status] = struct{}{} + statuses = append(statuses, status) + } + sort.Slice(statuses, func(i, j int) bool { return statuses[i] < statuses[j] }) + return statuses, nil +} + +func statusesToStrings(statuses []entity.RequestStatus) []string { + out := make([]string, len(statuses)) + for i, status := range statuses { + out[i] = string(status) + } + return out +} + +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func protoRequestSummary(summary entity.RequestSummary) *pb.RequestSummary { + return &pb.RequestSummary{ + Sqid: summary.RequestID, + Queue: summary.Queue, + ChangeUris: summary.ChangeURIs, + Status: string(summary.Status), + LastError: summary.LastError, + Metadata: summary.Metadata, + StartedAtMs: summary.StartedAtMs, + UpdatedAtMs: summary.UpdatedAtMs, + CompletedAtMs: summary.CompletedAtMs, + Terminal: summary.Terminal, + } +} diff --git a/submitqueue/gateway/controller/list_test.go b/submitqueue/gateway/controller/list_test.go new file mode 100644 index 00000000..7c0289e9 --- /dev/null +++ b/submitqueue/gateway/controller/list_test.go @@ -0,0 +1,224 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/queueconfig" + qcmock "github.com/uber/submitqueue/submitqueue/extension/queueconfig/mock" + "github.com/uber/submitqueue/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +func TestList_ReturnsSummaries(t *testing.T) { + ctrl := gomock.NewController(t) + + qcs := qcmock.NewMockStore(ctrl) + qcs.EXPECT().Get(gomock.Any(), "q").Return(entity.QueueConfig{}, nil) + + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + summaryStore.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, opts storage.RequestSummaryListOptions) (storage.RequestSummaryListResult, error) { + assert.Equal(t, "q", opts.Queue) + assert.Equal(t, int64(100), opts.StartTimeMs) + assert.Equal(t, int64(200), opts.EndTimeMs) + assert.Equal(t, []entity.RequestStatus{entity.RequestStatusBuilding, entity.RequestStatusLanded}, opts.Statuses) + assert.Equal(t, defaultListPageSize, opts.Limit) + require.Nil(t, opts.Cursor) + return storage.RequestSummaryListResult{ + Requests: []entity.RequestSummary{ + { + RequestID: "q/2", + Queue: "q", + ChangeURIs: []string{"github://uber/repo/pull/2/abcdef"}, + Status: entity.RequestStatusBuilding, + LastError: "last", + Metadata: map[string]string{"k": "v"}, + StartedAtMs: 150, + UpdatedAtMs: 175, + CompletedAtMs: 0, + Terminal: false, + }, + }, + NextCursor: &storage.RequestSummaryCursor{StartedAtMs: 150, RequestID: "q/2"}, + }, nil + }, + ) + + controller := NewListController(zap.NewNop().Sugar(), tally.NoopScope, summaryStore, qcs) + resp, err := controller.List(context.Background(), &pb.ListRequest{ + Queue: "q", + StartTimeMs: 100, + EndTimeMs: 200, + Statuses: []string{"landed", "building", "landed"}, + }) + + require.NoError(t, err) + require.Len(t, resp.Requests, 1) + assert.Equal(t, "q/2", resp.Requests[0].Sqid) + assert.Equal(t, []string{"github://uber/repo/pull/2/abcdef"}, resp.Requests[0].ChangeUris) + assert.Equal(t, "building", resp.Requests[0].Status) + assert.Equal(t, "last", resp.Requests[0].LastError) + assert.Equal(t, map[string]string{"k": "v"}, resp.Requests[0].Metadata) + assert.Equal(t, int64(150), resp.Requests[0].StartedAtMs) + assert.Equal(t, int64(175), resp.Requests[0].UpdatedAtMs) + assert.NotEmpty(t, resp.NextPageToken) +} + +func TestList_UsesPageTokenCursor(t *testing.T) { + ctrl := gomock.NewController(t) + + qcs := qcmock.NewMockStore(ctrl) + qcs.EXPECT().Get(gomock.Any(), "q").Return(entity.QueueConfig{}, nil) + + token, err := encodeListPageToken(listPageToken{ + Queue: "q", + StartTimeMs: 100, + EndTimeMs: 200, + Statuses: []string{"building"}, + StartedAtMs: 150, + RequestID: "q/2", + }) + require.NoError(t, err) + + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + summaryStore.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, opts storage.RequestSummaryListOptions) (storage.RequestSummaryListResult, error) { + require.NotNil(t, opts.Cursor) + assert.Equal(t, int64(150), opts.Cursor.StartedAtMs) + assert.Equal(t, "q/2", opts.Cursor.RequestID) + assert.Equal(t, 10, opts.Limit) + return storage.RequestSummaryListResult{}, nil + }, + ) + + controller := NewListController(zap.NewNop().Sugar(), tally.NoopScope, summaryStore, qcs) + _, err = controller.List(context.Background(), &pb.ListRequest{ + Queue: "q", + StartTimeMs: 100, + EndTimeMs: 200, + Statuses: []string{"building"}, + PageSize: 10, + PageToken: token, + }) + require.NoError(t, err) +} + +func TestList_Errors(t *testing.T) { + tests := []struct { + name string + req *pb.ListRequest + setupQueueConfig func(*qcmock.MockStore) + wantInvalid bool + wantUnrecognized bool + }{ + { + name: "empty queue", + req: &pb.ListRequest{StartTimeMs: 1, EndTimeMs: 2}, + wantInvalid: true, + }, + { + name: "invalid window", + req: &pb.ListRequest{Queue: "q", StartTimeMs: 2, EndTimeMs: 1}, + wantInvalid: true, + }, + { + name: "invalid page size", + req: &pb.ListRequest{Queue: "q", StartTimeMs: 1, EndTimeMs: 2, PageSize: -1}, + wantInvalid: true, + }, + { + name: "unknown status", + req: &pb.ListRequest{Queue: "q", StartTimeMs: 1, EndTimeMs: 2, Statuses: []string{"wat"}}, + wantInvalid: true, + }, + { + name: "unrecognized queue", + req: &pb.ListRequest{Queue: "missing", StartTimeMs: 1, EndTimeMs: 2}, + setupQueueConfig: func(qcs *qcmock.MockStore) { + qcs.EXPECT().Get(gomock.Any(), "missing").Return(entity.QueueConfig{}, queueconfig.ErrNotFound) + }, + wantUnrecognized: true, + }, + { + name: "queue store failure", + req: &pb.ListRequest{Queue: "q", StartTimeMs: 1, EndTimeMs: 2}, + setupQueueConfig: func(qcs *qcmock.MockStore) { + qcs.EXPECT().Get(gomock.Any(), "q").Return(entity.QueueConfig{}, fmt.Errorf("backend down")) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + qcs := qcmock.NewMockStore(ctrl) + if tt.setupQueueConfig != nil { + tt.setupQueueConfig(qcs) + } + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + controller := NewListController(zap.NewNop().Sugar(), tally.NoopScope, summaryStore, qcs) + + _, err := controller.List(context.Background(), tt.req) + + require.Error(t, err) + assert.Equal(t, tt.wantUnrecognized, IsUnrecognizedQueue(err)) + if tt.wantInvalid { + assert.True(t, IsInvalidRequest(err)) + } + if tt.wantUnrecognized { + assert.True(t, errs.IsUserError(err)) + } + }) + } +} + +func TestList_RejectsMismatchedPageToken(t *testing.T) { + ctrl := gomock.NewController(t) + + qcs := qcmock.NewMockStore(ctrl) + qcs.EXPECT().Get(gomock.Any(), "q").Return(entity.QueueConfig{}, nil) + + token, err := encodeListPageToken(listPageToken{ + Queue: "other", + StartTimeMs: 1, + EndTimeMs: 2, + }) + require.NoError(t, err) + + summaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + controller := NewListController(zap.NewNop().Sugar(), tally.NoopScope, summaryStore, qcs) + + _, err = controller.List(context.Background(), &pb.ListRequest{ + Queue: "q", + StartTimeMs: 1, + EndTimeMs: 2, + PageToken: token, + }) + + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} diff --git a/submitqueue/gateway/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel index 60647c61..87463c68 100644 --- a/submitqueue/gateway/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//core/metrics", "//submitqueue/core/consumer", + "//submitqueue/core/request", "//submitqueue/entity", "//submitqueue/extension/storage", "@com_github_uber_go_tally_v4//:tally", diff --git a/submitqueue/gateway/controller/log/log.go b/submitqueue/gateway/controller/log/log.go index 311681a0..4224b637 100644 --- a/submitqueue/gateway/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -21,6 +21,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -87,10 +88,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "attempt", delivery.Attempt(), ) - // Persist request log to storage - if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { + // Persist request log and summary to storage. + if err := corerequest.PersistLog(ctx, c.store, logEntry); err != nil { metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1) - return fmt.Errorf("failed to insert request log: %w", err) + return fmt.Errorf("failed to persist request log: %w", err) } return nil // Success - message will be acked diff --git a/submitqueue/gateway/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go index c88fad1a..1ecdf6f9 100644 --- a/submitqueue/gateway/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -54,8 +54,11 @@ func TestController_Process(t *testing.T) { setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { mockLogStore := storagemock.NewMockRequestLogStore(ctrl) mockLogStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + mockSummaryStore := storagemock.NewMockRequestSummaryStore(ctrl) + mockSummaryStore.EXPECT().UpsertFromLog(gomock.Any(), gomock.Any()).Return(nil) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestLogStore().Return(mockLogStore).AnyTimes() + store.EXPECT().GetRequestSummaryStore().Return(mockSummaryStore).AnyTimes() return store }, wantErr: false, @@ -76,8 +79,10 @@ func TestController_Process(t *testing.T) { setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { mockLogStore := storagemock.NewMockRequestLogStore(ctrl) mockLogStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed")) + mockSummaryStore := storagemock.NewMockRequestSummaryStore(ctrl) store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestLogStore().Return(mockLogStore).AnyTimes() + store.EXPECT().GetRequestSummaryStore().Return(mockSummaryStore).AnyTimes() return store }, wantErr: true, diff --git a/submitqueue/gateway/proto/gateway.proto b/submitqueue/gateway/proto/gateway.proto index e6482b37..d45d1398 100644 --- a/submitqueue/gateway/proto/gateway.proto +++ b/submitqueue/gateway/proto/gateway.proto @@ -126,6 +126,53 @@ message StatusResponse { map metadata = 3; } +// ListRequest defines a request to list queue requests whose lifecycles overlap a time window. +message ListRequest { + // Name of the queue whose requests should be returned. + string queue = 1; + // Inclusive lower bound for lifecycle-overlap filtering, in Unix epoch milliseconds. + int64 start_time_ms = 2; + // Exclusive upper bound for lifecycle-overlap filtering, in Unix epoch milliseconds. + int64 end_time_ms = 3; + // Optional current status filters. Values use the same customer-facing strings returned by Status. + repeated string statuses = 4; + // Maximum number of requests to return. Zero means the server default. + int32 page_size = 5; + // Opaque token returned by a previous List response. + string page_token = 6; +} + +// RequestSummary is the current gateway-owned view of one request for queue-listing UX. +message RequestSummary { + // Globally unique request ID, as returned by Land. + string sqid = 1; + // Queue the request belongs to. + string queue = 2; + // Change URIs submitted with the request. + repeated string change_uris = 3; + // Current customer-friendly status of the request. + string status = 4; + // Last error message associated with the current status. Empty string if there is no error. + string last_error = 5; + // Free-form key-value metadata associated with the current status. + map metadata = 6; + // Time the request entered SubmitQueue, in Unix epoch milliseconds. + int64 started_at_ms = 7; + // Time the visible summary state last changed, in Unix epoch milliseconds. + int64 updated_at_ms = 8; + // Time the request completed, in Unix epoch milliseconds. Zero if the request is not terminal. + int64 completed_at_ms = 9; + // True when status is terminal, e.g. "landed", "error", or "cancelled". + bool terminal = 10; +} + +// ListResponse defines a page of queue request summaries. +message ListResponse { + repeated RequestSummary requests = 1; + // Opaque token for the next page. Empty when no further page exists. + string next_page_token = 2; +} + // *************** // Error messages, returned as `google.rpc.Status` messages. // *************** @@ -181,4 +228,7 @@ service SubmitQueueGateway { // Status returns the current status of a previously submitted request, identified by its sqid. // The status is eventually consistent with the request store and reconciled from the append-only request log. rpc Status(StatusRequest) returns (StatusResponse) {} + + // List returns request summaries for one queue whose lifecycles overlap a time window. + rpc List(ListRequest) returns (ListResponse) {} } diff --git a/submitqueue/gateway/protopb/gateway.pb.go b/submitqueue/gateway/protopb/gateway.pb.go index 85d1aff2..fbe39e16 100644 --- a/submitqueue/gateway/protopb/gateway.pb.go +++ b/submitqueue/gateway/protopb/gateway.pb.go @@ -592,6 +592,278 @@ func (x *StatusResponse) GetMetadata() map[string]string { return nil } +// ListRequest defines a request to list queue requests whose lifecycles overlap a time window. +type ListRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Name of the queue whose requests should be returned. + Queue string `protobuf:"bytes,1,opt,name=queue,proto3" json:"queue,omitempty"` + // Inclusive lower bound for lifecycle-overlap filtering, in Unix epoch milliseconds. + StartTimeMs int64 `protobuf:"varint,2,opt,name=start_time_ms,json=startTimeMs,proto3" json:"start_time_ms,omitempty"` + // Exclusive upper bound for lifecycle-overlap filtering, in Unix epoch milliseconds. + EndTimeMs int64 `protobuf:"varint,3,opt,name=end_time_ms,json=endTimeMs,proto3" json:"end_time_ms,omitempty"` + // Optional current status filters. Values use the same customer-facing strings returned by Status. + Statuses []string `protobuf:"bytes,4,rep,name=statuses,proto3" json:"statuses,omitempty"` + // Maximum number of requests to return. Zero means the server default. + PageSize int32 `protobuf:"varint,5,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + // Opaque token returned by a previous List response. + PageToken string `protobuf:"bytes,6,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListRequest) Reset() { + *x = ListRequest{} + mi := &file_gateway_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListRequest) ProtoMessage() {} + +func (x *ListRequest) ProtoReflect() protoreflect.Message { + mi := &file_gateway_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListRequest.ProtoReflect.Descriptor instead. +func (*ListRequest) Descriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{9} +} + +func (x *ListRequest) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *ListRequest) GetStartTimeMs() int64 { + if x != nil { + return x.StartTimeMs + } + return 0 +} + +func (x *ListRequest) GetEndTimeMs() int64 { + if x != nil { + return x.EndTimeMs + } + return 0 +} + +func (x *ListRequest) GetStatuses() []string { + if x != nil { + return x.Statuses + } + return nil +} + +func (x *ListRequest) GetPageSize() int32 { + if x != nil { + return x.PageSize + } + return 0 +} + +func (x *ListRequest) GetPageToken() string { + if x != nil { + return x.PageToken + } + return "" +} + +// RequestSummary is the current gateway-owned view of one request for queue-listing UX. +type RequestSummary struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Globally unique request ID, as returned by Land. + Sqid string `protobuf:"bytes,1,opt,name=sqid,proto3" json:"sqid,omitempty"` + // Queue the request belongs to. + Queue string `protobuf:"bytes,2,opt,name=queue,proto3" json:"queue,omitempty"` + // Change URIs submitted with the request. + ChangeUris []string `protobuf:"bytes,3,rep,name=change_uris,json=changeUris,proto3" json:"change_uris,omitempty"` + // Current customer-friendly status of the request. + Status string `protobuf:"bytes,4,opt,name=status,proto3" json:"status,omitempty"` + // Last error message associated with the current status. Empty string if there is no error. + LastError string `protobuf:"bytes,5,opt,name=last_error,json=lastError,proto3" json:"last_error,omitempty"` + // Free-form key-value metadata associated with the current status. + Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // Time the request entered SubmitQueue, in Unix epoch milliseconds. + StartedAtMs int64 `protobuf:"varint,7,opt,name=started_at_ms,json=startedAtMs,proto3" json:"started_at_ms,omitempty"` + // Time the visible summary state last changed, in Unix epoch milliseconds. + UpdatedAtMs int64 `protobuf:"varint,8,opt,name=updated_at_ms,json=updatedAtMs,proto3" json:"updated_at_ms,omitempty"` + // Time the request completed, in Unix epoch milliseconds. Zero if the request is not terminal. + CompletedAtMs int64 `protobuf:"varint,9,opt,name=completed_at_ms,json=completedAtMs,proto3" json:"completed_at_ms,omitempty"` + // True when status is terminal, e.g. "landed", "error", or "cancelled". + Terminal bool `protobuf:"varint,10,opt,name=terminal,proto3" json:"terminal,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RequestSummary) Reset() { + *x = RequestSummary{} + mi := &file_gateway_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RequestSummary) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestSummary) ProtoMessage() {} + +func (x *RequestSummary) ProtoReflect() protoreflect.Message { + mi := &file_gateway_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestSummary.ProtoReflect.Descriptor instead. +func (*RequestSummary) Descriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{10} +} + +func (x *RequestSummary) GetSqid() string { + if x != nil { + return x.Sqid + } + return "" +} + +func (x *RequestSummary) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *RequestSummary) GetChangeUris() []string { + if x != nil { + return x.ChangeUris + } + return nil +} + +func (x *RequestSummary) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +func (x *RequestSummary) GetLastError() string { + if x != nil { + return x.LastError + } + return "" +} + +func (x *RequestSummary) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *RequestSummary) GetStartedAtMs() int64 { + if x != nil { + return x.StartedAtMs + } + return 0 +} + +func (x *RequestSummary) GetUpdatedAtMs() int64 { + if x != nil { + return x.UpdatedAtMs + } + return 0 +} + +func (x *RequestSummary) GetCompletedAtMs() int64 { + if x != nil { + return x.CompletedAtMs + } + return 0 +} + +func (x *RequestSummary) GetTerminal() bool { + if x != nil { + return x.Terminal + } + return false +} + +// ListResponse defines a page of queue request summaries. +type ListResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Requests []*RequestSummary `protobuf:"bytes,1,rep,name=requests,proto3" json:"requests,omitempty"` + // Opaque token for the next page. Empty when no further page exists. + NextPageToken string `protobuf:"bytes,2,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListResponse) Reset() { + *x = ListResponse{} + mi := &file_gateway_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListResponse) ProtoMessage() {} + +func (x *ListResponse) ProtoReflect() protoreflect.Message { + mi := &file_gateway_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListResponse.ProtoReflect.Descriptor instead. +func (*ListResponse) Descriptor() ([]byte, []int) { + return file_gateway_proto_rawDescGZIP(), []int{11} +} + +func (x *ListResponse) GetRequests() []*RequestSummary { + if x != nil { + return x.Requests + } + return nil +} + +func (x *ListResponse) GetNextPageToken() string { + if x != nil { + return x.NextPageToken + } + return "" +} + // Generic error with metadata. Each custom error type should extend this message. type Error struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -603,7 +875,7 @@ type Error struct { func (x *Error) Reset() { *x = Error{} - mi := &file_gateway_proto_msgTypes[9] + mi := &file_gateway_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -615,7 +887,7 @@ func (x *Error) String() string { func (*Error) ProtoMessage() {} func (x *Error) ProtoReflect() protoreflect.Message { - mi := &file_gateway_proto_msgTypes[9] + mi := &file_gateway_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -628,7 +900,7 @@ func (x *Error) ProtoReflect() protoreflect.Message { // Deprecated: Use Error.ProtoReflect.Descriptor instead. func (*Error) Descriptor() ([]byte, []int) { - return file_gateway_proto_rawDescGZIP(), []int{9} + return file_gateway_proto_rawDescGZIP(), []int{12} } func (x *Error) GetMessage() string { @@ -651,7 +923,7 @@ type UnrecognizedQueueError struct { func (x *UnrecognizedQueueError) Reset() { *x = UnrecognizedQueueError{} - mi := &file_gateway_proto_msgTypes[10] + mi := &file_gateway_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -663,7 +935,7 @@ func (x *UnrecognizedQueueError) String() string { func (*UnrecognizedQueueError) ProtoMessage() {} func (x *UnrecognizedQueueError) ProtoReflect() protoreflect.Message { - mi := &file_gateway_proto_msgTypes[10] + mi := &file_gateway_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -676,7 +948,7 @@ func (x *UnrecognizedQueueError) ProtoReflect() protoreflect.Message { // Deprecated: Use UnrecognizedQueueError.ProtoReflect.Descriptor instead. func (*UnrecognizedQueueError) Descriptor() ([]byte, []int) { - return file_gateway_proto_rawDescGZIP(), []int{10} + return file_gateway_proto_rawDescGZIP(), []int{13} } func (x *UnrecognizedQueueError) GetError() *Error { @@ -706,7 +978,7 @@ type RequestNotFoundError struct { func (x *RequestNotFoundError) Reset() { *x = RequestNotFoundError{} - mi := &file_gateway_proto_msgTypes[11] + mi := &file_gateway_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -718,7 +990,7 @@ func (x *RequestNotFoundError) String() string { func (*RequestNotFoundError) ProtoMessage() {} func (x *RequestNotFoundError) ProtoReflect() protoreflect.Message { - mi := &file_gateway_proto_msgTypes[11] + mi := &file_gateway_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -731,7 +1003,7 @@ func (x *RequestNotFoundError) ProtoReflect() protoreflect.Message { // Deprecated: Use RequestNotFoundError.ProtoReflect.Descriptor instead. func (*RequestNotFoundError) Descriptor() ([]byte, []int) { - return file_gateway_proto_rawDescGZIP(), []int{11} + return file_gateway_proto_rawDescGZIP(), []int{14} } func (x *RequestNotFoundError) GetError() *Error { @@ -781,7 +1053,35 @@ const file_gateway_proto_rawDesc = "" + "\bmetadata\x18\x03 \x03(\v26.uber.submitqueue.gateway.StatusResponse.MetadataEntryR\bmetadata\x1a;\n" + "\rMetadataEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"!\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xbf\x01\n" + + "\vListRequest\x12\x14\n" + + "\x05queue\x18\x01 \x01(\tR\x05queue\x12\"\n" + + "\rstart_time_ms\x18\x02 \x01(\x03R\vstartTimeMs\x12\x1e\n" + + "\vend_time_ms\x18\x03 \x01(\x03R\tendTimeMs\x12\x1a\n" + + "\bstatuses\x18\x04 \x03(\tR\bstatuses\x12\x1b\n" + + "\tpage_size\x18\x05 \x01(\x05R\bpageSize\x12\x1d\n" + + "\n" + + "page_token\x18\x06 \x01(\tR\tpageToken\"\xaf\x03\n" + + "\x0eRequestSummary\x12\x12\n" + + "\x04sqid\x18\x01 \x01(\tR\x04sqid\x12\x14\n" + + "\x05queue\x18\x02 \x01(\tR\x05queue\x12\x1f\n" + + "\vchange_uris\x18\x03 \x03(\tR\n" + + "changeUris\x12\x16\n" + + "\x06status\x18\x04 \x01(\tR\x06status\x12\x1d\n" + + "\n" + + "last_error\x18\x05 \x01(\tR\tlastError\x12R\n" + + "\bmetadata\x18\x06 \x03(\v26.uber.submitqueue.gateway.RequestSummary.MetadataEntryR\bmetadata\x12\"\n" + + "\rstarted_at_ms\x18\a \x01(\x03R\vstartedAtMs\x12\"\n" + + "\rupdated_at_ms\x18\b \x01(\x03R\vupdatedAtMs\x12&\n" + + "\x0fcompleted_at_ms\x18\t \x01(\x03R\rcompletedAtMs\x12\x1a\n" + + "\bterminal\x18\n" + + " \x01(\bR\bterminal\x1a;\n" + + "\rMetadataEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"|\n" + + "\fListResponse\x12D\n" + + "\brequests\x18\x01 \x03(\v2(.uber.submitqueue.gateway.RequestSummaryR\brequests\x12&\n" + + "\x0fnext_page_token\x18\x02 \x01(\tR\rnextPageToken\"!\n" + "\x05Error\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"e\n" + "\x16UnrecognizedQueueError\x125\n" + @@ -795,12 +1095,13 @@ const file_gateway_proto_rawDesc = "" + "\n" + "\x06REBASE\x10\x01\x12\x11\n" + "\rSQUASH_REBASE\x10\x02\x12\t\n" + - "\x05MERGE\x10\x032\x84\x03\n" + + "\x05MERGE\x10\x032\xdd\x03\n" + "\x12SubmitQueueGateway\x12W\n" + "\x04Ping\x12%.uber.submitqueue.gateway.PingRequest\x1a&.uber.submitqueue.gateway.PingResponse\"\x00\x12W\n" + "\x04Land\x12%.uber.submitqueue.gateway.LandRequest\x1a&.uber.submitqueue.gateway.LandResponse\"\x00\x12]\n" + "\x06Cancel\x12'.uber.submitqueue.gateway.CancelRequest\x1a(.uber.submitqueue.gateway.CancelResponse\"\x00\x12]\n" + - "\x06Status\x12'.uber.submitqueue.gateway.StatusRequest\x1a(.uber.submitqueue.gateway.StatusResponse\"\x00Bg\n" + + "\x06Status\x12'.uber.submitqueue.gateway.StatusRequest\x1a(.uber.submitqueue.gateway.StatusResponse\"\x00\x12W\n" + + "\x04List\x12%.uber.submitqueue.gateway.ListRequest\x1a&.uber.submitqueue.gateway.ListResponse\"\x00Bg\n" + "\x1ccom.uber.submitqueue.gatewayB\fGatewayProtoP\x01Z7github.com/uber/submitqueue/submitqueue/gateway/protopbb\x06proto3" var ( @@ -816,7 +1117,7 @@ func file_gateway_proto_rawDescGZIP() []byte { } var file_gateway_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_gateway_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_gateway_proto_goTypes = []any{ (Strategy)(0), // 0: uber.submitqueue.gateway.Strategy (*PingRequest)(nil), // 1: uber.submitqueue.gateway.PingRequest @@ -828,30 +1129,38 @@ var file_gateway_proto_goTypes = []any{ (*CancelResponse)(nil), // 7: uber.submitqueue.gateway.CancelResponse (*StatusRequest)(nil), // 8: uber.submitqueue.gateway.StatusRequest (*StatusResponse)(nil), // 9: uber.submitqueue.gateway.StatusResponse - (*Error)(nil), // 10: uber.submitqueue.gateway.Error - (*UnrecognizedQueueError)(nil), // 11: uber.submitqueue.gateway.UnrecognizedQueueError - (*RequestNotFoundError)(nil), // 12: uber.submitqueue.gateway.RequestNotFoundError - nil, // 13: uber.submitqueue.gateway.StatusResponse.MetadataEntry + (*ListRequest)(nil), // 10: uber.submitqueue.gateway.ListRequest + (*RequestSummary)(nil), // 11: uber.submitqueue.gateway.RequestSummary + (*ListResponse)(nil), // 12: uber.submitqueue.gateway.ListResponse + (*Error)(nil), // 13: uber.submitqueue.gateway.Error + (*UnrecognizedQueueError)(nil), // 14: uber.submitqueue.gateway.UnrecognizedQueueError + (*RequestNotFoundError)(nil), // 15: uber.submitqueue.gateway.RequestNotFoundError + nil, // 16: uber.submitqueue.gateway.StatusResponse.MetadataEntry + nil, // 17: uber.submitqueue.gateway.RequestSummary.MetadataEntry } var file_gateway_proto_depIdxs = []int32{ 3, // 0: uber.submitqueue.gateway.LandRequest.change:type_name -> uber.submitqueue.gateway.Change 0, // 1: uber.submitqueue.gateway.LandRequest.strategy:type_name -> uber.submitqueue.gateway.Strategy - 13, // 2: uber.submitqueue.gateway.StatusResponse.metadata:type_name -> uber.submitqueue.gateway.StatusResponse.MetadataEntry - 10, // 3: uber.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.submitqueue.gateway.Error - 10, // 4: uber.submitqueue.gateway.RequestNotFoundError.error:type_name -> uber.submitqueue.gateway.Error - 1, // 5: uber.submitqueue.gateway.SubmitQueueGateway.Ping:input_type -> uber.submitqueue.gateway.PingRequest - 4, // 6: uber.submitqueue.gateway.SubmitQueueGateway.Land:input_type -> uber.submitqueue.gateway.LandRequest - 6, // 7: uber.submitqueue.gateway.SubmitQueueGateway.Cancel:input_type -> uber.submitqueue.gateway.CancelRequest - 8, // 8: uber.submitqueue.gateway.SubmitQueueGateway.Status:input_type -> uber.submitqueue.gateway.StatusRequest - 2, // 9: uber.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.submitqueue.gateway.PingResponse - 5, // 10: uber.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.submitqueue.gateway.LandResponse - 7, // 11: uber.submitqueue.gateway.SubmitQueueGateway.Cancel:output_type -> uber.submitqueue.gateway.CancelResponse - 9, // 12: uber.submitqueue.gateway.SubmitQueueGateway.Status:output_type -> uber.submitqueue.gateway.StatusResponse - 9, // [9:13] is the sub-list for method output_type - 5, // [5:9] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 16, // 2: uber.submitqueue.gateway.StatusResponse.metadata:type_name -> uber.submitqueue.gateway.StatusResponse.MetadataEntry + 17, // 3: uber.submitqueue.gateway.RequestSummary.metadata:type_name -> uber.submitqueue.gateway.RequestSummary.MetadataEntry + 11, // 4: uber.submitqueue.gateway.ListResponse.requests:type_name -> uber.submitqueue.gateway.RequestSummary + 13, // 5: uber.submitqueue.gateway.UnrecognizedQueueError.error:type_name -> uber.submitqueue.gateway.Error + 13, // 6: uber.submitqueue.gateway.RequestNotFoundError.error:type_name -> uber.submitqueue.gateway.Error + 1, // 7: uber.submitqueue.gateway.SubmitQueueGateway.Ping:input_type -> uber.submitqueue.gateway.PingRequest + 4, // 8: uber.submitqueue.gateway.SubmitQueueGateway.Land:input_type -> uber.submitqueue.gateway.LandRequest + 6, // 9: uber.submitqueue.gateway.SubmitQueueGateway.Cancel:input_type -> uber.submitqueue.gateway.CancelRequest + 8, // 10: uber.submitqueue.gateway.SubmitQueueGateway.Status:input_type -> uber.submitqueue.gateway.StatusRequest + 10, // 11: uber.submitqueue.gateway.SubmitQueueGateway.List:input_type -> uber.submitqueue.gateway.ListRequest + 2, // 12: uber.submitqueue.gateway.SubmitQueueGateway.Ping:output_type -> uber.submitqueue.gateway.PingResponse + 5, // 13: uber.submitqueue.gateway.SubmitQueueGateway.Land:output_type -> uber.submitqueue.gateway.LandResponse + 7, // 14: uber.submitqueue.gateway.SubmitQueueGateway.Cancel:output_type -> uber.submitqueue.gateway.CancelResponse + 9, // 15: uber.submitqueue.gateway.SubmitQueueGateway.Status:output_type -> uber.submitqueue.gateway.StatusResponse + 12, // 16: uber.submitqueue.gateway.SubmitQueueGateway.List:output_type -> uber.submitqueue.gateway.ListResponse + 12, // [12:17] is the sub-list for method output_type + 7, // [7:12] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_gateway_proto_init() } @@ -865,7 +1174,7 @@ func file_gateway_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_gateway_proto_rawDesc), len(file_gateway_proto_rawDesc)), NumEnums: 1, - NumMessages: 13, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/submitqueue/gateway/protopb/gateway.pb.yarpc.go b/submitqueue/gateway/protopb/gateway.pb.yarpc.go index e04d0127..56adea53 100644 --- a/submitqueue/gateway/protopb/gateway.pb.yarpc.go +++ b/submitqueue/gateway/protopb/gateway.pb.yarpc.go @@ -25,6 +25,7 @@ type SubmitQueueGatewayYARPCClient interface { Land(context.Context, *LandRequest, ...yarpc.CallOption) (*LandResponse, error) Cancel(context.Context, *CancelRequest, ...yarpc.CallOption) (*CancelResponse, error) Status(context.Context, *StatusRequest, ...yarpc.CallOption) (*StatusResponse, error) + List(context.Context, *ListRequest, ...yarpc.CallOption) (*ListResponse, error) } func newSubmitQueueGatewayYARPCClient(clientConfig transport.ClientConfig, anyResolver v2.AnyResolver, options ...v2.ClientOption) SubmitQueueGatewayYARPCClient { @@ -49,6 +50,7 @@ type SubmitQueueGatewayYARPCServer interface { Land(context.Context, *LandRequest) (*LandResponse, error) Cancel(context.Context, *CancelRequest) (*CancelResponse, error) Status(context.Context, *StatusRequest) (*StatusResponse, error) + List(context.Context, *ListRequest) (*ListResponse, error) } type buildSubmitQueueGatewayYARPCProceduresParams struct { @@ -102,6 +104,16 @@ func buildSubmitQueueGatewayYARPCProcedures(params buildSubmitQueueGatewayYARPCP }, ), }, + { + MethodName: "List", + Handler: v2.NewUnaryHandler( + v2.UnaryHandlerParams{ + Handle: handler.List, + NewRequest: newSubmitQueueGatewayServiceListYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, }, OnewayHandlerParams: []v2.BuildProceduresOnewayHandlerParams{}, StreamHandlerParams: []v2.BuildProceduresStreamHandlerParams{}, @@ -262,6 +274,18 @@ func (c *_SubmitQueueGatewayYARPCCaller) Status(ctx context.Context, request *St return response, err } +func (c *_SubmitQueueGatewayYARPCCaller) List(ctx context.Context, request *ListRequest, options ...yarpc.CallOption) (*ListResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "List", request, newSubmitQueueGatewayServiceListYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*ListResponse) + if !ok { + return nil, v2.CastError(emptySubmitQueueGatewayServiceListYARPCResponse, responseMessage) + } + return response, err +} + type _SubmitQueueGatewayYARPCHandler struct { server SubmitQueueGatewayYARPCServer } @@ -330,6 +354,22 @@ func (h *_SubmitQueueGatewayYARPCHandler) Status(ctx context.Context, requestMes return response, err } +func (h *_SubmitQueueGatewayYARPCHandler) List(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *ListRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*ListRequest) + if !ok { + return nil, v2.CastError(emptySubmitQueueGatewayServiceListYARPCRequest, requestMessage) + } + } + response, err := h.server.List(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + func newSubmitQueueGatewayServicePingYARPCRequest() proto.Message { return &PingRequest{} } @@ -362,6 +402,14 @@ func newSubmitQueueGatewayServiceStatusYARPCResponse() proto.Message { return &StatusResponse{} } +func newSubmitQueueGatewayServiceListYARPCRequest() proto.Message { + return &ListRequest{} +} + +func newSubmitQueueGatewayServiceListYARPCResponse() proto.Message { + return &ListResponse{} +} + var ( emptySubmitQueueGatewayServicePingYARPCRequest = &PingRequest{} emptySubmitQueueGatewayServicePingYARPCResponse = &PingResponse{} @@ -371,52 +419,71 @@ var ( emptySubmitQueueGatewayServiceCancelYARPCResponse = &CancelResponse{} emptySubmitQueueGatewayServiceStatusYARPCRequest = &StatusRequest{} emptySubmitQueueGatewayServiceStatusYARPCResponse = &StatusResponse{} + emptySubmitQueueGatewayServiceListYARPCRequest = &ListRequest{} + emptySubmitQueueGatewayServiceListYARPCResponse = &ListResponse{} ) var yarpcFileDescriptorClosuref1a937782ebbded5 = [][]byte{ // gateway.proto []byte{ - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0x51, 0x6b, 0xdb, 0x3c, - 0x14, 0x8d, 0xe3, 0xc4, 0x4d, 0xae, 0x93, 0x92, 0x4f, 0x94, 0x62, 0x42, 0x3f, 0x96, 0x6a, 0x6c, - 0x0d, 0x7b, 0x70, 0x21, 0x63, 0x5b, 0x59, 0x61, 0x90, 0x76, 0x6e, 0xf7, 0xd0, 0x96, 0xd6, 0x5e, - 0x19, 0x0c, 0x46, 0x51, 0x1c, 0xe1, 0x9a, 0xd5, 0x76, 0x6a, 0xc9, 0x1d, 0xd9, 0xf3, 0xf6, 0x33, - 0xf6, 0xbf, 0xf6, 0x73, 0x86, 0x25, 0xb9, 0x75, 0x60, 0x76, 0x1f, 0xf6, 0xa6, 0x7b, 0x75, 0xce, - 0xb9, 0x47, 0xd2, 0xbd, 0x36, 0xf4, 0x03, 0xc2, 0xe9, 0x37, 0xb2, 0xb4, 0x17, 0x69, 0xc2, 0x13, - 0x64, 0x65, 0x33, 0x9a, 0xda, 0x2c, 0x9b, 0x45, 0x21, 0xbf, 0xcd, 0x68, 0x46, 0x6d, 0xb5, 0x8f, - 0x77, 0xc0, 0x3c, 0x0f, 0xe3, 0xc0, 0xa5, 0xb7, 0x19, 0x65, 0x1c, 0x59, 0xb0, 0x16, 0x51, 0xc6, - 0x48, 0x40, 0x2d, 0x6d, 0xa4, 0x8d, 0xbb, 0x6e, 0x11, 0xe2, 0x9f, 0x1a, 0xf4, 0x24, 0x92, 0x2d, - 0x92, 0x98, 0xd1, 0x6a, 0x28, 0xda, 0x86, 0x1e, 0xa3, 0xe9, 0x5d, 0xe8, 0xd3, 0xab, 0x98, 0x44, - 0xd4, 0x6a, 0x8a, 0x6d, 0x53, 0xe5, 0xce, 0x48, 0x44, 0xd1, 0x16, 0x74, 0x79, 0x18, 0x51, 0xc6, - 0x49, 0xb4, 0xb0, 0xf4, 0x91, 0x36, 0xd6, 0xdd, 0x87, 0x04, 0x1a, 0x42, 0xe7, 0x3a, 0x61, 0x5c, - 0x90, 0x5b, 0x82, 0x7c, 0x1f, 0xe3, 0x2d, 0x30, 0x0e, 0xaf, 0x49, 0x1c, 0x50, 0x84, 0xa0, 0x95, - 0xa5, 0x21, 0xb3, 0xb4, 0x91, 0x3e, 0xee, 0xba, 0x62, 0x8d, 0x7f, 0x69, 0x60, 0x9e, 0x90, 0x78, - 0x5e, 0x9c, 0x67, 0x03, 0xda, 0xe2, 0xbc, 0xca, 0xa2, 0x0c, 0xd0, 0x1e, 0x18, 0xbe, 0xd0, 0x10, - 0xd6, 0xcc, 0xc9, 0xc8, 0xae, 0xba, 0x1f, 0x5b, 0xd6, 0x72, 0x15, 0x1e, 0xbd, 0x83, 0x0e, 0xe3, - 0x29, 0xe1, 0x34, 0x58, 0x0a, 0x67, 0xeb, 0x13, 0x5c, 0xcd, 0xf5, 0x14, 0xd2, 0xbd, 0xe7, 0x60, - 0x0c, 0x3d, 0x69, 0x4f, 0x5d, 0x22, 0x82, 0x16, 0xbb, 0x0d, 0xe7, 0xca, 0x9e, 0x58, 0xe3, 0x7d, - 0xe8, 0x1f, 0x92, 0xd8, 0xa7, 0x37, 0xc5, 0x21, 0xfe, 0x02, 0x42, 0x9b, 0x60, 0xa4, 0x94, 0xb0, - 0x24, 0x56, 0xb7, 0xab, 0x22, 0x3c, 0x80, 0xf5, 0x82, 0x2c, 0x4b, 0xe0, 0xa7, 0xd0, 0xf7, 0x38, - 0xe1, 0x19, 0xab, 0x91, 0xc3, 0xbf, 0x35, 0x58, 0x2f, 0x50, 0xca, 0xda, 0x26, 0x18, 0x4c, 0x64, - 0x14, 0x50, 0x45, 0xe8, 0x7f, 0x80, 0x1b, 0xc2, 0xf8, 0x15, 0x4d, 0xd3, 0x24, 0x55, 0xd5, 0xbb, - 0x79, 0xc6, 0xc9, 0x13, 0xc8, 0x85, 0x4e, 0x44, 0x39, 0x99, 0x13, 0x4e, 0x2c, 0x7d, 0xa4, 0x8f, - 0xcd, 0xc9, 0xeb, 0xba, 0x1b, 0x2a, 0x97, 0xb4, 0x4f, 0x15, 0xd1, 0x89, 0x79, 0xba, 0x74, 0xef, - 0x75, 0x86, 0xfb, 0xd0, 0x5f, 0xd9, 0x42, 0x03, 0xd0, 0xbf, 0xd2, 0xa5, 0x32, 0x96, 0x2f, 0xf3, - 0x87, 0xbe, 0x23, 0x37, 0x59, 0xd1, 0x6c, 0x32, 0x78, 0xdb, 0xdc, 0xd3, 0xf0, 0x36, 0xb4, 0xa5, - 0xb3, 0xea, 0xde, 0xa6, 0xb0, 0x79, 0x19, 0xa7, 0xd4, 0x4f, 0x82, 0x38, 0xfc, 0x4e, 0xe7, 0x17, - 0xb9, 0x47, 0xc9, 0x79, 0x05, 0x6d, 0x79, 0x4e, 0x4d, 0x34, 0xca, 0x93, 0xea, 0xa3, 0x08, 0xbc, - 0x2b, 0xd1, 0x0f, 0x6d, 0xd7, 0x2c, 0xb5, 0x1d, 0x26, 0xb0, 0xa1, 0xde, 0xe0, 0x2c, 0xe1, 0x47, - 0x49, 0x16, 0xcf, 0xff, 0xa9, 0x48, 0xf1, 0x8e, 0xcd, 0x87, 0x77, 0x7c, 0x31, 0x85, 0x4e, 0xd1, - 0x75, 0xc8, 0x84, 0xb5, 0xf7, 0xce, 0xd1, 0xf4, 0xf2, 0xe4, 0xe3, 0xa0, 0x81, 0x00, 0x0c, 0xd7, - 0x39, 0x98, 0x7a, 0xce, 0x40, 0x43, 0xff, 0x41, 0xdf, 0xbb, 0xb8, 0x9c, 0x7a, 0x1f, 0xae, 0x54, - 0xaa, 0x89, 0xba, 0xd0, 0x3e, 0x75, 0xdc, 0x63, 0x67, 0xa0, 0x4f, 0x7e, 0xe8, 0x80, 0x3c, 0x51, - 0x5b, 0xdc, 0xc3, 0xb1, 0x2c, 0x8d, 0x3e, 0x41, 0x2b, 0x1f, 0x7f, 0xf4, 0xac, 0xda, 0x5d, 0xe9, - 0x43, 0x32, 0x7c, 0xfe, 0x18, 0x4c, 0x75, 0x67, 0x23, 0x17, 0xce, 0x47, 0xa2, 0x4e, 0xb8, 0x34, - 0xd1, 0x75, 0xc2, 0xe5, 0xc9, 0xc2, 0x0d, 0xf4, 0x05, 0x0c, 0x39, 0x0a, 0x68, 0xa7, 0x66, 0xbe, - 0xcb, 0x93, 0x36, 0x1c, 0x3f, 0x0e, 0x2c, 0xcb, 0xcb, 0xf6, 0xad, 0x93, 0x5f, 0x99, 0xbc, 0x3a, - 0xf9, 0xd5, 0x49, 0xc0, 0x8d, 0x83, 0x00, 0xb6, 0xfc, 0x24, 0xaa, 0x24, 0x1c, 0xf4, 0xd4, 0xc3, - 0x9c, 0xe7, 0x1f, 0xf8, 0x73, 0xed, 0xf3, 0x9b, 0x20, 0xe4, 0xd7, 0xd9, 0xcc, 0xf6, 0x93, 0x68, - 0x37, 0x27, 0xed, 0x96, 0x48, 0x2b, 0x6b, 0x25, 0xb0, 0x2b, 0xfe, 0x0c, 0x8b, 0xd9, 0xcc, 0x10, - 0x8b, 0x97, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x0c, 0x41, 0x0f, 0xf0, 0x33, 0x06, 0x00, 0x00, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xdd, 0x8a, 0xdb, 0x46, + 0x14, 0x5e, 0x59, 0xb6, 0x56, 0x3e, 0xb2, 0xb7, 0xee, 0x10, 0x16, 0xe1, 0x6e, 0x1b, 0x47, 0xa5, + 0x89, 0xe9, 0x85, 0x17, 0x5c, 0xda, 0x86, 0x06, 0x0a, 0xde, 0xc4, 0x49, 0x2f, 0xb2, 0x61, 0x23, + 0xef, 0x52, 0x28, 0x14, 0x31, 0xb6, 0x0e, 0x5a, 0x11, 0x4b, 0xf2, 0x6a, 0x46, 0x69, 0xbd, 0xf4, + 0xb6, 0x8f, 0xd1, 0x67, 0xe8, 0x23, 0xf4, 0x35, 0x7a, 0xd3, 0x77, 0x29, 0xf3, 0x63, 0x59, 0x86, + 0xb5, 0x36, 0x90, 0xbb, 0x99, 0x33, 0xdf, 0xf9, 0xff, 0x74, 0x8e, 0xa0, 0x1b, 0x51, 0x8e, 0xbf, + 0xd1, 0xf5, 0x68, 0x95, 0x67, 0x3c, 0x23, 0x6e, 0x31, 0xc7, 0x7c, 0xc4, 0x8a, 0x79, 0x12, 0xf3, + 0x9b, 0x02, 0x0b, 0x1c, 0xe9, 0x77, 0xef, 0x09, 0x38, 0x17, 0x71, 0x1a, 0xf9, 0x78, 0x53, 0x20, + 0xe3, 0xc4, 0x85, 0xc3, 0x04, 0x19, 0xa3, 0x11, 0xba, 0xc6, 0xc0, 0x18, 0xb6, 0xfd, 0xcd, 0xd5, + 0xfb, 0xd3, 0x80, 0x8e, 0x42, 0xb2, 0x55, 0x96, 0x32, 0xdc, 0x0f, 0x25, 0x8f, 0xa0, 0xc3, 0x30, + 0x7f, 0x1f, 0x2f, 0x30, 0x48, 0x69, 0x82, 0x6e, 0x43, 0x3e, 0x3b, 0x5a, 0xf6, 0x86, 0x26, 0x48, + 0x4e, 0xa0, 0xcd, 0xe3, 0x04, 0x19, 0xa7, 0xc9, 0xca, 0x35, 0x07, 0xc6, 0xd0, 0xf4, 0xb7, 0x02, + 0xd2, 0x07, 0xfb, 0x3a, 0x63, 0x5c, 0x2a, 0x37, 0xa5, 0x72, 0x79, 0xf7, 0x4e, 0xc0, 0x7a, 0x7e, + 0x4d, 0xd3, 0x08, 0x09, 0x81, 0x66, 0x91, 0xc7, 0xcc, 0x35, 0x06, 0xe6, 0xb0, 0xed, 0xcb, 0xb3, + 0xf7, 0x97, 0x01, 0xce, 0x6b, 0x9a, 0x86, 0x9b, 0x7c, 0x1e, 0x40, 0x4b, 0xe6, 0xab, 0x43, 0x54, + 0x17, 0xf2, 0x14, 0xac, 0x85, 0xb4, 0x21, 0x43, 0x73, 0xc6, 0x83, 0xd1, 0xbe, 0xfa, 0x8c, 0x94, + 0x2f, 0x5f, 0xe3, 0xc9, 0x8f, 0x60, 0x33, 0x9e, 0x53, 0x8e, 0xd1, 0x5a, 0x46, 0x76, 0x34, 0xf6, + 0xf6, 0xeb, 0xce, 0x34, 0xd2, 0x2f, 0x75, 0x3c, 0x0f, 0x3a, 0x2a, 0x3c, 0x5d, 0x44, 0x02, 0x4d, + 0x76, 0x13, 0x87, 0x3a, 0x3c, 0x79, 0xf6, 0x9e, 0x41, 0xf7, 0x39, 0x4d, 0x17, 0xb8, 0xdc, 0x24, + 0x71, 0x07, 0x88, 0x1c, 0x83, 0x95, 0x23, 0x65, 0x59, 0xaa, 0xab, 0xab, 0x6f, 0x5e, 0x0f, 0x8e, + 0x36, 0xca, 0xca, 0x85, 0xf7, 0x25, 0x74, 0x67, 0x9c, 0xf2, 0x82, 0xd5, 0x98, 0xf3, 0xfe, 0x35, + 0xe0, 0x68, 0x83, 0xd2, 0xa1, 0x1d, 0x83, 0xc5, 0xa4, 0x44, 0x03, 0xf5, 0x8d, 0x7c, 0x0e, 0xb0, + 0xa4, 0x8c, 0x07, 0x98, 0xe7, 0x59, 0xae, 0xbd, 0xb7, 0x85, 0x64, 0x2a, 0x04, 0xc4, 0x07, 0x3b, + 0x41, 0x4e, 0x43, 0xca, 0xa9, 0x6b, 0x0e, 0xcc, 0xa1, 0x33, 0xfe, 0xae, 0xae, 0x42, 0x55, 0x97, + 0xa3, 0x73, 0xad, 0x38, 0x4d, 0x79, 0xbe, 0xf6, 0x4b, 0x3b, 0xfd, 0x67, 0xd0, 0xdd, 0x79, 0x22, + 0x3d, 0x30, 0xdf, 0xe1, 0x5a, 0x07, 0x26, 0x8e, 0xa2, 0xd1, 0xef, 0xe9, 0xb2, 0xd8, 0x90, 0x4d, + 0x5d, 0x7e, 0x68, 0x3c, 0x35, 0xbc, 0x7f, 0x04, 0x25, 0x62, 0xc6, 0xeb, 0x29, 0xe1, 0x41, 0x97, + 0x71, 0x9a, 0xf3, 0x40, 0xb0, 0x30, 0x48, 0x98, 0xb4, 0x63, 0xfa, 0x8e, 0x14, 0x5e, 0xc6, 0x09, + 0x9e, 0x33, 0xf2, 0x05, 0x38, 0x98, 0x86, 0x25, 0x42, 0xd3, 0x16, 0xd3, 0x50, 0xbf, 0xf7, 0x05, + 0x39, 0x44, 0x42, 0xc8, 0xdc, 0xa6, 0x24, 0x65, 0x79, 0x27, 0x9f, 0x41, 0x7b, 0x45, 0x23, 0x0c, + 0x58, 0x7c, 0x8b, 0x6e, 0x6b, 0x60, 0x0c, 0x5b, 0xbe, 0x2d, 0x04, 0xb3, 0xf8, 0x16, 0x45, 0x49, + 0xe5, 0x23, 0xcf, 0xde, 0x61, 0xea, 0x5a, 0xaa, 0xa4, 0x42, 0x72, 0x29, 0x04, 0xde, 0xdf, 0x26, + 0x1c, 0xe9, 0xe8, 0x67, 0x45, 0x92, 0xd0, 0x7c, 0x7d, 0x27, 0x25, 0xca, 0xc4, 0x1a, 0xd5, 0xc4, + 0x1e, 0x82, 0xa3, 0xb8, 0x1b, 0xc8, 0x8f, 0xc5, 0x94, 0x71, 0x81, 0x12, 0x5d, 0xe5, 0x31, 0xab, + 0xf4, 0xb9, 0x59, 0xd3, 0xe7, 0x56, 0x5d, 0x9f, 0xad, 0xfb, 0xfa, 0xbc, 0x1b, 0xfd, 0xbe, 0x3e, + 0x97, 0x4d, 0xc0, 0x30, 0xa0, 0x5c, 0x94, 0xf8, 0xb0, 0xd2, 0x04, 0x0c, 0x27, 0xfc, 0x9c, 0x09, + 0x4c, 0xb1, 0x0a, 0xe9, 0x16, 0x63, 0x2b, 0x8c, 0x16, 0x4a, 0xcc, 0x63, 0xf8, 0x64, 0x91, 0x25, + 0xab, 0x25, 0x6e, 0x51, 0x6d, 0x89, 0xea, 0x96, 0x62, 0x89, 0xeb, 0x83, 0xcd, 0x31, 0x4f, 0xe2, + 0x94, 0x2e, 0x5d, 0x18, 0x18, 0x43, 0xdb, 0x2f, 0xef, 0x1f, 0xc7, 0xb9, 0x3f, 0xa0, 0xa3, 0x28, + 0xa7, 0xbf, 0xa5, 0x17, 0x60, 0xe7, 0xaa, 0x04, 0x6a, 0x5c, 0x39, 0xe3, 0xe1, 0x87, 0x16, 0xcb, + 0x2f, 0x35, 0x45, 0x5a, 0x29, 0xfe, 0xce, 0x83, 0x0a, 0x57, 0x94, 0xe7, 0xae, 0x10, 0x5f, 0x94, + 0x7c, 0x79, 0x04, 0x2d, 0xd5, 0xa3, 0xfd, 0xd3, 0x1c, 0xe1, 0xf8, 0x2a, 0xcd, 0x71, 0x91, 0x45, + 0x69, 0x7c, 0x8b, 0xe1, 0x5b, 0x11, 0x80, 0xd2, 0xf9, 0x16, 0x5a, 0xaa, 0xe3, 0x86, 0x1c, 0x8d, + 0x0f, 0xf7, 0xc7, 0x29, 0xf1, 0xbe, 0x42, 0xdf, 0x4d, 0x3e, 0x8f, 0xc2, 0x03, 0x9d, 0xcd, 0x9b, + 0x8c, 0xbf, 0xcc, 0x8a, 0x34, 0xfc, 0x28, 0x27, 0x1b, 0xd6, 0x37, 0xb6, 0xac, 0xff, 0x7a, 0x02, + 0xf6, 0x66, 0xce, 0x12, 0x07, 0x0e, 0x5f, 0x4c, 0x5f, 0x4e, 0xae, 0x5e, 0x5f, 0xf6, 0x0e, 0x08, + 0x80, 0xe5, 0x4f, 0xcf, 0x26, 0xb3, 0x69, 0xcf, 0x20, 0x9f, 0x42, 0x77, 0xf6, 0xf6, 0x6a, 0x32, + 0xfb, 0x29, 0xd0, 0xa2, 0x06, 0x69, 0x43, 0xeb, 0x7c, 0xea, 0xbf, 0x9a, 0xf6, 0xcc, 0xf1, 0x7f, + 0x26, 0x90, 0x99, 0xf4, 0x2d, 0xeb, 0xf0, 0x4a, 0xb9, 0x26, 0x3f, 0x43, 0x53, 0x2c, 0x3c, 0xf2, + 0xd5, 0xfe, 0xe8, 0x2a, 0xab, 0xb3, 0xff, 0xf8, 0x3e, 0x98, 0x9e, 0xc7, 0x07, 0xc2, 0xb0, 0x58, + 0x02, 0x75, 0x86, 0x2b, 0x3b, 0xac, 0xce, 0x70, 0x75, 0x97, 0x78, 0x07, 0xe4, 0x57, 0xb0, 0xd4, + 0xf0, 0x27, 0x4f, 0x6a, 0x36, 0x5a, 0x75, 0xb7, 0xf4, 0x87, 0xf7, 0x03, 0xab, 0xe6, 0xd5, 0xc0, + 0xae, 0x33, 0xbf, 0xb3, 0x6b, 0xea, 0xcc, 0xef, 0xce, 0x7e, 0x5d, 0x96, 0x98, 0xf1, 0xda, 0xb2, + 0x6c, 0xe7, 0x78, 0x6d, 0x59, 0x2a, 0xdf, 0x9e, 0x77, 0x70, 0x16, 0xc1, 0xc9, 0x22, 0x4b, 0xf6, + 0xc2, 0xcf, 0x3a, 0xba, 0xe3, 0x17, 0xe2, 0x5f, 0xe9, 0xc2, 0xf8, 0xe5, 0xfb, 0x28, 0xe6, 0xd7, + 0xc5, 0x7c, 0xb4, 0xc8, 0x92, 0x53, 0xa1, 0x74, 0x5a, 0x51, 0xda, 0x39, 0x6b, 0x03, 0xa7, 0xf2, + 0x27, 0x6b, 0x35, 0x9f, 0x5b, 0xf2, 0xf0, 0xcd, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff, 0x70, 0x49, + 0x9e, 0x6c, 0x7e, 0x09, 0x00, 0x00, }, } diff --git a/submitqueue/gateway/protopb/gateway_grpc.pb.go b/submitqueue/gateway/protopb/gateway_grpc.pb.go index f271309f..c1211101 100644 --- a/submitqueue/gateway/protopb/gateway_grpc.pb.go +++ b/submitqueue/gateway/protopb/gateway_grpc.pb.go @@ -38,6 +38,7 @@ const ( SubmitQueueGateway_Land_FullMethodName = "/uber.submitqueue.gateway.SubmitQueueGateway/Land" SubmitQueueGateway_Cancel_FullMethodName = "/uber.submitqueue.gateway.SubmitQueueGateway/Cancel" SubmitQueueGateway_Status_FullMethodName = "/uber.submitqueue.gateway.SubmitQueueGateway/Status" + SubmitQueueGateway_List_FullMethodName = "/uber.submitqueue.gateway.SubmitQueueGateway/List" ) // SubmitQueueGatewayClient is the client API for SubmitQueueGateway service. @@ -66,6 +67,8 @@ type SubmitQueueGatewayClient interface { // Status returns the current status of a previously submitted request, identified by its sqid. // The status is eventually consistent with the request store and reconciled from the append-only request log. Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + // List returns request summaries for one queue whose lifecycles overlap a time window. + List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) } type submitQueueGatewayClient struct { @@ -116,6 +119,16 @@ func (c *submitQueueGatewayClient) Status(ctx context.Context, in *StatusRequest return out, nil } +func (c *submitQueueGatewayClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListResponse) + err := c.cc.Invoke(ctx, SubmitQueueGateway_List_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // SubmitQueueGatewayServer is the server API for SubmitQueueGateway service. // All implementations must embed UnimplementedSubmitQueueGatewayServer // for forward compatibility. @@ -142,6 +155,8 @@ type SubmitQueueGatewayServer interface { // Status returns the current status of a previously submitted request, identified by its sqid. // The status is eventually consistent with the request store and reconciled from the append-only request log. Status(context.Context, *StatusRequest) (*StatusResponse, error) + // List returns request summaries for one queue whose lifecycles overlap a time window. + List(context.Context, *ListRequest) (*ListResponse, error) mustEmbedUnimplementedSubmitQueueGatewayServer() } @@ -164,6 +179,9 @@ func (UnimplementedSubmitQueueGatewayServer) Cancel(context.Context, *CancelRequ func (UnimplementedSubmitQueueGatewayServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } +func (UnimplementedSubmitQueueGatewayServer) List(context.Context, *ListRequest) (*ListResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method List not implemented") +} func (UnimplementedSubmitQueueGatewayServer) mustEmbedUnimplementedSubmitQueueGatewayServer() {} func (UnimplementedSubmitQueueGatewayServer) testEmbeddedByValue() {} @@ -257,6 +275,24 @@ func _SubmitQueueGateway_Status_Handler(srv interface{}, ctx context.Context, de return interceptor(ctx, in, info, handler) } +func _SubmitQueueGateway_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SubmitQueueGatewayServer).List(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SubmitQueueGateway_List_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SubmitQueueGatewayServer).List(ctx, req.(*ListRequest)) + } + return interceptor(ctx, in, info, handler) +} + // SubmitQueueGateway_ServiceDesc is the grpc.ServiceDesc for SubmitQueueGateway service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -280,6 +316,10 @@ var SubmitQueueGateway_ServiceDesc = grpc.ServiceDesc{ MethodName: "Status", Handler: _SubmitQueueGateway_Status_Handler, }, + { + MethodName: "List", + Handler: _SubmitQueueGateway_List_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "gateway.proto",