feat: Implement Phase 1 of kat-agent with leader election and init
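For reference, the bootstrap invocation implied by the flags defined below would look like: kat-agent init --config cluster.kat --node-name <hostname>. The exact binary path and deployment layout are not part of this commit and are assumptions.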
@@ -0,0 +1,220 @@
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"git.dws.rip/dubey/kat/internal/config"
	"git.dws.rip/dubey/kat/internal/leader"
	"git.dws.rip/dubey/kat/internal/store"
	"github.com/google/uuid"
	"github.com/spf13/cobra"
	"google.golang.org/protobuf/encoding/protojson"
)

var (
	rootCmd = &cobra.Command{
		Use:   "kat-agent",
		Short: "KAT Agent manages workloads on a node and participates in the cluster.",
		Long: `The KAT Agent is responsible for running and managing containerized workloads
as instructed by the KAT Leader. It also participates in leader election if configured.`,
	}

	initCmd = &cobra.Command{
		Use:   "init",
		Short: "Initializes a new KAT cluster or a leader node.",
		Long: `Parses a cluster.kat configuration file, starts an embedded etcd server (for the first node),
campaigns for leadership, and stores the initial cluster configuration.`,
		Run: runInit,
	}

	// Global flags / config paths
	clusterConfigPath string
	nodeName          string
)

const (
	clusterUIDKey      = "/kat/config/cluster_uid"
	clusterConfigKey   = "/kat/config/cluster_config" // Stores the JSON-encoded ClusterConfigurationSpec
	defaultNodeName    = "kat-node"
	etcdDataDirDefault = "/var/lib/kat-agent/etcd"
)

func init() {
	initCmd.Flags().StringVar(&clusterConfigPath, "config", "cluster.kat", "Path to the cluster.kat configuration file.")
	// Node names should be unique; the hostname is a common, sensible default.
	defaultHostName, err := os.Hostname()
	if err != nil {
		defaultHostName = defaultNodeName
	}
	initCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name of this node, used as leader ID if elected.")

	rootCmd.AddCommand(initCmd)
}

func runInit(cmd *cobra.Command, args []string) {
	log.Printf("Starting KAT Agent in init mode for node: %s", nodeName)

	// 1. Parse cluster.kat. Defaults are applied inside ParseClusterConfiguration
	// (see internal/config/parse.go), so SetClusterConfigDefaults is not called again here.
	parsedClusterConfig, err := config.ParseClusterConfiguration(clusterConfigPath)
	if err != nil {
		log.Fatalf("Failed to parse cluster configuration from %s: %v", clusterConfigPath, err)
	}
	log.Printf("Successfully parsed and applied defaults to cluster configuration: %s", parsedClusterConfig.Metadata.Name)

	// Prepare the embedded etcd config.
	// For init, this node is assumed to be the first and only member, so it is its own
	// single peer and the client/peer URLs come from its own configuration (ports are
	// defaulted by SetClusterConfigDefaults). localhost is used for simplicity; a
	// multi-node setup would need the node's advertised IP here.
	clientURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdClientPort)
	peerURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdPeerPort)

	etcdEmbedCfg := store.EtcdEmbedConfig{
		Name:           nodeName,                                    // etcd member name
		DataDir:        filepath.Join(etcdDataDirDefault, nodeName), // Ensure a unique data dir
		ClientURLs:     []string{clientURL},                         // Listen address for clients
		PeerURLs:       []string{peerURL},                           // Listen address for peers
		InitialCluster: fmt.Sprintf("%s=%s", nodeName, peerURL),     // A new cluster contains only this node
		// ForceNewCluster (an embed.Config field) should be set only when this is known to be
		// a brand-new cluster; for simplicity, init relies on an empty data dir instead.
	}
	// Ensure the data directory exists.
	if err := os.MkdirAll(etcdEmbedCfg.DataDir, 0700); err != nil {
		log.Fatalf("Failed to create etcd data directory %s: %v", etcdEmbedCfg.DataDir, err)
	}

	// 2. Start embedded etcd server
	log.Printf("Initializing embedded etcd server (name: %s, data-dir: %s)...", etcdEmbedCfg.Name, etcdEmbedCfg.DataDir)
	embeddedEtcd, err := store.StartEmbeddedEtcd(etcdEmbedCfg)
	if err != nil {
		log.Fatalf("Failed to start embedded etcd: %v", err)
	}
	log.Println("Successfully initialized and started embedded etcd.")

	// 3. Create StateStore client
	// For init, the endpoints point to our own embedded server.
	etcdStore, err := store.NewEtcdStore([]string{clientURL}, embeddedEtcd)
	if err != nil {
		log.Fatalf("Failed to create etcd store client: %v", err)
	}
	defer etcdStore.Close() // Ensure etcd and client are cleaned up on exit

	// Setup signal handling for graceful shutdown
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// 4. Campaign for leadership
	leadershipMgr := leader.NewLeadershipManager(
		etcdStore,
		nodeName,
		func(leadershipCtx context.Context) { // OnElected
			log.Printf("Node %s became leader. Performing initial setup.", nodeName)

			// Store the cluster UID, unless one already exists (e.g. from a previous partial init).
			_, getErr := etcdStore.Get(leadershipCtx, clusterUIDKey)
			if getErr != nil { // Any error is treated as "not found"; a finer-grained check would be better.
				clusterUID := uuid.New().String()
				err := etcdStore.Put(leadershipCtx, clusterUIDKey, []byte(clusterUID))
				if err != nil {
					// This is critical state; ideally this would retry or fail rather than continue.
					log.Printf("Failed to store cluster UID: %v. Continuing...", err)
				} else {
					log.Printf("Stored new Cluster UID: %s", clusterUID)
				}
			} else {
				log.Printf("Cluster UID already exists in etcd. Skipping storage.")
			}

			// Store the ClusterConfigurationSpec as JSON. Only the Spec is stored because Metadata
			// (e.g. resourceVersion) belongs to the API object representation and may change.
			specJson, err := protojson.Marshal(parsedClusterConfig.Spec)
			if err != nil {
				log.Printf("Failed to marshal ClusterConfigurationSpec to JSON: %v", err)
			} else {
				err = etcdStore.Put(leadershipCtx, clusterConfigKey, specJson)
				if err != nil {
					log.Printf("Failed to store cluster configuration spec: %v", err)
				} else {
					log.Printf("Stored cluster configuration spec in etcd.")
					log.Printf("Cluster CIDR: %s, Service CIDR: %s, API Port: %d",
						parsedClusterConfig.Spec.ClusterCidr,
						parsedClusterConfig.Spec.ServiceCidr,
						parsedClusterConfig.Spec.ApiPort)
				}
			}
			log.Println("Initial leader setup complete. Waiting for leadership context to end or agent to be stopped.")
			<-leadershipCtx.Done() // Block until leadership is lost or the manager cancels the context.
		},
		func() { // OnResigned
			log.Printf("Node %s resigned or lost leadership.", nodeName)
		},
	)

	// Set the lease TTL from cluster.kat, falling back to the default.
	if parsedClusterConfig.Spec.AgentTickSeconds > 0 {
		// A common pattern is TTL = 3 * tick interval.
		leaseTTL := int64(parsedClusterConfig.Spec.AgentTickSeconds * 3)
		if leaseTTL < leader.DefaultLeaseTTLSeconds { // Enforce a minimum
			leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
		} else {
			leadershipMgr.LeaseTTLSeconds = leaseTTL
		}
	} else {
		leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
	}

	// Run the leadership manager in the background; Run itself blocks until ctx is cancelled.
	go leadershipMgr.Run(ctx)

	// Keep main alive until the context is cancelled (e.g. by SIGINT/SIGTERM).
	<-ctx.Done()
	log.Println("KAT Agent init shutting down...")

	// The deferred etcdStore.Close() handles resigning and stopping etcd.
	// Allow some time for graceful shutdown.
	time.Sleep(1 * time.Second)
	log.Println("KAT Agent init shutdown complete.")
}

func main() {
	if err := rootCmd.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
}

// Helper to check if a string is in a slice of strings (Not currently used)
// func containsString(slice []string, s string) bool {
// 	for _, item := range slice {
// 		if item == s {
// 			return true
// 		}
// 	}
// 	return false
// }

// SanitizeClusterConfigForStorage can be used if we want to strip sensitive fields
// or normalize the config before storing. For now, storing Spec as is.
// func SanitizeClusterConfigForStorage(config *pb.ClusterConfiguration) *pb.ClusterConfigurationSpec {
// 	// Example: return a copy with certain fields cleared if needed
// 	return config.Spec
// }

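The store package referenced above (EtcdEmbedConfig, StartEmbeddedEtcd, NewEtcdStore) is not included in this diff. Below is a minimal sketch of what StartEmbeddedEtcd could look like on top of go.etcd.io/etcd/server/v3/embed; the struct fields mirror the usage in runInit, but everything else (the EmbeddedEtcd wrapper, the timeout, leaving URL wiring to library defaults) is an assumption, not the actual implementation.

// Hypothetical sketch only: not part of this commit.
package store

import (
	"fmt"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

// EtcdEmbedConfig mirrors the fields used by the agent's init command.
type EtcdEmbedConfig struct {
	Name           string
	DataDir        string
	ClientURLs     []string
	PeerURLs       []string
	InitialCluster string
}

// EmbeddedEtcd wraps the running embedded server so it can be stopped later.
type EmbeddedEtcd struct {
	Etcd *embed.Etcd
}

// StartEmbeddedEtcd starts a single-member embedded etcd and waits for readiness.
func StartEmbeddedEtcd(cfg EtcdEmbedConfig) (*EmbeddedEtcd, error) {
	ec := embed.NewConfig()
	ec.Name = cfg.Name
	ec.Dir = cfg.DataDir
	ec.InitialCluster = cfg.InitialCluster
	// Wiring cfg.ClientURLs / cfg.PeerURLs into the listen/advertise URL fields is
	// omitted here because those embed.Config field names differ between etcd
	// releases; this sketch relies on the library defaults (localhost:2379/2380).

	e, err := embed.StartEtcd(ec)
	if err != nil {
		return nil, fmt.Errorf("starting embedded etcd: %w", err)
	}

	select {
	case <-e.Server.ReadyNotify():
		return &EmbeddedEtcd{Etcd: e}, nil
	case <-time.After(60 * time.Second):
		e.Close()
		return nil, fmt.Errorf("embedded etcd did not become ready in time")
	}
}
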
@@ -0,0 +1,85 @@
package leader

import (
	"context"
	"log"
	"time"

	"git.dws.rip/dubey/kat/internal/store"
)

const (
	// DefaultLeaseTTLSeconds is the default time-to-live for a leader's lease.
	DefaultLeaseTTLSeconds = 15
	// DefaultRetryPeriod is the time to wait before retrying to campaign for leadership.
	DefaultRetryPeriod = 5 * time.Second
)

// LeadershipManager handles the lifecycle of campaigning for and maintaining leadership.
type LeadershipManager struct {
	Store           store.StateStore
	LeaderID        string // Identifier for this candidate (e.g., node name)
	LeaseTTLSeconds int64

	OnElected  func(leadershipCtx context.Context) // Called when leadership is acquired
	OnResigned func()                              // Called when leadership is lost or resigned
}

// NewLeadershipManager creates a new leadership manager.
func NewLeadershipManager(st store.StateStore, leaderID string, onElected func(leadershipCtx context.Context), onResigned func()) *LeadershipManager {
	return &LeadershipManager{
		Store:           st,
		LeaderID:        leaderID,
		LeaseTTLSeconds: DefaultLeaseTTLSeconds,
		OnElected:       onElected,
		OnResigned:      onResigned,
	}
}

// Run starts the leadership campaign loop.
// It blocks until the provided context is cancelled.
func (lm *LeadershipManager) Run(ctx context.Context) {
	log.Printf("Starting leadership manager for %s", lm.LeaderID)
	defer log.Printf("Leadership manager for %s stopped", lm.LeaderID)

	for {
		select {
		case <-ctx.Done():
			log.Printf("Parent context cancelled, stopping leadership campaign for %s.", lm.LeaderID)
			// Best-effort resign in case we are currently leading; the store's Close()
			// during shutdown may handle this as well.
			resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			lm.Store.Resign(resignCtx)
			cancel()
			return
		default:
		}

		log.Printf("%s is campaigning for leadership...", lm.LeaderID)
		leadershipCtx, err := lm.Store.Campaign(ctx, lm.LeaderID, lm.LeaseTTLSeconds)
		if err != nil {
			log.Printf("Error campaigning for leadership for %s: %v. Retrying in %v.", lm.LeaderID, err, DefaultRetryPeriod)
			select {
			case <-time.After(DefaultRetryPeriod):
				continue
			case <-ctx.Done():
				return // Exit if the parent context is cancelled during the retry wait.
			}
		}

		// Successfully became leader.
		log.Printf("%s is now the leader.", lm.LeaderID)
		if lm.OnElected != nil {
			lm.OnElected(leadershipCtx) // Pass the context that is cancelled on leadership loss.
		}

		// Block until leadership is lost (leadershipCtx is cancelled).
		<-leadershipCtx.Done()
		log.Printf("%s has lost leadership.", lm.LeaderID)
		if lm.OnResigned != nil {
			lm.OnResigned()
		}
		// The loop then restarts the campaign unless the parent ctx is done.
		// The store resigns implicitly when leadershipCtx ends or the session expires.
	}
}

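The StateStore contract this manager depends on (a Campaign that blocks until elected and returns a context cancelled on leadership loss, plus Resign) lives in internal/store, which is not part of this excerpt. A minimal sketch of how an etcd-backed implementation could satisfy it using go.etcd.io/etcd/client/v3/concurrency is shown below; the election key prefix, struct fields, and error handling are illustrative assumptions.

// Hypothetical sketch only: not part of this commit.
package store

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

const electionPrefix = "/kat/leader" // assumed election key prefix

type EtcdStore struct {
	client   *clientv3.Client
	session  *concurrency.Session
	election *concurrency.Election
}

// Campaign blocks until this candidate wins the election (or ctx is cancelled),
// then returns a context that is cancelled once the lease session expires,
// i.e. when leadership is lost.
func (s *EtcdStore) Campaign(ctx context.Context, leaderID string, leaseTTLSeconds int64) (context.Context, error) {
	session, err := concurrency.NewSession(s.client, concurrency.WithTTL(int(leaseTTLSeconds)))
	if err != nil {
		return nil, err
	}
	election := concurrency.NewElection(session, electionPrefix)
	if err := election.Campaign(ctx, leaderID); err != nil {
		session.Close()
		return nil, err
	}
	s.session, s.election = session, election

	leadershipCtx, cancel := context.WithCancel(ctx)
	go func() {
		// The session's Done channel closes when its lease can no longer be kept
		// alive (etcd unreachable, session closed); treat that as loss of leadership.
		<-session.Done()
		cancel()
	}()
	return leadershipCtx, nil
}

// Resign gives up leadership voluntarily and releases the session lease.
func (s *EtcdStore) Resign(ctx context.Context) error {
	if s.election != nil {
		if err := s.election.Resign(ctx); err != nil {
			return err
		}
	}
	if s.session != nil {
		return s.session.Close()
	}
	return nil
}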