```go
package pki

import (
	// other imports
	"crypto/rand" // needed by generateSerialNumber below
	"math/big"    // needed by generateSerialNumber below
	"path/filepath"
)

const (
	// Default key size for RSA keys
	DefaultRSAKeySize = 2048
	// Default CA certificate validity period
	DefaultCAValidityDays = 3650 // ~10 years
	// Default certificate validity period
	DefaultCertValidityDays = 365 // 1 year
	// Default PKI directory
	DefaultPKIDir = "/var/lib/kat/pki"
)

// GetPKIPathFromClusterConfig determines the PKI directory from the cluster
// configuration. If backupPath is provided, it uses a "pki" directory under
// the parent directory of backupPath. Otherwise, it uses the default PKI
// directory.
func GetPKIPathFromClusterConfig(backupPath string) string {
	if backupPath == "" {
		return DefaultPKIDir
	}
	// Use the parent directory of backupPath
	return filepath.Dir(backupPath) + "/pki"
}

// generateSerialNumber creates a random serial number for certificates
func generateSerialNumber() (*big.Int, error) {
	serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) // 128 bits
	return rand.Int(rand.Reader, serialNumberLimit)
}

// Rest of the existing code...
```

The changes:

1. Removed the duplicate `GetPKIPathFromClusterConfig` function.
2. Kept the single implementation that checks for an empty backup path.
3. Kept the default PKI directory as `/var/lib/kat/pki`.

This resolves the duplicate-function issue while preserving the desired behavior. Would you like me to generate a commit message for this change?
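
For reference, a minimal example of the resulting path logic (the backup path value is hypothetical):

```go
package pki_test

import (
	"fmt"

	"git.dws.rip/dubey/kat/internal/pki"
)

func ExampleGetPKIPathFromClusterConfig() {
	// No backup path configured: fall back to the default PKI directory.
	fmt.Println(pki.GetPKIPathFromClusterConfig(""))
	// Backup path set: the PKI directory sits beside it, under its parent.
	fmt.Println(pki.GetPKIPathFromClusterConfig("/var/lib/kat/backups"))
	// Output:
	// /var/lib/kat/pki
	// /var/lib/kat/pki
}
```

(Both calls happen to produce the same path here, since `/var/lib/kat` is the parent of the hypothetical backup directory.)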

449 lines · 17 KiB · Go

```go
package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"git.dws.rip/dubey/kat/internal/api"
	"git.dws.rip/dubey/kat/internal/cli"
	"git.dws.rip/dubey/kat/internal/config"
	"git.dws.rip/dubey/kat/internal/leader"
	"git.dws.rip/dubey/kat/internal/pki"
	"git.dws.rip/dubey/kat/internal/store"
	"github.com/google/uuid"
	"github.com/spf13/cobra"
	"google.golang.org/protobuf/encoding/protojson"
)

var (
	rootCmd = &cobra.Command{
		Use:   "kat-agent",
		Short: "KAT Agent manages workloads on a node and participates in the cluster.",
		Long: `The KAT Agent is responsible for running and managing containerized workloads
as instructed by the KAT Leader. It also participates in leader election if configured.`,
	}

	initCmd = &cobra.Command{
		Use:   "init",
		Short: "Initializes a new KAT cluster or a leader node.",
		Long: `Parses a cluster.kat configuration file, starts an embedded etcd server (for the first node),
campaigns for leadership, and stores initial cluster configuration.`,
		Run: runInit,
	}

	joinCmd = &cobra.Command{
		Use:   "join",
		Short: "Joins an existing KAT cluster.",
		Long: `Connects to an existing KAT leader, submits a certificate signing request,
and obtains the necessary credentials to participate in the cluster.`,
		Run: runJoin,
	}

	// Global flags / config paths
	clusterConfigPath string
	nodeName          string

	// Join command flags
	leaderAPI     string
	advertiseAddr string
	leaderCACert  string
	etcdPeer      bool
)

const (
	clusterUIDKey    = "/kat/config/cluster_uid"
	clusterConfigKey = "/kat/config/cluster_config" // Stores the JSON of pb.ClusterConfigurationSpec
	defaultNodeName  = "kat-node"
	leaderCertCN     = "leader.kat.cluster.local" // Common Name for leader certificate
)

func init() {
	initCmd.Flags().StringVar(&clusterConfigPath, "config", "cluster.kat", "Path to the cluster.kat configuration file.")
	// It's good practice for the node name to be unique. The hostname is a common default.
	defaultHostName, err := os.Hostname()
	if err != nil {
		defaultHostName = defaultNodeName
	}
	initCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name of this node, used as leader ID if elected.")

	// Join command flags
	joinCmd.Flags().StringVar(&leaderAPI, "leader-api", "", "Address of the leader API (required, format: host:port)")
	joinCmd.Flags().StringVar(&advertiseAddr, "advertise-address", "", "IP address or interface name to advertise to other nodes (required)")
	joinCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name for this node in the cluster")
	joinCmd.Flags().StringVar(&leaderCACert, "leader-ca-cert", "", "Path to the leader's CA certificate (optional, insecure if not provided)")
	joinCmd.Flags().BoolVar(&etcdPeer, "etcd-peer", false, "Request to join the etcd quorum (optional)")

	// Mark required flags
	joinCmd.MarkFlagRequired("leader-api")
	joinCmd.MarkFlagRequired("advertise-address")

	rootCmd.AddCommand(initCmd)
	rootCmd.AddCommand(joinCmd)
}
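
// Illustrative invocations (not taken from the source; the flag names match the
// definitions above):
//
//	kat-agent init --config ./cluster.kat --node-name node-0
//	kat-agent join --leader-api <leader-host>:<api-port> --advertise-address <this-node-ip>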

func runInit(cmd *cobra.Command, args []string) {
	log.Printf("Starting KAT Agent in init mode for node: %s", nodeName)

	// 1. Parse cluster.kat
	parsedClusterConfig, err := config.ParseClusterConfiguration(clusterConfigPath)
	if err != nil {
		log.Fatalf("Failed to parse cluster configuration from %s: %v", clusterConfigPath, err)
	}
	// SetClusterConfigDefaults is already called within ParseClusterConfiguration
	// (see internal/config/parse.go), so there is no need to call it again here:
	// config.SetClusterConfigDefaults(parsedClusterConfig)
	log.Printf("Successfully parsed and applied defaults to cluster configuration: %s", parsedClusterConfig.Metadata.Name)

	// 1.5. Initialize the PKI directory and CA if they don't exist
	pkiDir := pki.GetPKIPathFromClusterConfig(parsedClusterConfig.Spec.BackupPath)
	caKeyPath := filepath.Join(pkiDir, "ca.key")
	caCertPath := filepath.Join(pkiDir, "ca.crt")

	// Check if the CA already exists
	_, caKeyErr := os.Stat(caKeyPath)
	_, caCertErr := os.Stat(caCertPath)

	if os.IsNotExist(caKeyErr) || os.IsNotExist(caCertErr) {
		log.Printf("CA key or certificate not found. Generating new CA in %s", pkiDir)
		if err := pki.GenerateCA(pkiDir, caKeyPath, caCertPath); err != nil {
			log.Fatalf("Failed to generate CA: %v", err)
		}
		log.Println("Successfully generated new CA key and certificate")
	} else {
		log.Println("CA key and certificate already exist, skipping generation")
	}

	// Prepare the embedded etcd config.
	// For a single-node init, this node is the only peer; client URLs and peer
	// URLs are based on its own configuration. Ports are defaulted if not
	// specified (SetClusterConfigDefaults handles this).
	//
	// This assumes nodeName is resolvable or an IP is used. For simplicity,
	// localhost is used here, which is fine for a single node; in a multi-node
	// setup this needs to be the advertised IP. For init, we assume this node
	// is the first and only one.
	clientURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdClientPort)
	peerURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdPeerPort)

	etcdEmbedCfg := store.EtcdEmbedConfig{
		Name:           nodeName,                                                 // etcd member name
		DataDir:        filepath.Join(os.Getenv("HOME"), ".kat-agent", nodeName), // Ensure a unique data dir
		ClientURLs:     []string{clientURL},                                      // Listen on this for clients
		PeerURLs:       []string{peerURL},                                        // Listen on this for peers
		InitialCluster: fmt.Sprintf("%s=%s", nodeName, peerURL),                  // For a new cluster, it's just itself
		// embed.Config has a ForceNewCluster field; it should be true only if we
		// are certain this is a brand-new cluster. For simplicity, init does not
		// set it and relies on an empty data dir instead.
	}
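	// For illustration (values assumed, not from the source): a first node named
	// "kat-node" with the stock etcd peer port 2380 would yield
	//	InitialCluster: "kat-node=http://localhost:2380"
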
	// Ensure the data directory exists
	if err := os.MkdirAll(etcdEmbedCfg.DataDir, 0700); err != nil {
		log.Fatalf("Failed to create etcd data directory %s: %v", etcdEmbedCfg.DataDir, err)
	}

	// 2. Start the embedded etcd server
	log.Printf("Initializing embedded etcd server (name: %s, data-dir: %s)...", etcdEmbedCfg.Name, etcdEmbedCfg.DataDir)
	embeddedEtcd, err := store.StartEmbeddedEtcd(etcdEmbedCfg)
	if err != nil {
		log.Fatalf("Failed to start embedded etcd: %v", err)
	}
	log.Println("Successfully initialized and started embedded etcd.")

	// 3. Create the StateStore client.
	// For init, the endpoints point to our own embedded server.
	etcdStore, err := store.NewEtcdStore([]string{clientURL}, embeddedEtcd)
	if err != nil {
		log.Fatalf("Failed to create etcd store client: %v", err)
	}
	defer etcdStore.Close() // Ensure etcd and the client are cleaned up on exit

	// Set up signal handling for graceful shutdown
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// 4. Campaign for leadership
	leadershipMgr := leader.NewLeadershipManager(
		etcdStore,
		nodeName,
		func(leadershipCtx context.Context) { // OnElected
			log.Printf("Node %s became leader. Performing initial setup.", nodeName)
			// Store the cluster UID. Check whether a UID already exists, perhaps
			// from a previous partial init; for a clean init we expect to write it.
			_, getErr := etcdStore.Get(leadershipCtx, clusterUIDKey)
			if getErr != nil { // Assuming an error means not found or another issue
				clusterUID := uuid.New().String()
				err := etcdStore.Put(leadershipCtx, clusterUIDKey, []byte(clusterUID))
				if err != nil {
					// This is critical and should ideally be retried or fail hard.
					log.Printf("Failed to store cluster UID: %v. Continuing...", err)
				} else {
					log.Printf("Stored new Cluster UID: %s", clusterUID)
				}
			} else {
				log.Printf("Cluster UID already exists in etcd. Skipping storage.")
			}

			// Generate the leader's server certificate for mTLS
			leaderKeyPath := filepath.Join(pkiDir, "leader.key")
			leaderCSRPath := filepath.Join(pkiDir, "leader.csr")
			leaderCertPath := filepath.Join(pkiDir, "leader.crt")

			// Check if the leader cert already exists
			_, leaderCertErr := os.Stat(leaderCertPath)
			if os.IsNotExist(leaderCertErr) {
				log.Println("Generating leader server certificate for mTLS")

				// Generate a key and CSR for the leader
				if err := pki.GenerateCertificateRequest(leaderCertCN, leaderKeyPath, leaderCSRPath); err != nil {
					log.Printf("Failed to generate leader key and CSR: %v", err)
				} else {
					// Read the CSR file. The contents are discarded; this only
					// verifies the file is readable before signing.
					_, err := os.ReadFile(leaderCSRPath)
					if err != nil {
						log.Printf("Failed to read leader CSR file: %v", err)
					} else {
						// Sign the CSR with our CA
						if err := pki.SignCertificateRequest(caKeyPath, caCertPath, leaderCSRPath, leaderCertPath, 365*24*time.Hour); err != nil {
							log.Printf("Failed to sign leader CSR: %v", err)
						} else {
							log.Println("Successfully generated and signed leader server certificate")
						}
					}
				}
			} else {
				log.Println("Leader certificate already exists, skipping generation")
			}

			// Store the ClusterConfigurationSpec (as JSON). We store Spec rather
			// than the whole object because Metadata may change (e.g.
			// resourceVersion) and is more for API object representation.
			specJson, err := protojson.Marshal(parsedClusterConfig.Spec)
			if err != nil {
				log.Printf("Failed to marshal ClusterConfigurationSpec to JSON: %v", err)
			} else {
				err = etcdStore.Put(leadershipCtx, clusterConfigKey, specJson)
				if err != nil {
					log.Printf("Failed to store cluster configuration spec: %v", err)
				} else {
					log.Printf("Stored cluster configuration spec in etcd.")
					log.Printf("Cluster CIDR: %s, Service CIDR: %s, API Port: %d",
						parsedClusterConfig.Spec.ClusterCidr,
						parsedClusterConfig.Spec.ServiceCidr,
						parsedClusterConfig.Spec.ApiPort)
				}
			}

			// Start the API server with mTLS
			log.Println("Starting API server with mTLS...")
			apiAddr := fmt.Sprintf(":%d", parsedClusterConfig.Spec.ApiPort)
			apiServer, err := api.NewServer(apiAddr, leaderCertPath, leaderKeyPath, caCertPath)
			if err != nil {
				log.Printf("Failed to create API server: %v", err)
			} else {
				// Register the join handler
				apiServer.RegisterJoinHandler(func(w http.ResponseWriter, r *http.Request) {
					log.Printf("Received join request from %s", r.RemoteAddr)

					// Read the request body
					var joinReq cli.JoinRequest
					if err := json.NewDecoder(r.Body).Decode(&joinReq); err != nil {
						log.Printf("Error decoding join request: %v", err)
						http.Error(w, "Invalid request format", http.StatusBadRequest)
						return
					}

					// Validate the request
					if joinReq.NodeName == "" || joinReq.AdvertiseAddr == "" || joinReq.CSRData == "" {
						log.Printf("Invalid join request: missing required fields")
						http.Error(w, "Missing required fields", http.StatusBadRequest)
						return
					}

					log.Printf("Processing join request for node: %s, advertise address: %s",
						joinReq.NodeName, joinReq.AdvertiseAddr)

					// Decode the CSR data
					csrData, err := base64.StdEncoding.DecodeString(joinReq.CSRData)
					if err != nil {
						log.Printf("Error decoding CSR data: %v", err)
						http.Error(w, "Invalid CSR data", http.StatusBadRequest)
						return
					}

					// Create a temporary file for the CSR
					tempCSRFile, err := os.CreateTemp("", "node-csr-*.pem")
					if err != nil {
						log.Printf("Error creating temp CSR file: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}
					defer os.Remove(tempCSRFile.Name())

					// Write the CSR data to the temp file
					if _, err := tempCSRFile.Write(csrData); err != nil {
						log.Printf("Error writing CSR data to temp file: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}
					tempCSRFile.Close()

					// Create a temp file for the signed certificate
					tempCertFile, err := os.CreateTemp("", "node-cert-*.pem")
					if err != nil {
						log.Printf("Error creating temp cert file: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}
					defer os.Remove(tempCertFile.Name())
					tempCertFile.Close()

					// Sign the CSR
					if err := pki.SignCertificateRequest(
						filepath.Join(pkiDir, "ca.key"),
						filepath.Join(pkiDir, "ca.crt"),
						tempCSRFile.Name(),
						tempCertFile.Name(),
						365*24*time.Hour, // 1 year validity
					); err != nil {
						log.Printf("Error signing CSR: %v", err)
						http.Error(w, "Failed to sign certificate", http.StatusInternalServerError)
						return
					}

					// Read the signed certificate
					signedCert, err := os.ReadFile(tempCertFile.Name())
					if err != nil {
						log.Printf("Error reading signed certificate: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}

					// Read the CA certificate
					caCert, err := os.ReadFile(filepath.Join(pkiDir, "ca.crt"))
					if err != nil {
						log.Printf("Error reading CA certificate: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}

					// Generate a unique node UID
					nodeUID := uuid.New().String()

					// Store node registration in etcd (placeholder for now).
					// A future phase will implement proper node registration with subnet assignment.

					// Create the response
					joinResp := cli.JoinResponse{
						NodeName:          joinReq.NodeName,
						NodeUID:           nodeUID,
						SignedCertificate: base64.StdEncoding.EncodeToString(signedCert),
						CACertificate:     base64.StdEncoding.EncodeToString(caCert),
						AssignedSubnet:    "10.100.0.0/24", // Placeholder, will be properly implemented in the network phase
					}

					// If an etcd peer was requested, add join instructions (placeholder).
					// Note: this reads the agent's own --etcd-peer flag, not a field
					// from the join request.
					if etcdPeer {
						joinResp.EtcdJoinInstructions = "Etcd peer join not implemented in this phase"
					}

					// Send the response
					w.Header().Set("Content-Type", "application/json")
					if err := json.NewEncoder(w).Encode(joinResp); err != nil {
						log.Printf("Error encoding join response: %v", err)
						http.Error(w, "Internal server error", http.StatusInternalServerError)
						return
					}

					log.Printf("Successfully processed join request for node: %s", joinReq.NodeName)
				})

				// Start the server in a goroutine
				go func() {
					if err := apiServer.Start(); err != nil && err != http.ErrServerClosed {
						log.Printf("API server error: %v", err)
					}
				}()

				// Add a shutdown hook tied to the leadership context
				go func() {
					<-leadershipCtx.Done()
					log.Println("Leadership lost, shutting down API server...")
					shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					defer cancel()
					if err := apiServer.Stop(shutdownCtx); err != nil {
						log.Printf("Error shutting down API server: %v", err)
					}
				}()

				log.Printf("API server started on port %d with mTLS", parsedClusterConfig.Spec.ApiPort)
				log.Printf("Verification: API server requires client certificates signed by the cluster CA")
				log.Printf("Test with: curl --cacert %s --cert <client_cert> --key <client_key> https://localhost:%d/internal/v1alpha1/join",
					caCertPath, parsedClusterConfig.Spec.ApiPort)
			}

			log.Println("Initial leader setup complete. Waiting for leadership context to end or agent to be stopped.")
			<-leadershipCtx.Done() // Wait until leadership is lost or the context is cancelled by the manager
		},
		func() { // OnResigned
			log.Printf("Node %s resigned or lost leadership.", nodeName)
		},
	)

	// Set the lease TTL from cluster.kat or defaults
	if parsedClusterConfig.Spec.AgentTickSeconds > 0 {
		// A common pattern is TTL = 3 * TickInterval
		leaseTTL := int64(parsedClusterConfig.Spec.AgentTickSeconds * 3)
		if leaseTTL < leader.DefaultLeaseTTLSeconds { // Ensure a minimum
			leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
		} else {
			leadershipMgr.LeaseTTLSeconds = leaseTTL
		}
	} else {
		leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
	}
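	// Worked example (illustrative): agentTickSeconds: 15 yields a 45s lease TTL;
	// any tick whose tripled value falls below DefaultLeaseTTLSeconds is clamped
	// up to that minimum.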

	// Run the leadership manager in the background. Run itself blocks until ctx is cancelled.
	go leadershipMgr.Run(ctx)

	// Keep main alive until the context is cancelled (e.g. by SIGINT/SIGTERM)
	<-ctx.Done()
	log.Println("KAT Agent init shutting down...")

	// The deferred etcdStore.Close() will handle resigning and stopping etcd.
	// Allow some time for graceful shutdown.
	time.Sleep(1 * time.Second)
	log.Println("KAT Agent init shutdown complete.")
}

func runJoin(cmd *cobra.Command, args []string) {
	log.Printf("Starting KAT Agent in join mode for node: %s", nodeName)
	log.Printf("Attempting to join cluster via leader API: %s", leaderAPI)

	// Determine the PKI directory. For simplicity, use a default location under $HOME.
	pkiDir := filepath.Join(os.Getenv("HOME"), ".kat-agent", nodeName, "pki")

	// Join the cluster
	if err := cli.JoinCluster(leaderAPI, advertiseAddr, nodeName, leaderCACert, pkiDir); err != nil {
		log.Fatalf("Failed to join cluster: %v", err)
	}

	log.Printf("Successfully joined cluster. Node is ready.")
	// In a real implementation, we would start the agent's main loop here.
	// For now, just exit successfully.
}

func main() {
	if err := rootCmd.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
}
```
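
For context, here is a rough client-side sketch of the join exchange the handler above implements. This is not the real `cli.JoinCluster` from `internal/cli`; the struct fields mirror what the handler reads and writes, but the JSON tags, status handling, and TLS setup are assumptions for illustration. (The leader's API server is configured for mTLS, so a real bootstrap flow would also have to address client authentication; this sketch glosses over that.)

```go
// Hypothetical client-side sketch of the join exchange. Field names mirror the
// handler above; JSON tags and TLS handling are assumed.
package join

import (
	"bytes"
	"crypto/tls"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

type joinRequest struct {
	NodeName      string `json:"nodeName"`      // tag assumed
	AdvertiseAddr string `json:"advertiseAddr"` // tag assumed
	CSRData       string `json:"csrData"`       // base64-encoded PEM CSR; tag assumed
}

type joinResponse struct {
	NodeName          string `json:"nodeName"`
	NodeUID           string `json:"nodeUID"`
	SignedCertificate string `json:"signedCertificate"` // base64-encoded PEM
	CACertificate     string `json:"caCertificate"`     // base64-encoded PEM
	AssignedSubnet    string `json:"assignedSubnet"`
}

// Join posts a CSR to the leader and writes the signed certificate to certPath.
func Join(leaderAPI, nodeName, advertiseAddr string, csrPEM []byte, certPath string) error {
	body, err := json.Marshal(joinRequest{
		NodeName:      nodeName,
		AdvertiseAddr: advertiseAddr,
		CSRData:       base64.StdEncoding.EncodeToString(csrPEM),
	})
	if err != nil {
		return err
	}

	// A real client should trust the leader's CA (cf. --leader-ca-cert)
	// rather than skipping verification.
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}
	resp, err := client.Post(
		fmt.Sprintf("https://%s/internal/v1alpha1/join", leaderAPI),
		"application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("join failed: %s", resp.Status)
	}

	var jr joinResponse
	if err := json.NewDecoder(resp.Body).Decode(&jr); err != nil {
		return err
	}
	cert, err := base64.StdEncoding.DecodeString(jr.SignedCertificate)
	if err != nil {
		return err
	}
	return os.WriteFile(certPath, cert, 0o600)
}
```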