package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "path/filepath" "syscall" "time" "git.dws.rip/dubey/kat/internal/api" "git.dws.rip/dubey/kat/internal/cli" "git.dws.rip/dubey/kat/internal/config" "git.dws.rip/dubey/kat/internal/leader" "git.dws.rip/dubey/kat/internal/pki" "git.dws.rip/dubey/kat/internal/store" "github.com/google/uuid" "github.com/spf13/cobra" "google.golang.org/protobuf/encoding/protojson" ) var ( rootCmd = &cobra.Command{ Use: "kat-agent", Short: "KAT Agent manages workloads on a node and participates in the cluster.", Long: `The KAT Agent is responsible for running and managing containerized workloads as instructed by the KAT Leader. It also participates in leader election if configured.`, } initCmd = &cobra.Command{ Use: "init", Short: "Initializes a new KAT cluster or a leader node.", Long: `Parses a cluster.kat configuration file, starts an embedded etcd server (for the first node), campaigns for leadership, and stores initial cluster configuration.`, Run: runInit, } joinCmd = &cobra.Command{ Use: "join", Short: "Joins an existing KAT cluster.", Long: `Connects to an existing KAT leader, submits a certificate signing request, and obtains the necessary credentials to participate in the cluster.`, Run: runJoin, } // Global flags / config paths clusterConfigPath string nodeName string // Join command flags leaderAPI string advertiseAddr string leaderCACert string etcdPeer bool ) const ( clusterUIDKey = "/kat/config/cluster_uid" clusterConfigKey = "/kat/config/cluster_config" // Stores the JSON of pb.ClusterConfigurationSpec defaultNodeName = "kat-node" leaderCertCN = "leader.kat.cluster.local" // Common Name for leader certificate ) func init() { initCmd.Flags().StringVar(&clusterConfigPath, "config", "cluster.kat", "Path to the cluster.kat configuration file.") // It's good practice for node name to be unique. Hostname is a common default. defaultHostName, err := os.Hostname() if err != nil { defaultHostName = defaultNodeName } initCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name of this node, used as leader ID if elected.") // Join command flags joinCmd.Flags().StringVar(&leaderAPI, "leader-api", "", "Address of the leader API (required, format: host:port)") joinCmd.Flags().StringVar(&advertiseAddr, "advertise-address", "", "IP address or interface name to advertise to other nodes (required)") joinCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name for this node in the cluster") joinCmd.Flags().StringVar(&leaderCACert, "leader-ca-cert", "", "Path to the leader's CA certificate (optional, insecure if not provided)") joinCmd.Flags().BoolVar(&etcdPeer, "etcd-peer", false, "Request to join the etcd quorum (optional)") // Mark required flags joinCmd.MarkFlagRequired("leader-api") joinCmd.MarkFlagRequired("advertise-address") rootCmd.AddCommand(initCmd) rootCmd.AddCommand(joinCmd) } func runInit(cmd *cobra.Command, args []string) { log.Printf("Starting KAT Agent in init mode for node: %s", nodeName) // 1. Parse cluster.kat parsedClusterConfig, err := config.ParseClusterConfiguration(clusterConfigPath) if err != nil { log.Fatalf("Failed to parse cluster configuration from %s: %v", clusterConfigPath, err) } // SetClusterConfigDefaults is already called within ParseClusterConfiguration in the provided internal/config/parse.go // config.SetClusterConfigDefaults(parsedClusterConfig) log.Printf("Successfully parsed and applied defaults to cluster configuration: %s", parsedClusterConfig.Metadata.Name) // 1.5. Initialize PKI directory and CA if it doesn't exist pkiDir := pki.GetPKIPathFromClusterConfig(parsedClusterConfig.Spec.BackupPath) caKeyPath := filepath.Join(pkiDir, "ca.key") caCertPath := filepath.Join(pkiDir, "ca.crt") // Check if CA already exists _, caKeyErr := os.Stat(caKeyPath) _, caCertErr := os.Stat(caCertPath) if os.IsNotExist(caKeyErr) || os.IsNotExist(caCertErr) { log.Printf("CA key or certificate not found. Generating new CA in %s", pkiDir) if err := pki.GenerateCA(pkiDir, caKeyPath, caCertPath); err != nil { log.Fatalf("Failed to generate CA: %v", err) } log.Println("Successfully generated new CA key and certificate") } else { log.Println("CA key and certificate already exist, skipping generation") } // Prepare etcd embed config // For a single node init, this node is the only peer. // Client URLs and Peer URLs will be based on its own configuration. // Ensure ports are defaulted if not specified (SetClusterConfigDefaults handles this). // Assuming nodeName is resolvable or an IP is used. For simplicity, using localhost for single node. // In a multi-node setup, this needs to be the advertised IP. // For init, we assume this node is the first and only one. clientURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdClientPort) peerURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdPeerPort) etcdEmbedCfg := store.EtcdEmbedConfig{ Name: nodeName, // Etcd member name DataDir: filepath.Join(os.Getenv("HOME"), ".kat-agent", nodeName), // Ensure unique data dir ClientURLs: []string{clientURL}, // Listen on this for clients PeerURLs: []string{peerURL}, // Listen on this for peers InitialCluster: fmt.Sprintf("%s=%s", nodeName, peerURL), // For a new cluster, it's just itself // ForceNewCluster should be true if we are certain this is a brand new cluster. // For simplicity in init, we might not set it and rely on empty data-dir. // embed.Config has ForceNewCluster field. } // Ensure data directory exists if err := os.MkdirAll(etcdEmbedCfg.DataDir, 0700); err != nil { log.Fatalf("Failed to create etcd data directory %s: %v", etcdEmbedCfg.DataDir, err) } // 2. Start embedded etcd server log.Printf("Initializing embedded etcd server (name: %s, data-dir: %s)...", etcdEmbedCfg.Name, etcdEmbedCfg.DataDir) embeddedEtcd, err := store.StartEmbeddedEtcd(etcdEmbedCfg) if err != nil { log.Fatalf("Failed to start embedded etcd: %v", err) } log.Println("Successfully initialized and started embedded etcd.") // 3. Create StateStore client // For init, the endpoints point to our own embedded server. etcdStore, err := store.NewEtcdStore([]string{clientURL}, embeddedEtcd) if err != nil { log.Fatalf("Failed to create etcd store client: %v", err) } defer etcdStore.Close() // Ensure etcd and client are cleaned up on exit // Setup signal handling for graceful shutdown ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() // 4. Campaign for leadership leadershipMgr := leader.NewLeadershipManager( etcdStore, nodeName, func(leadershipCtx context.Context) { // OnElected log.Printf("Node %s became leader. Performing initial setup.", nodeName) // Store Cluster UID // Check if UID already exists, perhaps from a previous partial init. // For a clean init, we'd expect to write it. _, getErr := etcdStore.Get(leadershipCtx, clusterUIDKey) if getErr != nil { // Assuming error means not found or other issue clusterUID := uuid.New().String() err := etcdStore.Put(leadershipCtx, clusterUIDKey, []byte(clusterUID)) if err != nil { log.Printf("Failed to store cluster UID: %v. Continuing...", err) // This is critical, should ideally retry or fail. } else { log.Printf("Stored new Cluster UID: %s", clusterUID) } } else { log.Printf("Cluster UID already exists in etcd. Skipping storage.") } // Generate leader's server certificate for mTLS leaderKeyPath := filepath.Join(pkiDir, "leader.key") leaderCSRPath := filepath.Join(pkiDir, "leader.csr") leaderCertPath := filepath.Join(pkiDir, "leader.crt") // Check if leader cert already exists _, leaderCertErr := os.Stat(leaderCertPath) if os.IsNotExist(leaderCertErr) { log.Println("Generating leader server certificate for mTLS") // Generate key and CSR for leader if err := pki.GenerateCertificateRequest(leaderCertCN, leaderKeyPath, leaderCSRPath); err != nil { log.Printf("Failed to generate leader key and CSR: %v", err) } else { // Read the CSR file _, err := os.ReadFile(leaderCSRPath) if err != nil { log.Printf("Failed to read leader CSR file: %v", err) } else { // Sign the CSR with our CA if err := pki.SignCertificateRequest(caKeyPath, caCertPath, leaderCSRPath, leaderCertPath, 365*24*time.Hour); err != nil { log.Printf("Failed to sign leader CSR: %v", err) } else { log.Println("Successfully generated and signed leader server certificate") } } } } else { log.Println("Leader certificate already exists, skipping generation") } // Store ClusterConfigurationSpec (as JSON) // We store Spec because Metadata might change (e.g. resourceVersion) // and is more for API object representation. specJson, err := protojson.Marshal(parsedClusterConfig.Spec) if err != nil { log.Printf("Failed to marshal ClusterConfigurationSpec to JSON: %v", err) } else { err = etcdStore.Put(leadershipCtx, clusterConfigKey, specJson) if err != nil { log.Printf("Failed to store cluster configuration spec: %v", err) } else { log.Printf("Stored cluster configuration spec in etcd.") log.Printf("Cluster CIDR: %s, Service CIDR: %s, API Port: %d", parsedClusterConfig.Spec.ClusterCidr, parsedClusterConfig.Spec.ServiceCidr, parsedClusterConfig.Spec.ApiPort) } } // Start API server with mTLS log.Println("Starting API server with mTLS...") apiAddr := fmt.Sprintf(":%d", parsedClusterConfig.Spec.ApiPort) apiServer, err := api.NewServer(apiAddr, leaderCertPath, leaderKeyPath, caCertPath) if err != nil { log.Printf("Failed to create API server: %v", err) } else { // Register the join handler joinHandler := api.NewJoinHandler(etcdStore, caKeyPath, caCertPath) apiServer.RegisterJoinHandler(joinHandler) log.Printf("Registered join handler with CA key: %s, CA cert: %s", caKeyPath, caCertPath) // Start the server in a goroutine go func() { if err := apiServer.Start(); err != nil && err != http.ErrServerClosed { log.Printf("API server error: %v", err) } }() // Add a shutdown hook to the leadership context go func() { <-leadershipCtx.Done() log.Println("Leadership lost, shutting down API server...") shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := apiServer.Stop(shutdownCtx); err != nil { log.Printf("Error shutting down API server: %v", err) } }() log.Printf("API server started on port %d with mTLS", parsedClusterConfig.Spec.ApiPort) log.Printf("Verification: API server requires client certificates signed by the cluster CA") log.Printf("Test with: curl --cacert %s --cert --key https://localhost:%d/internal/v1alpha1/join", caCertPath, parsedClusterConfig.Spec.ApiPort) } log.Println("Initial leader setup complete. Waiting for leadership context to end or agent to be stopped.") <-leadershipCtx.Done() // Wait until leadership is lost or context is cancelled by manager }, func() { // OnResigned log.Printf("Node %s resigned or lost leadership.", nodeName) }, ) // Set lease TTL from cluster.kat or defaults if parsedClusterConfig.Spec.AgentTickSeconds > 0 { // A common pattern is TTL = 3 * TickInterval leaseTTL := int64(parsedClusterConfig.Spec.AgentTickSeconds * 3) if leaseTTL < leader.DefaultLeaseTTLSeconds { // Ensure a minimum leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds } else { leadershipMgr.LeaseTTLSeconds = leaseTTL } } else { leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds } // Run leadership manager. This will block until ctx is cancelled. go leadershipMgr.Run(ctx) // Keep main alive until context is cancelled (e.g. by SIGINT/SIGTERM) <-ctx.Done() log.Println("KAT Agent init shutting down...") // The defer etcdStore.Close() will handle resigning and stopping etcd. // Allow some time for graceful shutdown. time.Sleep(1 * time.Second) log.Println("KAT Agent init shutdown complete.") } func runJoin(cmd *cobra.Command, args []string) { log.Printf("Starting KAT Agent in join mode for node: %s", nodeName) log.Printf("Attempting to join cluster via leader API: %s", leaderAPI) // Determine PKI directory // For simplicity, we'll use a default location pkiDir := filepath.Join(os.Getenv("HOME"), ".kat-agent", nodeName, "pki") // Join the cluster if err := cli.JoinCluster(leaderAPI, advertiseAddr, nodeName, leaderCACert, pkiDir); err != nil { log.Fatalf("Failed to join cluster: %v", err) } log.Printf("Successfully joined cluster. Node is ready.") // In a real implementation, we would start the agent's main loop here // For now, we'll just exit successfully } func main() { if err := rootCmd.Execute(); err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } }