From 92fb052594f922478a61e4aa735fd3a5b2db4de0 Mon Sep 17 00:00:00 2001 From: Tanishq Dubey Date: Sun, 18 May 2025 11:35:22 -0400 Subject: [PATCH] more fixes before final part --- Makefile | 2 +- cmd/kat-agent/main.go | 15 ++--- internal/agent/agent.go | 73 +++++++++++++----------- internal/agent/agent_test.go | 4 +- internal/api/node_status_handler_test.go | 2 - 5 files changed, 51 insertions(+), 45 deletions(-) diff --git a/Makefile b/Makefile index d4178c1..17e41b0 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ clean: # Run all tests test: generate @echo "Running all tests..." - @go test -v -count=1 ./... --coverprofile=coverage.out + @go test -v -count=1 ./... --coverprofile=coverage.out --short # Run unit tests only (faster, no integration tests) test-unit: diff --git a/cmd/kat-agent/main.go b/cmd/kat-agent/main.go index 146932c..f6e9510 100644 --- a/cmd/kat-agent/main.go +++ b/cmd/kat-agent/main.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "git.dws.rip/dubey/kat/internal/agent" "git.dws.rip/dubey/kat/internal/api" "git.dws.rip/dubey/kat/internal/cli" "git.dws.rip/dubey/kat/internal/config" @@ -265,7 +266,7 @@ func runInit(cmd *cobra.Command, args []string) { joinHandler := api.NewJoinHandler(etcdStore, caKeyPath, caCertPath) apiServer.RegisterJoinHandler(joinHandler) log.Printf("Registered join handler with CA key: %s, CA cert: %s", caKeyPath, caCertPath) - + // Register the node status handler nodeStatusHandler := api.NewNodeStatusHandler(etcdStore) apiServer.RegisterNodeStatusHandler(nodeStatusHandler) @@ -343,11 +344,11 @@ func runJoin(cmd *cobra.Command, args []string) { } log.Printf("Successfully joined cluster. Node is ready.") - + // Setup signal handling for graceful shutdown ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - + // Create and start the agent with heartbeating agent, err := agent.NewAgent( joinResp.NodeName, @@ -360,19 +361,19 @@ func runJoin(cmd *cobra.Command, args []string) { if err != nil { log.Fatalf("Failed to create agent: %v", err) } - + // Setup mTLS client if err := agent.SetupMTLSClient(); err != nil { log.Fatalf("Failed to setup mTLS client: %v", err) } - + // Start heartbeating if err := agent.StartHeartbeat(ctx); err != nil { log.Fatalf("Failed to start heartbeat: %v", err) } - + log.Printf("Node %s is now running with heartbeat. Press Ctrl+C to exit.", nodeName) - + // Wait for shutdown signal <-ctx.Done() log.Println("Received shutdown signal. Stopping heartbeat...") diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 17802ca..48dd486 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -17,12 +17,12 @@ import ( // NodeStatus represents the data sent in a heartbeat type NodeStatus struct { - NodeName string `json:"nodeName"` - NodeUID string `json:"nodeUID"` - Timestamp time.Time `json:"timestamp"` - Resources Resources `json:"resources"` + NodeName string `json:"nodeName"` + NodeUID string `json:"nodeUID"` + Timestamp time.Time `json:"timestamp"` + Resources Resources `json:"resources"` Workloads []WorkloadStatus `json:"workloadInstances,omitempty"` - NetworkInfo NetworkInfo `json:"overlayNetwork"` + NetworkInfo NetworkInfo `json:"overlayNetwork"` } // Resources represents the node's resource capacity and usage @@ -39,15 +39,15 @@ type ResourceMetrics struct { // WorkloadStatus represents the status of a workload instance type WorkloadStatus struct { - WorkloadName string `json:"workloadName"` - Namespace string `json:"namespace"` - InstanceID string `json:"instanceID"` - ContainerID string `json:"containerID"` - ImageID string `json:"imageID"` - State string `json:"state"` // "running", "exited", "paused", "unknown" - ExitCode int `json:"exitCode"` - HealthStatus string `json:"healthStatus"` // "healthy", "unhealthy", "pending_check" - Restarts int `json:"restarts"` + WorkloadName string `json:"workloadName"` + Namespace string `json:"namespace"` + InstanceID string `json:"instanceID"` + ContainerID string `json:"containerID"` + ImageID string `json:"imageID"` + State string `json:"state"` // "running", "exited", "paused", "unknown" + ExitCode int `json:"exitCode"` + HealthStatus string `json:"healthStatus"` // "healthy", "unhealthy", "pending_check" + Restarts int `json:"restarts"` } // NetworkInfo contains information about the node's overlay network @@ -63,10 +63,10 @@ type Agent struct { LeaderAPI string AdvertiseAddr string PKIDir string - + // mTLS client for leader communication - client *http.Client - + client *http.Client + // Heartbeat configuration heartbeatInterval time.Duration stopHeartbeat chan struct{} @@ -77,7 +77,7 @@ func NewAgent(nodeName, nodeUID, leaderAPI, advertiseAddr, pkiDir string, heartb if heartbeatIntervalSeconds <= 0 { heartbeatIntervalSeconds = 15 // Default to 15 seconds } - + return &Agent{ NodeName: nodeName, NodeUID: nodeUID, @@ -125,29 +125,29 @@ func (a *Agent) SetupMTLSClient() error { // Override the dial function to map any hostname to the leader's IP DialTLS: func(network, addr string) (net.Conn, error) { // Extract host and port from addr - host, port, err := net.SplitHostPort(addr) + _, port, err := net.SplitHostPort(addr) if err != nil { return nil, err } - + // Extract host and port from LeaderAPI - leaderHost, leaderPort, err := net.SplitHostPort(a.LeaderAPI) + leaderHost, _, err := net.SplitHostPort(a.LeaderAPI) if err != nil { return nil, err } - + // Use the leader's IP but keep the original port dialAddr := net.JoinHostPort(leaderHost, port) - + // For logging purposes log.Printf("Dialing %s instead of %s", dialAddr, addr) - + // Create the TLS connection conn, err := tls.Dial(network, dialAddr, tlsConfig) if err != nil { return nil, err } - + return conn, nil }, }, @@ -211,8 +211,13 @@ func (a *Agent) sendHeartbeat() error { return fmt.Errorf("failed to marshal node status: %w", err) } + leaderHost, leaderPort, err := net.SplitHostPort(a.LeaderAPI) + if err != nil { + return err + } + // Construct URL - use leader.kat.cluster.local as hostname to match certificate - url := fmt.Sprintf("https://leader.kat.cluster.local/v1alpha1/nodes/%s/status", a.NodeName) + url := fmt.Sprintf("https://%s:%s/v1alpha1/nodes/%s/status", leaderHost, leaderPort, a.NodeName) // Create request req, err := http.NewRequest("POST", url, bytes.NewBuffer(statusJSON)) @@ -241,19 +246,19 @@ func (a *Agent) sendHeartbeat() error { func (a *Agent) gatherNodeStatus() NodeStatus { // For now, just provide basic information // In future phases, this will include actual resource usage, workload status, etc. - + // Get basic system info for initial capacity reporting var m runtime.MemStats runtime.ReadMemStats(&m) - + // Convert to human-readable format (very simplified for now) - cpuCapacity := fmt.Sprintf("%dm", runtime.NumCPU() * 1000) - memCapacity := fmt.Sprintf("%dMi", m.Sys / (1024 * 1024)) - + cpuCapacity := fmt.Sprintf("%dm", runtime.NumCPU()*1000) + memCapacity := fmt.Sprintf("%dMi", m.Sys/(1024*1024)) + // For allocatable, we'll just use 90% of capacity for this phase - cpuAllocatable := fmt.Sprintf("%dm", runtime.NumCPU() * 900) - memAllocatable := fmt.Sprintf("%dMi", (m.Sys / (1024 * 1024)) * 9 / 10) - + cpuAllocatable := fmt.Sprintf("%dm", runtime.NumCPU()*900) + memAllocatable := fmt.Sprintf("%dMi", (m.Sys/(1024*1024))*9/10) + return NodeStatus{ NodeName: a.NodeName, NodeUID: a.NodeUID, diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 263f05c..205bae9 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "crypto/x509/pkix" + "git.dws.rip/dubey/kat/internal/pki" ) @@ -98,7 +100,7 @@ func TestAgentHeartbeat(t *testing.T) { t.Fatalf("Failed to read CA certificate: %v", err) } server.TLS.ClientCAs.AppendCertsFromPEM(caCertData) - + // Set the server certificate to use the test node name as CN // to match what our test agent will expect server.TLS.Certificates = []tls.Certificate{ diff --git a/internal/api/node_status_handler_test.go b/internal/api/node_status_handler_test.go index 875fb1d..681cdbe 100644 --- a/internal/api/node_status_handler_test.go +++ b/internal/api/node_status_handler_test.go @@ -2,14 +2,12 @@ package api import ( "bytes" - "context" "encoding/json" "net/http" "net/http/httptest" "testing" "time" - "git.dws.rip/dubey/kat/internal/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" )