kat/cmd/kat-agent/main.go
Tanishq Dubey 58bdca5703
Implement Phase 1 of KAT (#1)
**Phase 1: State Management & Leader Election**
*   **Goal**: A functional embedded etcd and leader election mechanism.
*   **Tasks**:
    1.  Implement the `StateStore` interface (RFC 5.1) with an etcd backend (`internal/store/etcd.go`); rough sketches of this and the next two tasks follow the milestone list below.
    2.  Integrate embedded etcd server into `kat-agent` (RFC 2.2, 5.2), configurable via `cluster.kat` parameters.
    3.  Implement leader election using `go.etcd.io/etcd/client/v3/concurrency` (RFC 5.3).
    4.  Basic `kat-agent init` functionality:
        *   Parse `cluster.kat`.
        *   Start single-node embedded etcd.
        *   Campaign for and become leader.
        *   Store initial cluster configuration (UID, CIDRs from `cluster.kat`) in etcd.
*   **Milestone**:
    *   A single `kat-agent init --config cluster.kat` process starts, initializes etcd, and logs that it has become the leader.
    *   The cluster configuration from `cluster.kat` can be verified in etcd using an etcd client.
    *   `StateStore` interface methods (`Put`, `Get`, `Delete`, `List`) are testable against the embedded etcd.
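
For orientation, the `StateStore` contract referenced in task 1 and the last milestone item (plus the `Close` the agent code below relies on) might look roughly like this. This is a hedged sketch: the method set comes from the milestone, but the exact signatures in `internal/store` are assumptions, not quotes from RFC 5.1.

```go
// Hypothetical shape of the StateStore interface (RFC 5.1). Method names come
// from the milestone above; parameter and return types are assumptions.
package store

import "context"

// KV is a single key-value entry returned by Get and List.
type KV struct {
	Key   string
	Value []byte
}

// StateStore abstracts the cluster state backend (etcd in Phase 1).
type StateStore interface {
	Put(ctx context.Context, key string, value []byte) error
	Get(ctx context.Context, key string) (*KV, error)
	Delete(ctx context.Context, key string) error
	List(ctx context.Context, prefix string) ([]KV, error)
	Close() error // Also shuts down the embedded etcd when one is attached.
}
```

An etcd-backed implementation in `internal/store/etcd.go` would then map these methods onto the corresponding `clientv3` put/get/delete/range calls.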
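Task 2 amounts to translating the agent's `EtcdEmbedConfig` into the `go.etcd.io/etcd/server/v3/embed` API. Below is a minimal sketch assuming etcd v3.5, whose embed config still names the listen/advertise URL fields `LCUrls`/`ACUrls`/`LPUrls`/`APUrls`; the helper name and parameters are illustrative, not the actual `store.StartEmbeddedEtcd` signature.

```go
// A sketch of starting a single-node embedded etcd, assuming etcd v3.5's embed
// package. Everything except the embed API calls is illustrative.
package store

import (
	"fmt"
	"net/url"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func startEmbeddedEtcdSketch(name, dataDir, clientURL, peerURL string) (*embed.Etcd, error) {
	cfg := embed.NewConfig()
	cfg.Name = name
	cfg.Dir = dataDir

	cURL, err := url.Parse(clientURL)
	if err != nil {
		return nil, err
	}
	pURL, err := url.Parse(peerURL)
	if err != nil {
		return nil, err
	}
	cfg.LCUrls, cfg.ACUrls = []url.URL{*cURL}, []url.URL{*cURL} // client listen/advertise
	cfg.LPUrls, cfg.APUrls = []url.URL{*pURL}, []url.URL{*pURL} // peer listen/advertise

	// For a brand-new single-node cluster, the initial cluster is just this member.
	cfg.InitialCluster = fmt.Sprintf("%s=%s", name, peerURL)
	cfg.ClusterState = embed.ClusterStateFlagNew

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, err
	}
	// Wait until the server is ready to serve client requests.
	select {
	case <-e.Server.ReadyNotify():
		return e, nil
	case <-time.After(60 * time.Second):
		e.Server.Stop()
		return nil, fmt.Errorf("embedded etcd took too long to start")
	}
}
```

Blocking on `ReadyNotify` before returning keeps `kat-agent init` from campaigning for leadership against an etcd server that is not yet serving.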
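Task 3's election sits on `go.etcd.io/etcd/client/v3/concurrency`. This is a sketch of the campaign/hold/resign cycle that `leader.NewLeadershipManager` presumably wraps; the election key prefix and helper shape are assumptions.

```go
// A sketch of the campaign/hold/resign cycle behind leader election (task 3).
// The election prefix "/kat/leader" and this helper's shape are assumptions;
// only the concurrency package calls are the real API.
package leader

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func campaignSketch(ctx context.Context, cli *clientv3.Client, nodeName string, ttlSeconds int) error {
	// The session holds a lease; if this process dies, the lease expires after
	// ttlSeconds and leadership becomes available to other candidates.
	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlSeconds))
	if err != nil {
		return err
	}
	defer sess.Close()

	election := concurrency.NewElection(sess, "/kat/leader")

	// Campaign blocks until this node wins the election or ctx is cancelled.
	if err := election.Campaign(ctx, nodeName); err != nil {
		return err
	}
	log.Printf("%s is now the leader", nodeName)

	// Hold leadership until the caller shuts us down, then resign explicitly
	// so a successor does not have to wait for the lease to expire.
	<-ctx.Done()
	return election.Resign(context.Background())
}
```

The session's lease is what makes leadership self-healing: if the leader process dies without resigning, the lease expires after the TTL and another candidate's `Campaign` call returns.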

Reviewed-on: #1
2025-05-16 20:19:25 -04:00

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"git.dws.rip/dubey/kat/internal/config"
	"git.dws.rip/dubey/kat/internal/leader"
	"git.dws.rip/dubey/kat/internal/store"
	"github.com/google/uuid"
	"github.com/spf13/cobra"
	"google.golang.org/protobuf/encoding/protojson"
)

var (
	rootCmd = &cobra.Command{
		Use:   "kat-agent",
		Short: "KAT Agent manages workloads on a node and participates in the cluster.",
		Long: `The KAT Agent is responsible for running and managing containerized workloads
as instructed by the KAT Leader. It also participates in leader election if configured.`,
	}

	initCmd = &cobra.Command{
		Use:   "init",
		Short: "Initializes a new KAT cluster or a leader node.",
		Long: `Parses a cluster.kat configuration file, starts an embedded etcd server (for the first node),
campaigns for leadership, and stores initial cluster configuration.`,
		Run: runInit,
	}

	// Global flags / config paths
	clusterConfigPath string
	nodeName          string
)

const (
	clusterUIDKey    = "/kat/config/cluster_uid"
	clusterConfigKey = "/kat/config/cluster_config" // Stores the JSON of pb.ClusterConfigurationSpec
	defaultNodeName  = "kat-node"
)

func init() {
	initCmd.Flags().StringVar(&clusterConfigPath, "config", "cluster.kat", "Path to the cluster.kat configuration file.")

	// It's good practice for the node name to be unique. Hostname is a common default.
	defaultHostName, err := os.Hostname()
	if err != nil {
		defaultHostName = defaultNodeName
	}
	initCmd.Flags().StringVar(&nodeName, "node-name", defaultHostName, "Name of this node, used as leader ID if elected.")

	rootCmd.AddCommand(initCmd)
}

func runInit(cmd *cobra.Command, args []string) {
	log.Printf("Starting KAT Agent in init mode for node: %s", nodeName)

	// 1. Parse cluster.kat
	parsedClusterConfig, err := config.ParseClusterConfiguration(clusterConfigPath)
	if err != nil {
		log.Fatalf("Failed to parse cluster configuration from %s: %v", clusterConfigPath, err)
	}
	// SetClusterConfigDefaults is already called within ParseClusterConfiguration (internal/config/parse.go).
	// config.SetClusterConfigDefaults(parsedClusterConfig)
	log.Printf("Successfully parsed and applied defaults to cluster configuration: %s", parsedClusterConfig.Metadata.Name)

	// Prepare the etcd embed config.
	// For a single-node init, this node is the only peer; client and peer URLs are
	// based on its own configuration, and ports are defaulted if not specified
	// (SetClusterConfigDefaults handles this). For simplicity, localhost is used for
	// a single node; in a multi-node setup this needs to be the advertised IP.
	// For init, we assume this node is the first and only one.
	clientURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdClientPort)
	peerURL := fmt.Sprintf("http://localhost:%d", parsedClusterConfig.Spec.EtcdPeerPort)

	etcdEmbedCfg := store.EtcdEmbedConfig{
		Name:           nodeName,                                                 // Etcd member name
		DataDir:        filepath.Join(os.Getenv("HOME"), ".kat-agent", nodeName), // Ensure unique data dir
		ClientURLs:     []string{clientURL},                                      // Listen on this for clients
		PeerURLs:       []string{peerURL},                                        // Listen on this for peers
		InitialCluster: fmt.Sprintf("%s=%s", nodeName, peerURL),                  // For a new cluster, it's just itself
		// ForceNewCluster should be true if we are certain this is a brand new cluster.
		// For simplicity in init, we don't set it and rely on an empty data-dir.
		// embed.Config has a ForceNewCluster field.
	}
	// Ensure the data directory exists.
	if err := os.MkdirAll(etcdEmbedCfg.DataDir, 0700); err != nil {
		log.Fatalf("Failed to create etcd data directory %s: %v", etcdEmbedCfg.DataDir, err)
	}

	// 2. Start embedded etcd server
	log.Printf("Initializing embedded etcd server (name: %s, data-dir: %s)...", etcdEmbedCfg.Name, etcdEmbedCfg.DataDir)
	embeddedEtcd, err := store.StartEmbeddedEtcd(etcdEmbedCfg)
	if err != nil {
		log.Fatalf("Failed to start embedded etcd: %v", err)
	}
	log.Println("Successfully initialized and started embedded etcd.")

	// 3. Create StateStore client
	// For init, the endpoints point to our own embedded server.
	etcdStore, err := store.NewEtcdStore([]string{clientURL}, embeddedEtcd)
	if err != nil {
		log.Fatalf("Failed to create etcd store client: %v", err)
	}
	defer etcdStore.Close() // Ensure etcd and client are cleaned up on exit

	// Set up signal handling for graceful shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// 4. Campaign for leadership
	leadershipMgr := leader.NewLeadershipManager(
		etcdStore,
		nodeName,
		func(leadershipCtx context.Context) { // OnElected
			log.Printf("Node %s became leader. Performing initial setup.", nodeName)

			// Store the Cluster UID.
			// Check whether a UID already exists, perhaps from a previous partial init.
			// For a clean init, we'd expect to write it.
			_, getErr := etcdStore.Get(leadershipCtx, clusterUIDKey)
			if getErr != nil { // Assuming an error means not found (or another issue)
				clusterUID := uuid.New().String()
				err := etcdStore.Put(leadershipCtx, clusterUIDKey, []byte(clusterUID))
				if err != nil {
					// This is critical; ideally we would retry or fail.
					log.Printf("Failed to store cluster UID: %v. Continuing...", err)
				} else {
					log.Printf("Stored new Cluster UID: %s", clusterUID)
				}
			} else {
				log.Printf("Cluster UID already exists in etcd. Skipping storage.")
			}

			// Store the ClusterConfigurationSpec (as JSON).
			// We store Spec because Metadata might change (e.g. resourceVersion)
			// and is more for API object representation.
			specJson, err := protojson.Marshal(parsedClusterConfig.Spec)
			if err != nil {
				log.Printf("Failed to marshal ClusterConfigurationSpec to JSON: %v", err)
			} else {
				err = etcdStore.Put(leadershipCtx, clusterConfigKey, specJson)
				if err != nil {
					log.Printf("Failed to store cluster configuration spec: %v", err)
				} else {
					log.Printf("Stored cluster configuration spec in etcd.")
					log.Printf("Cluster CIDR: %s, Service CIDR: %s, API Port: %d",
						parsedClusterConfig.Spec.ClusterCidr,
						parsedClusterConfig.Spec.ServiceCidr,
						parsedClusterConfig.Spec.ApiPort)
				}
			}

			log.Println("Initial leader setup complete. Waiting for leadership context to end or agent to be stopped.")
			<-leadershipCtx.Done() // Wait until leadership is lost or the context is cancelled by the manager
		},
		func() { // OnResigned
			log.Printf("Node %s resigned or lost leadership.", nodeName)
		},
	)

	// Set the lease TTL from cluster.kat, falling back to the default.
	if parsedClusterConfig.Spec.AgentTickSeconds > 0 {
		// A common pattern is TTL = 3 * TickInterval.
		leaseTTL := int64(parsedClusterConfig.Spec.AgentTickSeconds * 3)
		if leaseTTL < leader.DefaultLeaseTTLSeconds { // Ensure a minimum
			leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
		} else {
			leadershipMgr.LeaseTTLSeconds = leaseTTL
		}
	} else {
		leadershipMgr.LeaseTTLSeconds = leader.DefaultLeaseTTLSeconds
	}

	// Run the leadership manager; it blocks until ctx is cancelled.
	go leadershipMgr.Run(ctx)

	// Keep main alive until the context is cancelled (e.g. by SIGINT/SIGTERM).
	<-ctx.Done()
	log.Println("KAT Agent init shutting down...")

	// The deferred etcdStore.Close() handles resigning and stopping etcd.
	// Allow some time for graceful shutdown.
	time.Sleep(1 * time.Second)
	log.Println("KAT Agent init shutdown complete.")
}

func main() {
	if err := rootCmd.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
}