package agent

import (
	"bytes"
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"time"
)

// NodeStatus is the payload an agent sends to the leader in each heartbeat.
type NodeStatus struct {
	NodeName    string           `json:"nodeName"`
	NodeUID     string           `json:"nodeUID"`
	Timestamp   time.Time        `json:"timestamp"`
	Resources   Resources        `json:"resources"`
	Workloads   []WorkloadStatus `json:"workloadInstances,omitempty"`
	NetworkInfo NetworkInfo      `json:"overlayNetwork"`
}

// Resources reports the node's total capacity and the part of it that is
// allocatable to workloads.
type Resources struct {
	Capacity    ResourceMetrics `json:"capacity"`
	Allocatable ResourceMetrics `json:"allocatable"`
}

// ResourceMetrics holds CPU and memory quantities encoded as strings.
type ResourceMetrics struct {
	CPU    string `json:"cpu"`    // e.g., "2000m"
	Memory string `json:"memory"` // e.g., "4096Mi"
}

// WorkloadStatus describes the observed state of one workload instance
// running on the node.
type WorkloadStatus struct {
	WorkloadName string `json:"workloadName"`
	Namespace    string `json:"namespace"`
	InstanceID   string `json:"instanceID"`
	ContainerID  string `json:"containerID"`
	ImageID      string `json:"imageID"`
	State        string `json:"state"` // "running", "exited", "paused", "unknown"
	ExitCode     int    `json:"exitCode"`
	HealthStatus string `json:"healthStatus"` // "healthy", "unhealthy", "pending_check"
	Restarts     int    `json:"restarts"`
}

// NetworkInfo describes the state of the node's overlay network.
type NetworkInfo struct {
	Status       string `json:"status"`       // "connected", "disconnected", "initializing"
	LastPeerSync string `json:"lastPeerSync"` // timestamp string
}

// Agent represents a KAT agent node and holds what it needs to talk to the
// leader over mutual TLS.
type Agent struct {
	NodeName      string
	NodeUID       string
	LeaderAPI     string // leader address in "host:port" form
	AdvertiseAddr string
	PKIDir        string // directory containing node.crt, node.key and ca.crt

	// mTLS client for leader communication; built by SetupMTLSClient.
	client *http.Client

	// Heartbeat configuration
	heartbeatInterval time.Duration
	stopHeartbeat     chan struct{}
}

// NewAgent creates a new Agent. A non-positive heartbeatIntervalSeconds
// selects the default interval of 15 seconds. The error result is currently
// always nil.
func NewAgent(nodeName, nodeUID, leaderAPI, advertiseAddr, pkiDir string, heartbeatIntervalSeconds int) (*Agent, error) {
	if heartbeatIntervalSeconds <= 0 {
		heartbeatIntervalSeconds = 15 // Default to 15 seconds
	}

	return &Agent{
		NodeName:          nodeName,
		NodeUID:           nodeUID,
		LeaderAPI:         leaderAPI,
		AdvertiseAddr:     advertiseAddr,
		PKIDir:            pkiDir,
		heartbeatInterval: time.Duration(heartbeatIntervalSeconds) * time.Second,
		stopHeartbeat:     make(chan struct{}),
	}, nil
}

// SetupMTLSClient builds the agent's HTTP client for mutual TLS. It uses the
// node certificate and key (node.crt, node.key) and the cluster CA (ca.crt)
// found in PKIDir.
//
// Every HTTPS connection the client opens is dialed to the host part of
// LeaderAPI, keeping the port from the request URL. The server certificate is
// still verified against the hostname in the request URL. A request for
// https://some-name:port/ therefore reaches the leader but only succeeds if
// the leader presents a certificate valid for "some-name".
//
// On error, a.client is left unchanged.
func (a *Agent) SetupMTLSClient() error {
	certFile := filepath.Join(a.PKIDir, "node.crt")
	keyFile := filepath.Join(a.PKIDir, "node.key")
	caFile := filepath.Join(a.PKIDir, "ca.crt")

	// Load client certificate and key
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return fmt.Errorf("failed to load client certificate and key: %w", err)
	}

	// Load CA certificate
	caCert, err := os.ReadFile(caFile)
	if err != nil {
		return fmt.Errorf("failed to read CA certificate: %w", err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return fmt.Errorf("failed to append CA certificate to pool")
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS12,
	}

	a.client = &http.Client{
		Transport: &http.Transport{
			// Kept for reference only: net/http ignores TLSClientConfig
			// when DialTLSContext is set.
			TLSClientConfig: tlsConfig,
			// Map any hostname to the leader's address while keeping the
			// requested port.
			DialTLSContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				requestedHost, port, err := net.SplitHostPort(addr)
				if err != nil {
					return nil, err
				}
				leaderHost, _, err := net.SplitHostPort(a.LeaderAPI)
				if err != nil {
					return nil, err
				}
				dialAddr := net.JoinHostPort(leaderHost, port)
				log.Printf("Dialing %s instead of %s", dialAddr, addr)

				// Pin verification to the host the request asked for. Without
				// this, the TLS layer would derive ServerName from dialAddr
				// and verify the leader's certificate against the wrong name.
				cfg := tlsConfig.Clone()
				if cfg.ServerName == "" {
					cfg.ServerName = requestedHost
				}

				// tls.Dialer honours ctx, so the client timeout and request
				// cancellation also abort a slow connect or handshake.
				dialer := &tls.Dialer{Config: cfg}
				return dialer.DialContext(ctx, network, dialAddr)
			},
		},
		Timeout: 10 * time.Second,
	}

	return nil
}
// StartHeartbeat begins sending periodic heartbeats to the leader func (a *Agent) StartHeartbeat(ctx context.Context) error { if a.client == nil { if err := a.SetupMTLSClient(); err != nil { return fmt.Errorf("failed to setup mTLS client: %w", err) } } log.Printf("Starting heartbeat to leader at %s every %v", a.LeaderAPI, a.heartbeatInterval) ticker := time.NewTicker(a.heartbeatInterval) defer ticker.Stop() // Send initial heartbeat immediately if err := a.sendHeartbeat(); err != nil { log.Printf("Initial heartbeat failed: %v", err) } go func() { for { select { case <-ticker.C: if err := a.sendHeartbeat(); err != nil { log.Printf("Heartbeat failed: %v", err) } case <-a.stopHeartbeat: log.Printf("Heartbeat stopped") return case <-ctx.Done(): log.Printf("Heartbeat context cancelled") return } } }() return nil } // StopHeartbeat stops the heartbeat goroutine func (a *Agent) StopHeartbeat() { close(a.stopHeartbeat) } // sendHeartbeat sends a single heartbeat to the leader func (a *Agent) sendHeartbeat() error { // Gather node status status := a.gatherNodeStatus() // Marshal to JSON statusJSON, err := json.Marshal(status) if err != nil { return fmt.Errorf("failed to marshal node status: %w", err) } leaderHost, leaderPort, err := net.SplitHostPort(a.LeaderAPI) if err != nil { return err } // Construct URL - use leader.kat.cluster.local as hostname to match certificate url := fmt.Sprintf("https://%s:%s/v1alpha1/nodes/%s/status", leaderHost, leaderPort, a.NodeName) // Create request req, err := http.NewRequest("POST", url, bytes.NewBuffer(statusJSON)) if err != nil { return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") // Send request resp, err := a.client.Do(req) if err != nil { return fmt.Errorf("failed to send heartbeat: %w", err) } defer resp.Body.Close() // Check response if resp.StatusCode != http.StatusOK { return fmt.Errorf("heartbeat returned non-OK status: %d", resp.StatusCode) } log.Printf("Heartbeat sent 
successfully to %s", url) return nil } // gatherNodeStatus collects the current node status func (a *Agent) gatherNodeStatus() NodeStatus { // For now, just provide basic information // In future phases, this will include actual resource usage, workload status, etc. // Get basic system info for initial capacity reporting var m runtime.MemStats runtime.ReadMemStats(&m) // Convert to human-readable format (very simplified for now) cpuCapacity := fmt.Sprintf("%dm", runtime.NumCPU()*1000) memCapacity := fmt.Sprintf("%dMi", m.Sys/(1024*1024)) // For allocatable, we'll just use 90% of capacity for this phase cpuAllocatable := fmt.Sprintf("%dm", runtime.NumCPU()*900) memAllocatable := fmt.Sprintf("%dMi", (m.Sys/(1024*1024))*9/10) return NodeStatus{ NodeName: a.NodeName, NodeUID: a.NodeUID, Timestamp: time.Now(), Resources: Resources{ Capacity: ResourceMetrics{ CPU: cpuCapacity, Memory: memCapacity, }, Allocatable: ResourceMetrics{ CPU: cpuAllocatable, Memory: memAllocatable, }, }, NetworkInfo: NetworkInfo{ Status: "initializing", // Placeholder until network is implemented LastPeerSync: time.Now().Format(time.RFC3339), }, // Workloads will be empty for now } }