Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion example/submitqueue/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type GatewayServer struct {
landController *controller.LandController
cancelController *controller.CancelController
statusController *controller.StatusController
listController *controller.ListController
}

// Ping delegates to the controller
Expand All @@ -76,6 +77,11 @@ func (s *GatewayServer) Status(ctx context.Context, req *pb.StatusRequest) (*pb.
return s.statusController.Status(ctx, req)
}

// List delegates to the controller
func (s *GatewayServer) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) {
return s.listController.List(ctx, req)
}

func main() {
code := 0
if err := run(); err != nil {
Expand Down Expand Up @@ -236,6 +242,7 @@ func run() error {
return fmt.Errorf("failed to create storage: %w", err)
}
requestLogStore := store.GetRequestLogStore()
requestSummaryStore := store.GetRequestSummaryStore()

// Load queue configurations from YAML. Path is required so the gateway
// can reject requests for unknown queues at the edge.
Expand All @@ -251,13 +258,15 @@ func run() error {
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, cnt, store, queueConfigs, registry)
cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry)
cancelController := controller.NewCancelController(logger.Sugar(), scope, store, registry)
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
listController := controller.NewListController(logger.Sugar(), scope, requestSummaryStore, queueConfigs)
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
cancelController: cancelController,
statusController: statusController,
listController: listController,
}

pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)
Expand Down
1 change: 1 addition & 0 deletions submitqueue/core/request/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"log.go",
"request.go",
"store.go",
],
importpath = "github.com/uber/submitqueue/submitqueue/core/request",
visibility = ["//visibility:public"],
Expand Down
34 changes: 34 additions & 0 deletions submitqueue/core/request/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package request

import (
"context"
"fmt"

"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/storage"
)

// PersistLog inserts the immutable request log entry and updates the request summary read model.
func PersistLog(ctx context.Context, store storage.Storage, log entity.RequestLog) error {
if err := store.GetRequestLogStore().Insert(ctx, log); err != nil {
return fmt.Errorf("failed to insert request log for request_id=%s: %w", log.RequestID, err)
}
if err := store.GetRequestSummaryStore().UpsertFromLog(ctx, log); err != nil {
return fmt.Errorf("failed to upsert request summary for request_id=%s: %w", log.RequestID, err)
}
return nil
}
1 change: 1 addition & 0 deletions submitqueue/entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"queue_config.go",
"request.go",
"request_log.go",
"request_summary.go",
"speculation_tree.go",
],
importpath = "github.com/uber/submitqueue/submitqueue/entity",
Expand Down
66 changes: 66 additions & 0 deletions submitqueue/entity/request_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package entity

import (
"encoding/json"
"strings"
"time"
)

Expand Down Expand Up @@ -96,6 +97,10 @@ const (
type RequestLog struct {
// RequestID is the ID of the request this log entry belongs to. References entity.Request.ID.
RequestID string `json:"request_id"`
// Queue is the queue this request belongs to. New log producers should set it explicitly.
Queue string `json:"queue"`
// ChangeURIs are the original change URIs submitted with the request. They are populated by gateway-originated accepted logs.
ChangeURIs []string `json:"change_uris"`
// TimestampMs is the time this log entry was created, in milliseconds since Unix epoch.
TimestampMs int64 `json:"timestamp_ms"`
// Status is the request status at the time this log entry was created. It may contain requests states from the state machine and also display-friendly intermediate statuses.
Expand All @@ -117,11 +122,24 @@ type RequestLog struct {
// lastError is the last error message associated with the status at the time of this log entry, empty string if no error.
// metadata is a set of key-value pairs providing additional context for this log entry. Not constrained to any specific format or schema, used for display or debugging purposes.
func NewRequestLog(requestID string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
return NewRequestLogWithDetails(requestID, QueueFromRequestID(requestID), nil, status, requestVersion, lastError, metadata)
}

// NewRequestLogWithDetails creates a new RequestLog with queue and change information for list summaries.
func NewRequestLogWithDetails(requestID, queue string, changeURIs []string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
if metadata == nil {
metadata = make(map[string]string)
}
if queue == "" {
queue = QueueFromRequestID(requestID)
}
if changeURIs == nil {
changeURIs = []string{}
}
return RequestLog{
RequestID: requestID,
Queue: queue,
ChangeURIs: changeURIs,
TimestampMs: time.Now().UnixMilli(),
Status: status,
RequestVersion: requestVersion,
Expand All @@ -146,5 +164,53 @@ func RequestLogFromBytes(data []byte) (RequestLog, error) {
if log.Metadata == nil {
log.Metadata = make(map[string]string)
}
if log.ChangeURIs == nil {
log.ChangeURIs = []string{}
}
if log.Queue == "" {
log.Queue = QueueFromRequestID(log.RequestID)
}
return log, nil
}

// QueueFromRequestID extracts the queue from the current "<queue>/<number>" request ID format.
// It strips only a trailing numeric path segment so queue names may contain slashes.
func QueueFromRequestID(requestID string) string {
idx := strings.LastIndex(requestID, "/")
if idx <= 0 || idx == len(requestID)-1 {
return ""
}
for _, r := range requestID[idx+1:] {
if r < '0' || r > '9' {
return ""
}
}
return requestID[:idx]
}

// IsKnownRequestStatus returns true if status is a public request status emitted by SubmitQueue.
func IsKnownRequestStatus(status RequestStatus) bool {
switch status {
case RequestStatusAccepted,
RequestStatusStarted,
RequestStatusValidating,
RequestStatusValidated,
RequestStatusBatching,
RequestStatusBatched,
RequestStatusScored,
RequestStatusSpeculating,
RequestStatusSpeculated,
RequestStatusBuilding,
RequestStatusBuilt,
RequestStatusWaitingPath,
RequestStatusLanding,
RequestStatusProcessing,
RequestStatusLanded,
RequestStatusError,
RequestStatusCancelling,
RequestStatusCancelled:
return true
default:
return false
}
}
17 changes: 17 additions & 0 deletions submitqueue/entity/request_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func TestRequestLog_ToBytes(t *testing.T) {
func TestRequestLogFromBytes(t *testing.T) {
original := RequestLog{
RequestID: "my-queue/999",
Queue: "my-queue",
ChangeURIs: []string{"github://uber/repo/pull/1/abcdef"},
TimestampMs: 1709568000000,
Status: RequestStatusProcessing,
RequestVersion: 3,
Expand All @@ -65,6 +67,8 @@ func TestRequestLogFromBytes(t *testing.T) {
require.NoError(t, err)

assert.Equal(t, original.RequestID, deserialized.RequestID)
assert.Equal(t, original.Queue, deserialized.Queue)
assert.Equal(t, original.ChangeURIs, deserialized.ChangeURIs)
assert.Equal(t, original.TimestampMs, deserialized.TimestampMs)
assert.Equal(t, original.Status, deserialized.Status)
assert.Equal(t, original.RequestVersion, deserialized.RequestVersion)
Expand Down Expand Up @@ -103,6 +107,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
name: "with all fields populated",
log: RequestLog{
RequestID: "queue1/100",
Queue: "queue1",
ChangeURIs: []string{},
TimestampMs: 1709568000000,
Status: RequestStatusLanded,
RequestVersion: 5,
Expand All @@ -114,6 +120,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
name: "with error",
log: RequestLog{
RequestID: "queue2/200",
Queue: "queue2",
ChangeURIs: []string{},
TimestampMs: 1709568001000,
Status: RequestStatusError,
RequestVersion: 2,
Expand All @@ -125,6 +133,8 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
name: "with zero version",
log: RequestLog{
RequestID: "queue3/300",
Queue: "queue3",
ChangeURIs: []string{},
TimestampMs: 1709568002000,
Status: RequestStatusStarted,
RequestVersion: 0,
Expand All @@ -146,3 +156,10 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
})
}
}

func TestQueueFromRequestID(t *testing.T) {
assert.Equal(t, "queue", QueueFromRequestID("queue/100"))
assert.Equal(t, "org/queue", QueueFromRequestID("org/queue/100"))
assert.Empty(t, QueueFromRequestID("queue/not-a-number"))
assert.Empty(t, QueueFromRequestID("queue"))
}
29 changes: 29 additions & 0 deletions submitqueue/entity/request_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

// RequestSummary is the gateway-owned current view used for queue/time-window listing.
type RequestSummary struct {
RequestID string
Queue string
ChangeURIs []string
Status RequestStatus
LastError string
Metadata map[string]string
StartedAtMs int64
UpdatedAtMs int64
CompletedAtMs int64
Terminal bool
}
1 change: 1 addition & 0 deletions submitqueue/extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"change_store.go",
"request_log_store.go",
"request_store.go",
"request_summary_store.go",
"speculation_tree_store.go",
"storage.go",
],
Expand Down
1 change: 1 addition & 0 deletions submitqueue/extension/storage/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"change_store_mock.go",
"request_log_store_mock.go",
"request_store_mock.go",
"request_summary_store_mock.go",
"speculation_tree_store_mock.go",
"storage_mock.go",
],
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions submitqueue/extension/storage/mock/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading