From f1edc3eca1333070b13f763d44068d120e7ad730 Mon Sep 17 00:00:00 2001
From: "Tanishq Dubey (aider)"
Date: Sat, 10 May 2025 19:05:23 -0400
Subject: [PATCH] feat: Implement Phase 1 of kat-agent with leader election and init

---
 cmd/kat-agent/main.go       | 220 ++++++++++++++++++++++++++++++++++++
 internal/leader/election.go |  85 ++++++++++++++
 2 files changed, 305 insertions(+)

diff --git a/cmd/kat-agent/main.go b/cmd/kat-agent/main.go
index e69de29..a358cb0 100644
--- a/cmd/kat-agent/main.go
+++ b/cmd/kat-agent/main.go
@@ -0,0 +1,220 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"path/filepath"
+	// "strings" // Not used
+	"syscall"
+	"time"
+
+	// pb "git.dws.rip/dubey/kat/api/v1alpha1" // Not used yet (only the commented-out helpers below reference pb); kept as a comment so the file compiles
+	"git.dws.rip/dubey/kat/internal/config"
+	"git.dws.rip/dubey/kat/internal/leader"
+	"git.dws.rip/dubey/kat/internal/store"
+	"github.com/google/uuid"
+	"github.com/spf13/cobra"
+	"google.golang.org/protobuf/encoding/protojson"
+)
+
+var (
+	rootCmd = &cobra.Command{
+		Use:   "kat-agent",
+		Short: "KAT Agent manages workloads on a node and participates in the cluster.",
+		Long: `The KAT Agent is responsible for running and managing containerized workloads
+as instructed by the KAT Leader. It also participates in leader election if configured.`,
+	}
+
+	initCmd = &cobra.Command{
+		Use:   "init",
+		Short: "Initializes a new KAT cluster or a leader node.",
+		Long: `Parses a cluster.kat configuration file, starts an embedded etcd server (for the first node),
+campaigns for leadership, and stores initial cluster configuration.`,
+		Run: runInit,
+	}
+
+	// Global flags / config paths
+	clusterConfigPath string
+	nodeName          string
+)
+
+const (
+	clusterUIDKey      = "/kat/config/cluster_uid"
+	clusterConfigKey   = "/kat/config/cluster_config" // Stores the JSON of pb.ClusterConfigurationSpec
+	defaultNodeName    = "kat-node"
+	etcdDataDirDefault = "/var/lib/kat-agent/etcd"
+)
+
+func init() {
+	initCmd.Flags().StringVar(&clusterConfigPath, "config", "cluster.kat", "Path to the cluster.kat configuration file.")
+	// It's good practice for the node name to be unique. Hostname is a common default.
+	defaultHostName, err := os.Hostname()
+	if err != nil {
+		defaultHostName = defaultNodeName
+	}
+	initCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name of this node, used as leader ID if elected.")
+
+	rootCmd.AddCommand(initCmd)
+}
+
+func runInit(cmd *cobra.Command, args []string) {
+	log.Printf("Starting KAT Agent in init mode for node: %s", nodeName)
+
+	// 1. Parse cluster.kat
+	parsedClusterConfig, err := config.ParseClusterConfiguration(clusterConfigPath)
+	if err != nil {
+		log.Fatalf("Failed to parse cluster configuration from %s: %v", clusterConfigPath, err)
+	}
+	// SetClusterConfigDefaults is already called within ParseClusterConfiguration (see internal/config/parse.go).
+	// config.SetClusterConfigDefaults(parsedClusterConfig)
+	log.Printf("Successfully parsed and applied defaults to cluster configuration: %s", parsedClusterConfig.Metadata.Name)
+
+	// Prepare etcd embed config.
+	// For a single-node init, this node is the only peer.
+	// Client URLs and Peer URLs are based on its own configuration.
+	// Ports are defaulted if not specified (SetClusterConfigDefaults handles this).
+
+	// Assuming nodeName is resolvable or an IP is used. For simplicity, localhost is used for a single node.
+	// In a multi-node setup, this needs to be the advertised IP.
+	// For init, we assume this node is the first and only one.
+	clientURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdClientPort)
+	peerURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdPeerPort)
+
+	etcdEmbedCfg := store.EtcdEmbedConfig{
+		Name:           nodeName,                                    // Etcd member name
+		DataDir:        filepath.Join(etcdDataDirDefault, nodeName), // Ensure unique data dir
+		ClientURLs:     []string{clientURL},                         // Listen on this for clients
+		PeerURLs:       []string{peerURL},                           // Listen on this for peers
+		InitialCluster: fmt.Sprintf("%s=%s", nodeName, peerURL),     // For a new cluster, it's just itself
+		// ForceNewCluster should be true if we are certain this is a brand new cluster.
+		// For simplicity in init, we might not set it and rely on an empty data-dir.
+		// embed.Config has a ForceNewCluster field.
+	}
+	// Ensure data directory exists
+	if err := os.MkdirAll(etcdEmbedCfg.DataDir, 0700); err != nil {
+		log.Fatalf("Failed to create etcd data directory %s: %v", etcdEmbedCfg.DataDir, err)
+	}
+
+	// 2. Start embedded etcd server
+	log.Printf("Initializing embedded etcd server (name: %s, data-dir: %s)...", etcdEmbedCfg.Name, etcdEmbedCfg.DataDir)
+	embeddedEtcd, err := store.StartEmbeddedEtcd(etcdEmbedCfg)
+	if err != nil {
+		log.Fatalf("Failed to start embedded etcd: %v", err)
+	}
+	log.Println("Successfully initialized and started embedded etcd.")
+
+	// 3. Create StateStore client
+	// For init, the endpoints point to our own embedded server.
+	etcdStore, err := store.NewEtcdStore([]string{clientURL}, embeddedEtcd)
+	if err != nil {
+		log.Fatalf("Failed to create etcd store client: %v", err)
+	}
+	defer etcdStore.Close() // Ensure etcd and client are cleaned up on exit
+
+	// Setup signal handling for graceful shutdown
+	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer stop()
+
+	// 4. Campaign for leadership
+	leadershipMgr := leader.NewLeadershipManager(
+		etcdStore,
+		nodeName,
+		func(leadershipCtx context.Context) { // OnElected
+			log.Printf("Node %s became leader. Performing initial setup.", nodeName)
+			// Store Cluster UID.
+			// Check if a UID already exists, perhaps from a previous partial init.
+			// For a clean init, we'd expect to write it.
+			_, getErr := etcdStore.Get(leadershipCtx, clusterUIDKey)
+			if getErr != nil { // Assuming error means not found or other issue
+				clusterUID := uuid.New().String()
+				err := etcdStore.Put(leadershipCtx, clusterUIDKey, []byte(clusterUID))
+				if err != nil {
+					log.Printf("Failed to store cluster UID: %v. Continuing...", err)
+					// This is critical, should ideally retry or fail.
+				} else {
+					log.Printf("Stored new Cluster UID: %s", clusterUID)
+				}
+			} else {
+				log.Printf("Cluster UID already exists in etcd. Skipping storage.")
+			}
+
+			// Store ClusterConfigurationSpec (as JSON).
+			// We store Spec because Metadata might change (e.g. resourceVersion)
+			// and is more for API object representation.
+			specJson, err := protojson.Marshal(parsedClusterConfig.Spec)
+			if err != nil {
+				log.Printf("Failed to marshal ClusterConfigurationSpec to JSON: %v", err)
+			} else {
+				err = etcdStore.Put(leadershipCtx, clusterConfigKey, specJson)
+				if err != nil {
+					log.Printf("Failed to store cluster configuration spec: %v", err)
+				} else {
+					log.Printf("Stored cluster configuration spec in etcd.")
+					log.Printf("Cluster CIDR: %s, Service CIDR: %s, API Port: %d",
+						parsedClusterConfig.Spec.ClusterCidr,
+						parsedClusterConfig.Spec.ServiceCidr,
+						parsedClusterConfig.Spec.ApiPort)
+				}
+			}
+			log.Println("Initial leader setup complete. Waiting for leadership context to end or agent to be stopped.")
+			<-leadershipCtx.Done() // Wait until leadership is lost or the context is cancelled by the manager
+		},
+		func() { // OnResigned
+			log.Printf("Node %s resigned or lost leadership.", nodeName)
+		},
+	)
+
+	// Set lease TTL from cluster.kat or defaults
+	if parsedClusterConfig.Spec.AgentTickSeconds > 0 {
+		// A common pattern is TTL = 3 * TickInterval
+		leaseTTL := int64(parsedClusterConfig.Spec.AgentTickSeconds * 3)
+		if leaseTTL < leader.DefaultLeaseTTLSeconds { // Ensure a minimum
+			leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
+		} else {
+			leadershipMgr.LeaseTTLSeconds = leaseTTL
+		}
+	} else {
+		leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
+	}
+
+	// Run the leadership manager in the background; Run itself blocks until ctx is cancelled.
+	go leadershipMgr.Run(ctx)
+
+	// Keep main alive until the context is cancelled (e.g. by SIGINT/SIGTERM)
+	<-ctx.Done()
+	log.Println("KAT Agent init shutting down...")
+
+	// The deferred etcdStore.Close() will handle resigning and stopping etcd.
+	// Allow some time for graceful shutdown.
+	time.Sleep(1 * time.Second)
+	log.Println("KAT Agent init shutdown complete.")
+}
+
+func main() {
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+// Helper to check if a string is in a slice of strings (not currently used).
+// func containsString(slice []string, s string) bool {
+// 	for _, item := range slice {
+// 		if item == s {
+// 			return true
+// 		}
+// 	}
+// 	return false
+// }
+
+// SanitizeClusterConfigForStorage can be used if we want to strip sensitive fields
+// or normalize the config before storing. For now, storing Spec as is.
+// func SanitizeClusterConfigForStorage(config *pb.ClusterConfiguration) *pb.ClusterConfigurationSpec {
+// 	// Example: return a copy with certain fields cleared if needed
+// 	return config.Spec
+// }
diff --git a/internal/leader/election.go b/internal/leader/election.go
index e69de29..3691bb4 100644
--- a/internal/leader/election.go
+++ b/internal/leader/election.go
@@ -0,0 +1,85 @@
+package leader
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"git.dws.rip/dubey/kat/internal/store"
+)
+
+const (
+	// DefaultLeaseTTLSeconds is the default time-to-live for a leader's lease.
+	DefaultLeaseTTLSeconds = 15
+	// DefaultRetryPeriod is the time to wait before retrying to campaign for leadership.
+	DefaultRetryPeriod = 5 * time.Second
+)
+
+// LeadershipManager handles the lifecycle of campaigning for and maintaining leadership.
+type LeadershipManager struct {
+	Store           store.StateStore
+	LeaderID        string // Identifier for this candidate (e.g., node name)
+	LeaseTTLSeconds int64
+
+	OnElected  func(leadershipCtx context.Context) // Called when leadership is acquired
+	OnResigned func()                              // Called when leadership is lost or resigned
+}
+
+// NewLeadershipManager creates a new leadership manager.
+func NewLeadershipManager(st store.StateStore, leaderID string, onElected func(leadershipCtx context.Context), onResigned func()) *LeadershipManager {
+	return &LeadershipManager{
+		Store:           st,
+		LeaderID:        leaderID,
+		LeaseTTLSeconds: DefaultLeaseTTLSeconds,
+		OnElected:       onElected,
+		OnResigned:      onResigned,
+	}
+}
+
+// Run starts the leadership campaign loop.
+// It blocks until the provided context is cancelled.
+func (lm *LeadershipManager) Run(ctx context.Context) {
+	log.Printf("Starting leadership manager for %s", lm.LeaderID)
+	defer log.Printf("Leadership manager for %s stopped", lm.LeaderID)
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Printf("Parent context cancelled, stopping leadership campaign for %s.", lm.LeaderID)
+			// Attempt to resign if currently leading, though store.Close() might handle this too.
+			// This resign is best-effort as the app is shutting down.
+			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			lm.Store.Resign(resignCtx)
+			cancel()
+			return
+		default:
+		}
+
+		log.Printf("%s is campaigning for leadership...", lm.LeaderID)
+		leadershipCtx, err := lm.Store.Campaign(ctx, lm.LeaderID, lm.LeaseTTLSeconds)
+		if err != nil {
+			log.Printf("Error campaigning for leadership for %s: %v. Retrying in %v.", lm.LeaderID, err, DefaultRetryPeriod)
+			select {
+			case <-time.After(DefaultRetryPeriod):
+				continue
+			case <-ctx.Done():
+				return // Exit if parent context cancelled during retry wait
+			}
+		}
+
+		// Successfully became leader
+		log.Printf("%s is now the leader.", lm.LeaderID)
+		if lm.OnElected != nil {
+			lm.OnElected(leadershipCtx) // Pass the context that's cancelled on leadership loss
+		}
+
+		// Block until leadership is lost (leadershipCtx is cancelled)
+		<-leadershipCtx.Done()
+		log.Printf("%s has lost leadership.", lm.LeaderID)
+		if lm.OnResigned != nil {
+			lm.OnResigned()
+		}
+		// Loop will restart the campaign unless the parent ctx is done.
+		// Store.Resign() is implicitly called by the store when leadershipCtx is done or the session expires.
+	}
+}
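Usage sketch (illustrative, not part of the patch): the snippet below shows how a later component might consume the LeadershipManager API introduced here. Only leader.NewLeadershipManager, Run, and the store.StateStore type come from this patch; the package name and the runLeaderLoop, startControllers, and stopControllers functions are hypothetical placeholders.

package example // hypothetical consumer package, for illustration only

import (
	"context"

	"git.dws.rip/dubey/kat/internal/leader"
	"git.dws.rip/dubey/kat/internal/store"
)

// runLeaderLoop ties leader-only work to the election lifecycle.
func runLeaderLoop(ctx context.Context, st store.StateStore, nodeName string) {
	mgr := leader.NewLeadershipManager(
		st,
		nodeName,
		func(leadershipCtx context.Context) { // OnElected
			startControllers(leadershipCtx) // hypothetical leader-only work
			<-leadershipCtx.Done()          // returns when leadership is lost
		},
		func() { // OnResigned
			stopControllers() // hypothetical cleanup
		},
	)
	mgr.Run(ctx) // blocks until ctx is cancelled
}

func startControllers(ctx context.Context) { /* placeholder */ }

func stopControllers() { /* placeholder */ }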