Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/rfc/submitqueue/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
| **buildsignal** | Build | speculate | Feed CI result back into speculation |
| **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue |
| **conclude** | BatchID | — | Map terminal batch state to request state |
| **log** | RequestLog | — | Append-only sink for request log events |
| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage |

## DLQ reconciliation

Every primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress".
Every *consumed* primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The `log` topic is the exception: the orchestrator only publishes to it (the gateway is the sole consumer that persists the request log), so it has no orchestrator-side subscription and therefore no DLQ. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress".

The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel, log) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery.
The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery.

DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator.

Expand Down
2 changes: 2 additions & 0 deletions example/submitqueue/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ services:
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
# Path to YAML queue configuration baked into the image
- QUEUE_CONFIG_PATH=/root/queues.yaml
# Stable subscriber name for the request-log consumer
- HOSTNAME=gateway-dev
depends_on:
mysql-app:
condition: service_healthy
Expand Down
5 changes: 5 additions & 0 deletions example/submitqueue/gateway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ go_library(
importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server",
visibility = ["//visibility:private"],
deps = [
"//core/errs",
"//core/errs/generic",
"//core/errs/mysql",
"//extension/counter/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//submitqueue/core/consumer",
"//submitqueue/extension/queueconfig/yaml",
"//submitqueue/extension/storage/mysql",
"//submitqueue/gateway/controller",
"//submitqueue/gateway/controller/log",
"//submitqueue/gateway/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
2 changes: 2 additions & 0 deletions example/submitqueue/gateway/server/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ services:
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
# Path to YAML queue configuration baked into the image
- QUEUE_CONFIG_PATH=/root/queues.yaml
# Stable subscriber name for the request-log consumer
- HOSTNAME=gateway-dev
depends_on:
mysql-app:
condition: service_healthy
Expand Down
83 changes: 78 additions & 5 deletions example/submitqueue/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/errs"
genericerrs "github.com/uber/submitqueue/core/errs/generic"
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/submitqueue/core/consumer"
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/submitqueue/gateway/controller"
logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log"
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -174,12 +179,35 @@ func run() error {
zap.String("queue_dsn", queueDSN),
)

// Build a publish-only topic registry: gateway only feeds the start of the
// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel).
// No subscription is configured because the gateway never consumes from the queue.
// Subscriber name for the log-topic consumer. It must be unique per running
// instance: SubscriberName identifies a subscriber for partition leases, so
// two gateway processes on the same host (sharing HOSTNAME) would otherwise
// contend for the same lease. Append the PID to keep co-located instances
// distinct; the PID is stable for the life of the process. Offset tracking
// stays keyed on the shared ConsumerGroup ("gateway-log"), not this name.
// Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs).
hostname := os.Getenv("HOSTNAME")
if hostname == "" {
hostname = fmt.Sprintf("gateway-%d", time.Now().Unix())
}
subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid())

// Build the topic registry. The gateway publishes to the start of the
// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) —
// both publish-only. It additionally consumes the log topic (TopicKeyLog):
// the gateway is the sole writer of the request log, persisting entries that
// the orchestrator publishes there.
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue},
{Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue},
{
Key: consumer.TopicKeyLog,
Name: "log",
Queue: mysqlQueue,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "gateway-log",
),
},
})
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
Expand All @@ -201,7 +229,8 @@ func run() error {

// Initialize storage from the shared app database connection. The land
// controller writes to this store directly; cancel/status use the request
// log store directly.
// log store directly. The log consumer (registered below) is the sole
// persister of request log entries published by the orchestrator.
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
Expand Down Expand Up @@ -236,6 +265,31 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Create the queue consumer and register the log controller. The gateway is
// the sole persister of the request log: the orchestrator publishes entries
// to the log topic and this consumer writes them to storage.
logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
errs.NewClassifierProcessor(
// Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql)
// both run on the same MySQL driver, so a single classifier covers
// errors surfaced from either backend.
genericerrs.Classifier,
mysqlerrs.Classifier,
),
)

logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log")
if err := logConsumer.Register(logController); err != nil {
return fmt.Errorf("failed to register log controller: %w", err)
}

if err := logConsumer.Start(ctx); err != nil {
// The error can also be a result of a context cancellation due to SIGINT or SIGTERM.
// This is expected, just propagate it.
return fmt.Errorf("failed to start log consumer: %w", err)
}
logger.Info("log consumer started")

// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
Expand All @@ -257,6 +311,8 @@ func run() error {

// Wait for interrupt signal or server critical error
// If interruption is signaled, gracefully stop the server
// If the server exits with an error, cancel the context to signal the consumer
// After this, stop the consumer
// If an error happens during shutdown, return the actual error, not the context cancellation error
var serverErr error
select {
Expand All @@ -273,10 +329,27 @@ func run() error {
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down gateway server due to critical GRPC server error...")

// Cancel the context to signal cancellation to the queue consumer
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

// Stop the consumer with a 30s timeout; by this time the context should be
// cancelled and the processing threads may already be exiting; recollect them.
errStop := logConsumer.Stop(30000)
if errStop != nil {
errStop = fmt.Errorf("failed to stop consumer: %w", errStop)
}

if errStop != nil || serverErr != nil {
// Override context cancellation error with the shutdown error. The server
// error is the primary/root failure, so it leads; the consumer-stop error
// is secondary cleanup.
err = errors.Join(serverErr, errStop)
}

return err
Expand Down
1 change: 0 additions & 1 deletion example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ go_library(
"//submitqueue/orchestrator/controller/cancel",
"//submitqueue/orchestrator/controller/conclude",
"//submitqueue/orchestrator/controller/dlq",
"//submitqueue/orchestrator/controller/log",
"//submitqueue/orchestrator/controller/merge",
"//submitqueue/orchestrator/controller/score",
"//submitqueue/orchestrator/controller/speculate",
Expand Down
28 changes: 11 additions & 17 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import (
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq"
logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
Expand Down Expand Up @@ -382,7 +381,6 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
{consumer.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"},
{consumer.TopicKeyMerge, "merge", "orchestrator-merge"},
{consumer.TopicKeyConclude, "conclude", "orchestrator-conclude"},
{consumer.TopicKeyLog, "log", "orchestrator-log"},
}

configs := make([]consumer.TopicConfig, 0, 2*len(primaryTopics))
Expand Down Expand Up @@ -419,6 +417,16 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
})
}

// Publish-only: the orchestrator emits request log entries to the log
// topic but never persists them. The gateway is the sole consumer that
// writes the request log to storage, so the orchestrator registers no
// consuming subscription (and therefore no log DLQ) for this topic.
configs = append(configs, consumer.TopicConfig{
Key: consumer.TopicKeyLog,
Name: "log",
Queue: q,
})

return consumer.NewTopicRegistry(configs)
}

Expand Down Expand Up @@ -651,26 +659,13 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
}
count++

logController := logctrl.NewController(
logger,
scope,
store,
consumer.TopicKeyLog,
"orchestrator-log",
)
if err := c.Register(logController); err != nil {
return count, fmt.Errorf("failed to register log controller: %w", err)
}
count++

return count, nil
}

// registerDLQControllers creates one DLQ reconciler per primary stage and
// registers them with the DLQ consumer. Each reconciler drives the affected
// request or batch into a terminal Error/Failed state so the gateway stops
// reporting it as stuck-in-progress. The log DLQ is a metric-only no-op (log
// entries are observability, not pipeline state).
// reporting it as stuck-in-progress.
func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage) (int, error) {
dlqScope := scope.SubScope("dlq")
dlqRegs := []struct {
Expand All @@ -687,7 +682,6 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop
{"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")},
{"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq")},
{"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyConclude), "orchestrator-conclude-dlq")},
{"log_dlq", dlq.NewDLQLogController(logger, dlqScope, dlq.TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq")},
}
var count int
for _, reg := range dlqRegs {
Expand Down
26 changes: 25 additions & 1 deletion submitqueue/gateway/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,25 @@
SubmitQueue Gateway
# SubmitQueue Gateway

The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`,
`Status`, and `Ping` calls, validates them at the edge, and hands work off to the
orchestrator pipeline asynchronously via the message queue.

## Request log ownership

The gateway is the **sole owner of the request log** — the only service that
both writes and reads it. No other service persists or reads request log
entries:

- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on
`Cancel`), the gateway writes directly to storage so the entry is visible the
moment the RPC returns.
- For statuses produced downstream, the orchestrator only *publishes* entries to
the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a
consumer that drains the `log` topic and persists each entry to storage.

Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request
log directly from storage. The orchestrator only *publishes* log entries and
never touches the request log store.

This keeps a single service responsible for the request log while letting the
orchestrator remain free of storage writes for it.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log",
srcs = ["log.go"],
importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log",
importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// Controller handles log queue messages.
// It consumes request log entries and persists them to storage.
// Implements consumer.Controller interface for integration with the consumer.
//
// The request log is written exclusively by the gateway: other services
// (e.g. the orchestrator) only publish log entries to the log topic, and this
// controller is the single consumer that persists them to storage.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
Expand All @@ -40,7 +44,7 @@ type Controller struct {
// Verify Controller implements consumer.Controller interface at compile time.
var _ consumer.Controller = (*Controller)(nil)

// NewController creates a new log controller for the orchestrator.
// NewController creates a new log controller for the gateway.
func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock
logger := zaptest.NewLogger(t).Sugar()
scope := tally.NoopScope

return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log")
return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log")
}

func TestController_Process(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/submitqueue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_test(
"integration",
],
deps = [
"//submitqueue/entity",
"//submitqueue/gateway/protopb",
"//submitqueue/orchestrator/protopb",
"//test/testutil",
Expand Down
Loading
Loading