Skip to content
Merged
2 changes: 0 additions & 2 deletions charts/substrate/templates/ate-client.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/}}

{{- if eq .Values.auth.mode "jwt" }}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "substrate.fullname" (list "ate-client" .) }}
namespace: {{ .Release.Namespace }}
labels:
apps: ate-client
{{- end }}
8 changes: 7 additions & 1 deletion charts/substrate/templates/ate-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ spec:
containers:
- name: ate-controller
image: {{ include "substrate.componentImage" (list "atecontroller" .) }}
{{- if eq .Values.auth.mode "jwt" }}
args:
# The atecontroller binary defaults --ateapi-conn-spec to
# dns:///api.ate-system.svc:443, which is correct only for the
# canonical render (release name "substrate" in namespace
# "ate-system"). Pass the chart-resolved Service so the controller
# dials the right backend when substrate is installed as a subchart.
- "--ateapi-conn-spec=dns:///{{ include "substrate.fullname" (list "api" .) }}.{{ .Release.Namespace }}.svc:443"
{{- if eq .Values.auth.mode "jwt" }}
- "--ateapi-auth=jwt"
- "--ateapi-ca-file=/run/ateapi-ca/ca.crt"
- "--ateapi-server-name={{ include "substrate.fullname" (list "api" .) }}.{{ .Release.Namespace }}.svc"
Expand Down
10 changes: 10 additions & 0 deletions charts/substrate/templates/atenet-dns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ spec:
- "--log-level=debug"
- "--interval=10s"
- "--corefile-path=/etc/coredns/Corefile"
# Pass the chart-resolved Service names so the controller looks up the
# correct objects when substrate is installed as a subchart. The
# system namespace is read from POD_NAMESPACE below.
- "--router-service-name={{ include "substrate.fullname" (list "atenet-router" .) }}"
- "--dns-service-name={{ include "substrate.fullname" (list "dns" .) }}"
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: dns-config-volume
mountPath: /etc/coredns
Expand Down
3 changes: 3 additions & 0 deletions charts/substrate/templates/atenet-router.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ spec:
- "--standalone"
- "--networking-mode=agentgateway"
- "--namespace={{ .Release.Namespace }}"
# Pass the chart-resolved router Service name so /statusz looks up the
# correct Service when substrate is installed as a subchart.
- "--router-service-name={{ include "substrate.fullname" (list "atenet-router" .) }}"
- "--port-http=8080"
- "--port-extproc=50051"
- "--extproc-address=127.0.0.1"
Expand Down
3 changes: 2 additions & 1 deletion cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/agent-substrate/substrate/cmd/ateapi/internal/workercache"
"github.com/agent-substrate/substrate/internal/ateinterceptors"
"github.com/agent-substrate/substrate/internal/envtestbins"
"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/internal/proto/ateletpb"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -273,7 +274,7 @@ func setupTest(t *testing.T, ns string) *testContext {

// 3. Initialize Informers
workerFactory, workerInformer := WorkerPodInformer(k8sClient)
ateletFactory, ateletInformer := AteletInformer(k8sClient)
ateletFactory, ateletInformer := AteletInformer(k8sClient, installdefaults.SystemNamespace)

substrateInformerFactory := externalversions.NewSharedInformerFactory(substrateClient, 0)
actorTemplateLister := substrateInformerFactory.Api().V1alpha1().ActorTemplates().Lister()
Expand Down
6 changes: 3 additions & 3 deletions cmd/ateapi/internal/controlapi/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (
)

const (
ateletNamespace = "ate-system"
byNamespaceAndName = "by-namespace-and-name"
byWorkerPool = "by-worker-pool"
byNode = "by-node"
workerPodLabel = "ate.dev/worker-pool"
)

// AteletInformer creates a SharedInformerFactory and SharedIndexInformer for Atelet pods.
func AteletInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer) {
// AteletInformer creates a SharedInformerFactory and SharedIndexInformer for
// Atelet pods in the given namespace.
func AteletInformer(kc kubernetes.Interface, ateletNamespace string) (informers.SharedInformerFactory, cache.SharedIndexInformer) {
factory := informers.NewSharedInformerFactoryWithOptions(kc, 0,
informers.WithNamespace(ateletNamespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
Expand Down
8 changes: 7 additions & 1 deletion cmd/ateapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/agent-substrate/substrate/cmd/ateapi/internal/workercache"
"github.com/agent-substrate/substrate/internal/ateapiauth"
"github.com/agent-substrate/substrate/internal/ateinterceptors"
"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/internal/k8sjwt"
"github.com/agent-substrate/substrate/internal/serverboot"
"github.com/agent-substrate/substrate/internal/version"
Expand Down Expand Up @@ -135,8 +136,13 @@ func main() {
workerPoolLister := ateFactory.Api().V1alpha1().WorkerPools().Lister()
sandboxConfigLister := ateFactory.Api().V1alpha1().SandboxConfigs().Lister()

// atelet shares ateapi's namespace in every supported deployment topology,
// so we read it from Kubernetes' downward API rather than expose a flag.
ateletNamespace := installdefaults.NamespaceFromPodEnv()
slog.InfoContext(ctx, "Resolved atelet namespace", slog.String("atelet-namespace", ateletNamespace))

workerPodInformerFactory, workerPodInformer := controlapi.WorkerPodInformer(clientset)
ateletPodInformerFactory, ateletPodInformer := controlapi.AteletInformer(clientset)
ateletPodInformerFactory, ateletPodInformer := controlapi.AteletInformer(clientset, ateletNamespace)

syncer := controlapi.NewWorkerPoolSyncer(redisPersistence, workerPodInformer)
syncer.Start(ctx)
Expand Down
22 changes: 18 additions & 4 deletions cmd/atenet/internal/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/agent-substrate/substrate/cmd/atenet/internal/dns"
"github.com/agent-substrate/substrate/internal/installdefaults"
)

type DnsConfig struct {
LogLevel string
Kubeconfig string
ReconcileInterval time.Duration
CorefilePath string
RouterServiceName string
DNSServiceName string
}

func NewDnsCmd() *cobra.Command {
Expand Down Expand Up @@ -86,11 +89,20 @@ func NewDnsCmd() *cobra.Command {
return fmt.Errorf("failed to initialize cluster client: %w", err)
}

// atenet shares its namespace with atenet-router and substrate's
// CoreDNS in every supported deployment topology, so we read it
// from Kubernetes' downward API rather than expose a flag.
systemNamespace := installdefaults.NamespaceFromPodEnv()
slog.InfoContext(ctx, "Resolved system namespace", slog.String("system-namespace", systemNamespace))

dnsController := &dns.Controller{
Client: k8sClient,
Interval: cfg.ReconcileInterval,
CorefilePath: cfg.CorefilePath,
Reloader: dns.NewConfigReloader(),
Client: k8sClient,
Interval: cfg.ReconcileInterval,
CorefilePath: cfg.CorefilePath,
Reloader: dns.NewConfigReloader(),
SystemNamespace: systemNamespace,
RouterServiceName: cfg.RouterServiceName,
DNSServiceName: cfg.DNSServiceName,
}

slog.InfoContext(ctx, "Starting DNS Controller subsystem")
Expand All @@ -102,6 +114,8 @@ func NewDnsCmd() *cobra.Command {
cmd.Flags().StringVar(&cfg.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig configuration file")
cmd.Flags().DurationVar(&cfg.ReconcileInterval, "interval", 10*time.Second, "Interval for reconciling DNS configurations")
cmd.Flags().StringVar(&cfg.CorefilePath, "corefile-path", "/etc/coredns/Corefile", "Path to the local Corefile configuration on shared volume")
cmd.Flags().StringVar(&cfg.RouterServiceName, "router-service-name", installdefaults.RouterServiceName, "Service name of the atenet-router. Override when the deployment renames the Service.")
cmd.Flags().StringVar(&cfg.DNSServiceName, "dns-service-name", installdefaults.DNSServiceName, "Service name of substrate's CoreDNS. Override when the deployment renames the Service.")

return cmd
}
35 changes: 21 additions & 14 deletions cmd/atenet/internal/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// serviceName is the name of the CoreDNS service.
serviceName = "dns"
systemNamespace = "ate-system"
)

// Controller manages the DNS configuration for the ATE.
type Controller struct {
Client client.Client
Interval time.Duration
CorefilePath string
Reloader ConfigReloader

// SystemNamespace is the namespace where atenet-router and the substrate
// CoreDNS Service live. Defaults to installdefaults.SystemNamespace.
SystemNamespace string
// RouterServiceName is the Service name of the atenet-router that the
// CoreDNS Corefile forwards actor traffic to. Defaults to
// installdefaults.RouterServiceName.
RouterServiceName string
// DNSServiceName is the Service name of substrate's CoreDNS. Defaults to
// installdefaults.DNSServiceName.
DNSServiceName string
}

// Run the DNS orchestration loop until ctx is canceled.
Expand All @@ -71,14 +76,15 @@ func (c *Controller) Run(ctx context.Context) error {
func (c *Controller) reconcile(ctx context.Context) error {
slog.DebugContext(ctx, "Reconciling DNS orchestration configuration...")

// 1. Get the ClusterIP of atenet-router in ate-system namespace
// 1. Get the ClusterIP of the atenet-router Service in the substrate namespace.
routerSvc := &corev1.Service{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "atenet-router", Namespace: systemNamespace}, routerSvc); err != nil {
if err := c.Client.Get(ctx, types.NamespacedName{Name: c.RouterServiceName, Namespace: c.SystemNamespace}, routerSvc); err != nil {
if errors.IsNotFound(err) {
slog.WarnContext(ctx, "atenet-router service not found, skipping until it is available")
slog.WarnContext(ctx, "atenet-router service not found, skipping until it is available",
slog.String("name", c.RouterServiceName), slog.String("namespace", c.SystemNamespace))
return nil
}
return fmt.Errorf("failed to get atenet-router service: %w", err)
return fmt.Errorf("failed to get atenet-router service %s/%s: %w", c.SystemNamespace, c.RouterServiceName, err)
}

routerIP := routerSvc.Spec.ClusterIP
Expand All @@ -87,14 +93,15 @@ func (c *Controller) reconcile(ctx context.Context) error {
return nil
}

// 2. Get the ClusterIP of dns service in ate-system namespace
// 2. Get the ClusterIP of substrate's CoreDNS Service in the same namespace.
dnsSvc := &corev1.Service{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: systemNamespace}, dnsSvc); err != nil {
if err := c.Client.Get(ctx, types.NamespacedName{Name: c.DNSServiceName, Namespace: c.SystemNamespace}, dnsSvc); err != nil {
if errors.IsNotFound(err) {
slog.WarnContext(ctx, "dns service not found, skipping until it is available")
slog.WarnContext(ctx, "dns service not found, skipping until it is available",
slog.String("name", c.DNSServiceName), slog.String("namespace", c.SystemNamespace))
return nil
}
return fmt.Errorf("failed to get dns service: %w", err)
return fmt.Errorf("failed to get dns service %s/%s: %w", c.SystemNamespace, c.DNSServiceName, err)
}

dnsIP := dnsSvc.Spec.ClusterIP
Expand Down
24 changes: 16 additions & 8 deletions cmd/atenet/internal/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/agent-substrate/substrate/internal/installdefaults"
)

type mockConfigReloader struct {
Expand Down Expand Up @@ -94,10 +96,13 @@ func TestReconcile(t *testing.T) {

reloader := &mockConfigReloader{}
controller := &Controller{
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: reloader,
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: reloader,
SystemNamespace: installdefaults.SystemNamespace,
RouterServiceName: installdefaults.RouterServiceName,
DNSServiceName: installdefaults.DNSServiceName,
}

// Run one reconciliation loop
Expand Down Expand Up @@ -185,10 +190,13 @@ func TestReconcileKubeDNSNotFound(t *testing.T) {
Build()

controller := &Controller{
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: &mockConfigReloader{},
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: &mockConfigReloader{},
SystemNamespace: installdefaults.SystemNamespace,
RouterServiceName: installdefaults.RouterServiceName,
DNSServiceName: installdefaults.DNSServiceName,
}

ctx := context.Background()
Expand Down
2 changes: 2 additions & 0 deletions cmd/atenet/internal/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/agent-substrate/substrate/cmd/atenet/internal/router"
"github.com/agent-substrate/substrate/internal/ateapiauth"
"github.com/agent-substrate/substrate/internal/installdefaults"
)

func NewRouterCmd() *cobra.Command {
Expand All @@ -45,6 +46,7 @@ func NewRouterCmd() *cobra.Command {
cmd.Flags().StringVar(&cfg.MetricsAddr, "metrics-listen-addr", ":9090", "Address and port the prometheus metrics server should listen on.")
cmd.Flags().BoolVar(&cfg.Standalone, "standalone", false, "Run in standalone mode, bypassing creation of managed deployment and services in Kubernetes cluster")
cmd.Flags().StringVar(&cfg.Namespace, "namespace", "default", "Target operations namespace")
cmd.Flags().StringVar(&cfg.RouterServiceName, "router-service-name", installdefaults.RouterServiceName, "Service name of this atenet-router in the operations namespace. Override when the deployment renames the Service.")
cmd.Flags().StringVar(&cfg.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig configuration file")
cmd.Flags().StringVar(&cfg.AteapiAddr, "ateapi-address", "api.ate-system.svc:443", "gRPC host address of the cluster ateapi Control instance")
cmd.Flags().IntVar(&cfg.HttpPort, "port-http", 8080, "TCP port for workload traffic entering through the Envoy Router")
Expand Down
8 changes: 6 additions & 2 deletions cmd/atenet/internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ func init() {

// RouterConfig holds deployment setup and endpoint options for the router node instance.
type RouterConfig struct {
Standalone bool
Namespace string
Standalone bool
Namespace string
// RouterServiceName is the Service name of this atenet-router in the
// operations namespace, used by /statusz to look up its own ClusterIP.
// Defaults to installdefaults.RouterServiceName.
RouterServiceName string
Kubeconfig string
AteapiAddr string
HttpPort int
Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/router/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *RouterServer) getRouterIP(ctx context.Context) string {
return "Standalone Mode (No Cluster IP)"
}

svc, err := s.clientset.CoreV1().Services(s.cfg.Namespace).Get(ctx, "atenet-router", metav1.GetOptions{})
svc, err := s.clientset.CoreV1().Services(s.cfg.Namespace).Get(ctx, s.cfg.RouterServiceName, metav1.GetOptions{})
if err != nil {
return fmt.Sprintf("Lookup Failed: %v", err)
}
Expand Down
15 changes: 8 additions & 7 deletions internal/ateclient/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"sync"

"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -132,22 +133,22 @@ func dialPortForward(ctx context.Context, kubeconfigPath, k8sContext string, tra
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}

// Look up the 'api' Service to dynamically get its pod selector
svc, err := clientset.CoreV1().Services("ate-system").Get(ctx, "api", metav1.GetOptions{})
// Look up the ateapi Service to dynamically get its pod selector.
svc, err := clientset.CoreV1().Services(installdefaults.SystemNamespace).Get(ctx, installdefaults.APIServiceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get api service: %w", err)
return nil, fmt.Errorf("failed to get ateapi service %s/%s: %w", installdefaults.SystemNamespace, installdefaults.APIServiceName, err)
}
selector := labels.SelectorFromSet(svc.Spec.Selector).String()

// Find the pods backing the service
pods, err := clientset.CoreV1().Pods("ate-system").List(ctx, metav1.ListOptions{
pods, err := clientset.CoreV1().Pods(installdefaults.SystemNamespace).List(ctx, metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
return nil, fmt.Errorf("failed to list ateapi pods: %w", err)
}
if len(pods.Items) == 0 {
return nil, fmt.Errorf("no ate-api-server pods found in ate-system namespace")
return nil, fmt.Errorf("no ate-api-server pods found in %q namespace", installdefaults.SystemNamespace)
}
targetPod := pods.Items[0]

Expand Down Expand Up @@ -254,7 +255,7 @@ func jwtDialOptions(ctx context.Context, clientset *kubernetes.Clientset) ([]grp
ExpirationSeconds: &expirationSeconds,
},
}
token, err := clientset.CoreV1().ServiceAccounts("ate-system").CreateToken(ctx, "ate-client", tokenRequest, metav1.CreateOptions{})
token, err := clientset.CoreV1().ServiceAccounts(installdefaults.SystemNamespace).CreateToken(ctx, "ate-client", tokenRequest, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to request ateapi bearer token: %w", err)
}
Expand All @@ -267,7 +268,7 @@ func jwtDialOptions(ctx context.Context, clientset *kubernetes.Clientset) ([]grp
func isJWTMode(ctx context.Context, clientset *kubernetes.Clientset) (bool, error) {
// TODO: Replace deployment introspection with an explicit client-readable
// config file once ateapi auth mode is part of install/runtime config.
deployment, err := clientset.AppsV1().Deployments("ate-system").Get(ctx, "ate-api-server-deployment", metav1.GetOptions{})
deployment, err := clientset.AppsV1().Deployments(installdefaults.SystemNamespace).Get(ctx, "ate-api-server-deployment", metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get ate-api-server deployment: %w", err)
}
Expand Down
Loading
Loading