kat/internal/agent/agent.go

278 lines
7.5 KiB
Go

package agent
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os"
"runtime"
"time"
)
// NodeStatus is the payload an agent reports to the leader in each heartbeat.
type NodeStatus struct {
	// NodeName and NodeUID identify the reporting node.
	NodeName string `json:"nodeName"`
	NodeUID  string `json:"nodeUID"`

	// Timestamp records when the status was assembled.
	Timestamp time.Time `json:"timestamp"`

	// Resources carries the node's capacity and allocatable figures.
	Resources Resources `json:"resources"`

	// Workloads lists per-instance workload state; the key is dropped from
	// the JSON document when the slice is empty.
	Workloads []WorkloadStatus `json:"workloadInstances,omitempty"`

	// NetworkInfo is serialized under the "overlayNetwork" key.
	NetworkInfo NetworkInfo `json:"overlayNetwork"`
}
// Resources pairs a node's total capacity with the portion of it that is
// reported as allocatable.
type Resources struct {
	// Capacity is the total reported for the node.
	Capacity ResourceMetrics `json:"capacity"`

	// Allocatable is the share of Capacity reported as available for use.
	Allocatable ResourceMetrics `json:"allocatable"`
}
// ResourceMetrics holds CPU and memory quantities as formatted strings.
type ResourceMetrics struct {
	// CPU is expressed in millicores, e.g. "2000m".
	CPU string `json:"cpu"`

	// Memory is expressed in mebibytes, e.g. "4096Mi".
	Memory string `json:"memory"`
}
// WorkloadStatus describes one workload instance as reported in a heartbeat.
type WorkloadStatus struct {
	// Identification of the workload and of this particular instance.
	WorkloadName string `json:"workloadName"`
	Namespace    string `json:"namespace"`
	InstanceID   string `json:"instanceID"`
	ContainerID  string `json:"containerID"`
	ImageID      string `json:"imageID"`

	// State is one of "running", "exited", "paused", "unknown".
	State string `json:"state"`

	// ExitCode is always serialized, including when it is zero.
	ExitCode int `json:"exitCode"`

	// HealthStatus is one of "healthy", "unhealthy", "pending_check".
	HealthStatus string `json:"healthStatus"`

	// Restarts is the restart counter reported for the instance.
	Restarts int `json:"restarts"`
}
// NetworkInfo reports the state of the node's overlay network.
type NetworkInfo struct {
	// Status is one of "connected", "disconnected", "initializing".
	Status string `json:"status"`

	// LastPeerSync is a timestamp carried as a string.
	LastPeerSync string `json:"lastPeerSync"`
}
// Agent is a KAT agent node: it holds the node's identity, the leader
// endpoint it reports to, and the state of its heartbeat loop.
type Agent struct {
	// NodeName and NodeUID identify this node in heartbeat payloads.
	NodeName string
	NodeUID  string

	// LeaderAPI is the leader's "host:port" address.
	LeaderAPI string

	// AdvertiseAddr is stored as supplied by the caller.
	AdvertiseAddr string

	// PKIDir is the directory holding node.crt, node.key and ca.crt.
	PKIDir string

	// client is the mTLS HTTP client used to talk to the leader; it stays
	// nil until SetupMTLSClient has run.
	client *http.Client

	// heartbeatInterval is the delay between periodic heartbeats.
	heartbeatInterval time.Duration

	// stopHeartbeat is closed to tell the heartbeat goroutine to exit.
	stopHeartbeat chan struct{}
}
// NewAgent builds an Agent from the supplied identity and connection
// settings. A heartbeatIntervalSeconds of zero or less selects the default of
// 15 seconds. The mTLS client is not created here. The returned error is
// always nil.
func NewAgent(nodeName, nodeUID, leaderAPI, advertiseAddr, pkiDir string, heartbeatIntervalSeconds int) (*Agent, error) {
	const defaultHeartbeatSeconds = 15

	seconds := heartbeatIntervalSeconds
	if seconds <= 0 {
		seconds = defaultHeartbeatSeconds
	}

	agent := &Agent{
		NodeName:      nodeName,
		NodeUID:       nodeUID,
		LeaderAPI:     leaderAPI,
		AdvertiseAddr: advertiseAddr,
		PKIDir:        pkiDir,
	}
	agent.heartbeatInterval = time.Duration(seconds) * time.Second
	agent.stopHeartbeat = make(chan struct{})
	return agent, nil
}
// SetupMTLSClient builds the agent's HTTP client for mutual-TLS communication
// with the leader and stores it in a.client.
//
// It loads node.crt / node.key (client identity) and ca.crt (trust root) from
// a.PKIDir. The transport's TLS dialer ignores the hostname in the request
// URL for routing purposes: every connection goes to the host part of
// a.LeaderAPI, on the port taken from the request address. The request's
// hostname is still used as the TLS server name, so the leader's certificate
// is verified against the name the request was addressed to rather than
// against the leader's IP.
//
// It returns an error if the key pair or CA file cannot be read or parsed.
func (a *Agent) SetupMTLSClient() error {
	// Load client certificate and key.
	cert, err := tls.LoadX509KeyPair(
		fmt.Sprintf("%s/node.crt", a.PKIDir),
		fmt.Sprintf("%s/node.key", a.PKIDir),
	)
	if err != nil {
		return fmt.Errorf("failed to load client certificate and key: %w", err)
	}

	// Load the CA certificate that signs the leader's server certificate.
	caCert, err := os.ReadFile(fmt.Sprintf("%s/ca.crt", a.PKIDir))
	if err != nil {
		return fmt.Errorf("failed to read CA certificate: %w", err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return fmt.Errorf("failed to append CA certificate to pool")
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS12,
	}

	a.client = &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: tlsConfig,
			// Override the TLS dial so that any hostname is routed to the
			// leader's address. The context-aware hook lets a cancelled or
			// timed-out request abort the dial and handshake.
			DialTLSContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				// addr is the "host:port" derived from the request URL.
				host, port, err := net.SplitHostPort(addr)
				if err != nil {
					return nil, err
				}

				// Only the host part of LeaderAPI is used; the port comes
				// from the request address.
				// NOTE(review): a request URL without an explicit port
				// therefore dials the leader on 443 — confirm that this
				// matches the port the leader actually listens on.
				leaderHost, _, err := net.SplitHostPort(a.LeaderAPI)
				if err != nil {
					return nil, err
				}
				dialAddr := net.JoinHostPort(leaderHost, port)

				// For logging purposes
				log.Printf("Dialing %s instead of %s", dialAddr, addr)

				// Without an explicit ServerName the TLS layer would take it
				// from dialAddr, i.e. verify the certificate against the
				// leader's IP. Pin it to the hostname the caller asked for.
				cfg := tlsConfig.Clone()
				cfg.ServerName = host

				dialer := &tls.Dialer{Config: cfg}
				return dialer.DialContext(ctx, network, dialAddr)
			},
		},
		Timeout: 10 * time.Second,
	}
	return nil
}
// StartHeartbeat sends one heartbeat immediately and then starts a background
// goroutine that sends one every a.heartbeatInterval until ctx is cancelled
// or StopHeartbeat is called.
//
// If the mTLS client has not been created yet it is set up first. An error is
// returned when that setup fails or when the configured interval is not
// positive; individual heartbeat failures are only logged.
func (a *Agent) StartHeartbeat(ctx context.Context) error {
	// time.NewTicker panics on a non-positive interval; report it instead.
	if a.heartbeatInterval <= 0 {
		return fmt.Errorf("heartbeat interval must be positive, got %v", a.heartbeatInterval)
	}

	if a.client == nil {
		if err := a.SetupMTLSClient(); err != nil {
			return fmt.Errorf("failed to setup mTLS client: %w", err)
		}
	}

	log.Printf("Starting heartbeat to leader at %s every %v", a.LeaderAPI, a.heartbeatInterval)

	ticker := time.NewTicker(a.heartbeatInterval)

	// Send initial heartbeat immediately
	if err := a.sendHeartbeat(); err != nil {
		log.Printf("Initial heartbeat failed: %v", err)
	}

	go func() {
		// The ticker must outlive StartHeartbeat, so it is stopped when the
		// loop exits rather than when this method returns.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := a.sendHeartbeat(); err != nil {
					log.Printf("Heartbeat failed: %v", err)
				}
			case <-a.stopHeartbeat:
				log.Printf("Heartbeat stopped")
				return
			case <-ctx.Done():
				log.Printf("Heartbeat context cancelled")
				return
			}
		}
	}()

	return nil
}
// StopHeartbeat signals the heartbeat goroutine to exit by closing the stop
// channel. Calling it again after the channel is closed, or on an Agent whose
// channel was never created, is a no-op instead of a panic.
//
// The closed-check and the close are not atomic, so concurrent callers must
// still be serialized by the caller.
func (a *Agent) StopHeartbeat() {
	if a.stopHeartbeat == nil {
		return
	}
	select {
	case <-a.stopHeartbeat:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(a.stopHeartbeat)
	}
}
// sendHeartbeat gathers the current node status and POSTs it as JSON to the
// leader's status endpoint for this node. Any response other than 200 OK is
// reported as an error.
func (a *Agent) sendHeartbeat() error {
	payload, err := json.Marshal(a.gatherNodeStatus())
	if err != nil {
		return fmt.Errorf("failed to marshal node status: %w", err)
	}

	// The hostname is fixed to leader.kat.cluster.local so that it matches
	// the leader's certificate.
	endpoint := fmt.Sprintf("https://leader.kat.cluster.local/v1alpha1/nodes/%s/status", a.NodeName)

	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := a.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send heartbeat: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		return fmt.Errorf("heartbeat returned non-OK status: %d", code)
	}

	log.Printf("Heartbeat sent successfully to %s", endpoint)
	return nil
}
// gatherNodeStatus assembles the NodeStatus reported in a heartbeat.
//
// The figures are placeholders for this phase: CPU capacity is the logical
// CPU count in millicores, memory capacity is derived from the Go runtime's
// MemStats.Sys value, allocatable is 90% of each, and the overlay network is
// always reported as "initializing". No workloads are reported.
func (a *Agent) gatherNodeStatus() NodeStatus {
	var mem runtime.MemStats
	runtime.ReadMemStats(&mem)

	const bytesPerMi = 1024 * 1024
	milliCPU := runtime.NumCPU() * 1000
	memMi := mem.Sys / bytesPerMi

	status := NodeStatus{
		NodeName:  a.NodeName,
		NodeUID:   a.NodeUID,
		Timestamp: time.Now(),
	}

	status.Resources.Capacity = ResourceMetrics{
		CPU:    fmt.Sprintf("%dm", milliCPU),
		Memory: fmt.Sprintf("%dMi", memMi),
	}

	// Allocatable is simply 90% of capacity for this phase.
	status.Resources.Allocatable = ResourceMetrics{
		CPU:    fmt.Sprintf("%dm", milliCPU*9/10),
		Memory: fmt.Sprintf("%dMi", memMi*9/10),
	}

	// Placeholder until the overlay network is implemented.
	status.NetworkInfo = NetworkInfo{
		Status:       "initializing",
		LastPeerSync: time.Now().Format(time.RFC3339),
	}

	return status
}