265 lines
7.4 KiB
Go
265 lines
7.4 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"time"
|
|
)
|
|
|
|
// NodeStatus represents the data sent in a heartbeat
|
|
type NodeStatus struct {
|
|
NodeName string `json:"nodeName"`
|
|
NodeUID string `json:"nodeUID"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Resources Resources `json:"resources"`
|
|
Workloads []WorkloadStatus `json:"workloadInstances,omitempty"`
|
|
NetworkInfo NetworkInfo `json:"overlayNetwork"`
|
|
}
|
|
|
|
// Resources represents the node's resource capacity and usage
|
|
type Resources struct {
|
|
Capacity ResourceMetrics `json:"capacity"`
|
|
Allocatable ResourceMetrics `json:"allocatable"`
|
|
}
|
|
|
|
// ResourceMetrics holds CPU and memory amounts as quantity strings.
type ResourceMetrics struct {
	// CPU is expressed in millicores, e.g. "2000m".
	CPU string `json:"cpu"`
	// Memory is expressed in mebibytes, e.g. "4096Mi".
	Memory string `json:"memory"`
}
|
|
|
|
// WorkloadStatus reports the observed state of one workload instance.
type WorkloadStatus struct {
	// Identity of the workload and this particular instance of it.
	WorkloadName string `json:"workloadName"`
	Namespace    string `json:"namespace"`
	InstanceID   string `json:"instanceID"`

	// Runtime identifiers of the backing container and its image.
	ContainerID string `json:"containerID"`
	ImageID     string `json:"imageID"`

	// State is expected to be one of "running", "exited", "paused", "unknown".
	State    string `json:"state"`
	ExitCode int    `json:"exitCode"`

	// HealthStatus is expected to be one of "healthy", "unhealthy",
	// "pending_check".
	HealthStatus string `json:"healthStatus"`

	// Restarts counts how many times the instance has been restarted.
	Restarts int `json:"restarts"`
}
|
|
|
|
// NetworkInfo summarises the state of the node's overlay network.
type NetworkInfo struct {
	// Status is expected to be "connected", "disconnected" or "initializing".
	Status string `json:"status"`
	// LastPeerSync is a timestamp string (this package writes RFC 3339).
	LastPeerSync string `json:"lastPeerSync"`
}
|
|
|
|
// Agent is a KAT agent node. The exported fields describe the node and where
// its leader lives; the unexported fields are runtime state managed by the
// Agent's own methods.
type Agent struct {
	NodeName      string // node name; also used in the leader's status URL path
	NodeUID       string // node UID reported in every heartbeat
	LeaderAPI     string // leader host[:port], reached over https
	AdvertiseAddr string // address this node advertises; NOTE(review): not read in this file
	PKIDir        string // directory containing ca.crt, node.crt and node.key

	// client talks to the leader over mutual TLS. It stays nil until
	// SetupMTLSClient succeeds.
	client *http.Client

	// heartbeatInterval is the period between heartbeats.
	heartbeatInterval time.Duration
	// stopHeartbeat is closed by StopHeartbeat to end the heartbeat loop.
	stopHeartbeat chan struct{}
}
|
|
|
|
// NewAgent creates a new Agent instance
|
|
func NewAgent(nodeName, nodeUID, leaderAPI, advertiseAddr, pkiDir string, heartbeatIntervalSeconds int) (*Agent, error) {
|
|
if heartbeatIntervalSeconds <= 0 {
|
|
heartbeatIntervalSeconds = 15 // Default to 15 seconds
|
|
}
|
|
|
|
return &Agent{
|
|
NodeName: nodeName,
|
|
NodeUID: nodeUID,
|
|
LeaderAPI: leaderAPI,
|
|
AdvertiseAddr: advertiseAddr,
|
|
PKIDir: pkiDir,
|
|
heartbeatInterval: time.Duration(heartbeatIntervalSeconds) * time.Second,
|
|
stopHeartbeat: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
// SetupMTLSClient configures the HTTP client with mTLS using the agent's certificates
|
|
func (a *Agent) SetupMTLSClient() error {
|
|
// Load client certificate and key
|
|
cert, err := tls.LoadX509KeyPair(
|
|
fmt.Sprintf("%s/node.crt", a.PKIDir),
|
|
fmt.Sprintf("%s/node.key", a.PKIDir),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load client certificate and key: %w", err)
|
|
}
|
|
|
|
// Load CA certificate
|
|
caCert, err := os.ReadFile(fmt.Sprintf("%s/ca.crt", a.PKIDir))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read CA certificate: %w", err)
|
|
}
|
|
|
|
caCertPool := x509.NewCertPool()
|
|
if !caCertPool.AppendCertsFromPEM(caCert) {
|
|
return fmt.Errorf("failed to append CA certificate to pool")
|
|
}
|
|
|
|
// Create TLS configuration
|
|
tlsConfig := &tls.Config{
|
|
Certificates: []tls.Certificate{cert},
|
|
RootCAs: caCertPool,
|
|
MinVersion: tls.VersionTLS12,
|
|
// Skip hostname verification since we're using IP addresses
|
|
// and the leader cert is issued for leader.kat.cluster.local
|
|
InsecureSkipVerify: true,
|
|
// Custom verification to still validate the certificate chain
|
|
// but ignore the hostname mismatch
|
|
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
|
|
// Skip verification if there are no chains (shouldn't happen with our config)
|
|
if len(verifiedChains) == 0 {
|
|
return fmt.Errorf("no verified chains provided")
|
|
}
|
|
|
|
// The certificate chain was already verified against our CA by the TLS stack
|
|
// We just need to check that the leaf cert was issued by our trusted CA
|
|
// which is already done by the time this callback is called
|
|
return nil
|
|
},
|
|
}
|
|
|
|
// Create HTTP client with TLS configuration
|
|
a.client = &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsConfig,
|
|
},
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StartHeartbeat begins sending periodic heartbeats to the leader
|
|
func (a *Agent) StartHeartbeat(ctx context.Context) error {
|
|
if a.client == nil {
|
|
if err := a.SetupMTLSClient(); err != nil {
|
|
return fmt.Errorf("failed to setup mTLS client: %w", err)
|
|
}
|
|
}
|
|
|
|
log.Printf("Starting heartbeat to leader at %s every %v", a.LeaderAPI, a.heartbeatInterval)
|
|
|
|
ticker := time.NewTicker(a.heartbeatInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Send initial heartbeat immediately
|
|
if err := a.sendHeartbeat(); err != nil {
|
|
log.Printf("Initial heartbeat failed: %v", err)
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err := a.sendHeartbeat(); err != nil {
|
|
log.Printf("Heartbeat failed: %v", err)
|
|
}
|
|
case <-a.stopHeartbeat:
|
|
log.Printf("Heartbeat stopped")
|
|
return
|
|
case <-ctx.Done():
|
|
log.Printf("Heartbeat context cancelled")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// StopHeartbeat stops the heartbeat goroutine
|
|
func (a *Agent) StopHeartbeat() {
|
|
close(a.stopHeartbeat)
|
|
}
|
|
|
|
// sendHeartbeat sends a single heartbeat to the leader
|
|
func (a *Agent) sendHeartbeat() error {
|
|
// Gather node status
|
|
status := a.gatherNodeStatus()
|
|
|
|
// Marshal to JSON
|
|
statusJSON, err := json.Marshal(status)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal node status: %w", err)
|
|
}
|
|
|
|
// Construct URL
|
|
url := fmt.Sprintf("https://%s/v1alpha1/nodes/%s/status", a.LeaderAPI, a.NodeName)
|
|
|
|
// Create request
|
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(statusJSON))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
// Send request
|
|
resp, err := a.client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send heartbeat: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Check response
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("heartbeat returned non-OK status: %d", resp.StatusCode)
|
|
}
|
|
|
|
log.Printf("Heartbeat sent successfully to %s", url)
|
|
return nil
|
|
}
|
|
|
|
// gatherNodeStatus collects the current node status
|
|
func (a *Agent) gatherNodeStatus() NodeStatus {
|
|
// For now, just provide basic information
|
|
// In future phases, this will include actual resource usage, workload status, etc.
|
|
|
|
// Get basic system info for initial capacity reporting
|
|
var m runtime.MemStats
|
|
runtime.ReadMemStats(&m)
|
|
|
|
// Convert to human-readable format (very simplified for now)
|
|
cpuCapacity := fmt.Sprintf("%dm", runtime.NumCPU() * 1000)
|
|
memCapacity := fmt.Sprintf("%dMi", m.Sys / (1024 * 1024))
|
|
|
|
// For allocatable, we'll just use 90% of capacity for this phase
|
|
cpuAllocatable := fmt.Sprintf("%dm", runtime.NumCPU() * 900)
|
|
memAllocatable := fmt.Sprintf("%dMi", (m.Sys / (1024 * 1024)) * 9 / 10)
|
|
|
|
return NodeStatus{
|
|
NodeName: a.NodeName,
|
|
NodeUID: a.NodeUID,
|
|
Timestamp: time.Now(),
|
|
Resources: Resources{
|
|
Capacity: ResourceMetrics{
|
|
CPU: cpuCapacity,
|
|
Memory: memCapacity,
|
|
},
|
|
Allocatable: ResourceMetrics{
|
|
CPU: cpuAllocatable,
|
|
Memory: memAllocatable,
|
|
},
|
|
},
|
|
NetworkInfo: NetworkInfo{
|
|
Status: "initializing", // Placeholder until network is implemented
|
|
LastPeerSync: time.Now().Format(time.RFC3339),
|
|
},
|
|
// Workloads will be empty for now
|
|
}
|
|
}
|