kat/internal/config/parse.go
Tanishq Dubey 58bdca5703
Implement Phase 1 of KAT (#1)
**Phase 1: State Management & Leader Election**
*   **Goal**: A functional embedded etcd and leader election mechanism.
*   **Tasks**:
    1.  Implement the `StateStore` interface (RFC 5.1) with an etcd backend (`internal/store/etcd.go`).
    2.  Integrate embedded etcd server into `kat-agent` (RFC 2.2, 5.2), configurable via `cluster.kat` parameters.
    3.  Implement leader election using `go.etcd.io/etcd/client/v3/concurrency` (RFC 5.3); a minimal sketch follows this list.
    4.  Basic `kat-agent init` functionality:
        *   Parse `cluster.kat`.
        *   Start single-node embedded etcd.
        *   Campaign for and become leader.
        *   Store initial cluster configuration (UID, CIDRs from `cluster.kat`) in etcd.
*   **Milestone**:
    *   A single `kat-agent init --config cluster.kat` process starts, initializes etcd, and logs that it has become the leader.
    *   The cluster configuration from `cluster.kat` can be verified in etcd using an etcd client.
    *   `StateStore` interface methods (`Put`, `Get`, `Delete`, `List`) are testable against the embedded etcd (a possible interface shape is sketched after this commit message).
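
A minimal sketch of the leader-election piece (task 3), assuming an already-connected `clientv3.Client`; the 15-second session TTL, the election prefix `/kat/leader`, and the helper name are illustrative, not taken from the RFC:

```go
package leaderexample

import (
	"context"
	"fmt"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// campaignForLeadership blocks until this node wins the election or ctx is cancelled.
func campaignForLeadership(ctx context.Context, cli *clientv3.Client, nodeName string) error {
	// The session ties the candidacy to a lease; if the agent dies, the lease
	// expires and another candidate can win the election.
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(15))
	if err != nil {
		return fmt.Errorf("create etcd session: %w", err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/kat/leader")
	if err := election.Campaign(ctx, nodeName); err != nil {
		return fmt.Errorf("campaign for leadership: %w", err)
	}
	log.Printf("node %s became leader", nodeName)
	return nil
}
```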

Reviewed-on: #1
2025-05-16 20:19:25 -04:00
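
The `StateStore` interface itself is specified in RFC 5.1; the sketch below only illustrates the shape implied by the milestone's method list (`Put`, `Get`, `Delete`, `List`). The `KV` type and the context-based signatures are assumptions, not the RFC's definitions:

```go
package storeexample

import "context"

// KV is an illustrative key/value pair; the real types live in internal/store.
type KV struct {
	Key   string
	Value []byte
}

// StateStore is an assumed shape for the RFC 5.1 interface, used here only to
// show what "testable against the embedded etcd" might look like.
type StateStore interface {
	Put(ctx context.Context, key string, value []byte) error
	Get(ctx context.Context, key string) (*KV, error)
	Delete(ctx context.Context, key string) error
	List(ctx context.Context, prefix string) ([]KV, error)
}
```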


// File: internal/config/parse.go
package config

import (
	"encoding/json"
	"fmt"
	"net"
	"os"

	pb "git.dws.rip/dubey/kat/api/v1alpha1"
	"github.com/davecgh/go-spew/spew"
	"gopkg.in/yaml.v3"
)
// ParseClusterConfiguration reads, unmarshals, and validates a cluster.kat file.
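// An illustrative call (the file path is hypothetical; the fields printed are
// ones this package defaults and validates):
//
//	cfg, err := ParseClusterConfiguration("./cluster.kat")
//	if err != nil {
//		log.Fatalf("parsing cluster configuration: %v", err)
//	}
//	fmt.Printf("cluster %q, domain %s\n", cfg.Metadata.Name, cfg.Spec.ClusterDomain)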
func ParseClusterConfiguration(filePath string) (*pb.ClusterConfiguration, error) {
	if _, err := os.Stat(filePath); os.IsNotExist(err) {
		return nil, fmt.Errorf("cluster configuration file not found: %s", filePath)
	}

	yamlFile, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read cluster configuration file %s: %w", filePath, err)
	}

	var config pb.ClusterConfiguration
	// We expect the YAML to have top-level keys like 'apiVersion', 'kind', 'metadata', 'spec'
	// but our proto is just the ClusterConfiguration message.
	// So, we'll unmarshal into a temporary map to extract the 'spec' and 'metadata'.
	var rawConfigMap map[string]interface{}
	if err = yaml.Unmarshal(yamlFile, &rawConfigMap); err != nil {
		return nil, fmt.Errorf("failed to unmarshal YAML from %s: %w", filePath, err)
	}

	// Quick check for kind
	kind, ok := rawConfigMap["kind"].(string)
	if !ok || kind != "ClusterConfiguration" {
		return nil, fmt.Errorf("invalid kind in %s: expected ClusterConfiguration, got %v", filePath, rawConfigMap["kind"])
	}

	metadataMap, ok := rawConfigMap["metadata"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("metadata section not found or invalid in %s", filePath)
	}
	metadataBytes, err := json.Marshal(metadataMap)
	if err != nil {
		return nil, fmt.Errorf("failed to re-marshal metadata: %w", err)
	}
	if err = json.Unmarshal(metadataBytes, &config.Metadata); err != nil {
		return nil, fmt.Errorf("failed to unmarshal metadata into proto: %w", err)
	}

	specMap, ok := rawConfigMap["spec"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("spec section not found or invalid in %s", filePath)
	}
	specBytes, err := json.Marshal(specMap)
	if err != nil {
		return nil, fmt.Errorf("failed to re-marshal spec: %w", err)
	}
	if err = json.Unmarshal(specBytes, &config.Spec); err != nil {
		return nil, fmt.Errorf("failed to unmarshal spec into proto: %w", err)
	}

	spew.Dump(&config) // For debugging, remove in production

	SetClusterConfigDefaults(&config)

	if err := ValidateClusterConfiguration(&config); err != nil {
		return nil, fmt.Errorf("invalid cluster configuration in %s: %w", filePath, err)
	}

	return &config, nil
}
// SetClusterConfigDefaults applies default values to the ClusterConfiguration spec.
func SetClusterConfigDefaults(config *pb.ClusterConfiguration) {
	if config.Spec == nil {
		config.Spec = &pb.ClusterConfigurationSpec{}
	}
	s := config.Spec

	if s.ClusterDomain == "" {
		s.ClusterDomain = DefaultClusterDomain
	}
	if s.AgentPort == 0 {
		s.AgentPort = DefaultAgentPort
	}
	if s.ApiPort == 0 {
		s.ApiPort = DefaultApiPort
	}
	if s.EtcdPeerPort == 0 {
		s.EtcdPeerPort = DefaultEtcdPeerPort
	}
	if s.EtcdClientPort == 0 {
		s.EtcdClientPort = DefaultEtcdClientPort
	}
	if s.VolumeBasePath == "" {
		s.VolumeBasePath = DefaultVolumeBasePath
	}
	if s.BackupPath == "" {
		s.BackupPath = DefaultBackupPath
	}
	if s.BackupIntervalMinutes == 0 {
		s.BackupIntervalMinutes = DefaultBackupIntervalMins
	}
	if s.AgentTickSeconds == 0 {
		s.AgentTickSeconds = DefaultAgentTickSeconds
	}
	if s.NodeLossTimeoutSeconds == 0 {
		s.NodeLossTimeoutSeconds = DefaultNodeLossTimeoutSec
		if s.AgentTickSeconds > 0 {
			// AgentTickSeconds has already been defaulted above, so in practice
			// the timeout is derived from it: a node is considered lost after 4 ticks.
			s.NodeLossTimeoutSeconds = s.AgentTickSeconds * 4
		}
	}
	if s.NodeSubnetBits == 0 {
		s.NodeSubnetBits = DefaultNodeSubnetBits
	}
}
// ValidateClusterConfiguration performs basic validation on the ClusterConfiguration.
func ValidateClusterConfiguration(config *pb.ClusterConfiguration) error {
	if config.Metadata == nil || config.Metadata.Name == "" {
		return fmt.Errorf("metadata.name is required")
	}
	if config.Spec == nil {
		return fmt.Errorf("spec is required")
	}
	s := config.Spec

	if s.ClusterCidr == "" {
		return fmt.Errorf("spec.clusterCIDR is required")
	}
	if _, _, err := net.ParseCIDR(s.ClusterCidr); err != nil {
		return fmt.Errorf("invalid spec.clusterCIDR %s: %w", s.ClusterCidr, err)
	}
	if s.ServiceCidr == "" {
		return fmt.Errorf("spec.serviceCIDR is required")
	}
	if _, _, err := net.ParseCIDR(s.ServiceCidr); err != nil {
		return fmt.Errorf("invalid spec.serviceCIDR %s: %w", s.ServiceCidr, err)
	}

	// Validate ports
	ports := []struct {
		name string
		port int32
	}{
		{"agentPort", s.AgentPort}, {"apiPort", s.ApiPort},
		{"etcdPeerPort", s.EtcdPeerPort}, {"etcdClientPort", s.EtcdClientPort},
	}
	for _, p := range ports {
		if p.port <= 0 || p.port > 65535 {
			return fmt.Errorf("invalid port for %s: %d. Must be between 1 and 65535", p.name, p.port)
		}
	}

	// Check for port conflicts among configured ports
	portSet := make(map[int32]string)
	for _, p := range ports {
		if existing, found := portSet[p.port]; found {
			return fmt.Errorf("port conflict: %s (%d) and %s (%d) use the same port", p.name, p.port, existing, p.port)
		}
		portSet[p.port] = p.name
	}

	if s.NodeSubnetBits <= 0 || s.NodeSubnetBits >= 32 {
		return fmt.Errorf("invalid spec.nodeSubnetBits: %d. Must be > 0 and < 32", s.NodeSubnetBits)
	}
	// Validate nodeSubnetBits against the clusterCIDR prefix length.
	// nodeSubnetBits is the number of additional bits carved out of the cluster
	// prefix for each node subnet, so the node subnet prefix length is
	// clusterPrefixLen + nodeSubnetBits and must stay below /32. Per the RFC,
	// the default of 7 yields /23 node subnets for a /16 clusterCIDR (16+7=23).
	_, clusterNet, _ := net.ParseCIDR(s.ClusterCidr)
	clusterPrefixLen, _ := clusterNet.Mask.Size()
	if clusterPrefixLen+int(s.NodeSubnetBits) >= 32 {
		return fmt.Errorf("spec.nodeSubnetBits (%d) combined with clusterCIDR prefix (%d) results in an invalid subnet size (>= /32)", s.NodeSubnetBits, clusterPrefixLen)
	}
	if s.BackupIntervalMinutes < 0 { // 0 could mean disabled, but RFC implies positive
		return fmt.Errorf("spec.backupIntervalMinutes must be non-negative")
	}
	if s.AgentTickSeconds <= 0 {
		return fmt.Errorf("spec.agentTickSeconds must be positive")
	}
	if s.NodeLossTimeoutSeconds <= 0 {
		return fmt.Errorf("spec.nodeLossTimeoutSeconds must be positive")
	}
	if s.NodeLossTimeoutSeconds < s.AgentTickSeconds {
		return fmt.Errorf("spec.nodeLossTimeoutSeconds must be greater than or equal to spec.agentTickSeconds")
	}
	// volumeBasePath and backupPath should ideally be absolute paths; the
	// defaults cover the empty case, and a stricter check would use filepath.IsAbs().
	return nil
}
// ParsedQuadletFiles holds the structured data from a Quadlet directory.
type ParsedQuadletFiles struct {
	Workload            *pb.Workload
	VirtualLoadBalancer *pb.VirtualLoadBalancer
	JobDefinition       *pb.JobDefinition
	BuildDefinition     *pb.BuildDefinition

	// Namespace is typically a cluster-level resource, not part of a workload quadlet bundle.
	// If it were, it would be: Namespace *pb.Namespace

	// Store raw file contents for potential future use (e.g. annotations, original source)
	RawFiles map[string][]byte
}
// ParseQuadletFile unmarshals a single Quadlet file content based on its kind.
// It returns the specific protobuf message.
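// Illustrative use (the file name is hypothetical; the type assertion mirrors
// how ParseQuadletDirectory consumes the result):
//
//	obj, err := ParseQuadletFile("workload.kat", content)
//	if err != nil {
//		return err
//	}
//	if wl, ok := obj.(*pb.Workload); ok {
//		fmt.Println("parsed workload:", wl.Metadata.Name)
//	}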
func ParseQuadletFile(fileName string, content []byte) (interface{}, error) {
	var base struct {
		ApiVersion string `yaml:"apiVersion"`
		Kind       string `yaml:"kind"`
	}
	if err := yaml.Unmarshal(content, &base); err != nil {
		return nil, fmt.Errorf("failed to unmarshal base YAML from %s to determine kind: %w", fileName, err)
	}
	// TODO: Check apiVersion, e.g., base.ApiVersion == "kat.dws.rip/v1alpha1"

	var resource interface{}
	var err error

	switch base.Kind {
case "Workload":
var wl pb.Workload
// Similar to ClusterConfiguration, need to unmarshal metadata and spec separately
// from a temporary map if the proto doesn't match the full YAML structure directly.
// For simplicity in Phase 0, assuming direct unmarshal works if YAML matches proto structure.
// If YAML has apiVersion/kind/metadata/spec at top level, then:
var raw map[string]interface{}
if err = yaml.Unmarshal(content, &raw); err == nil {
if meta, ok := raw["metadata"]; ok {
metaBytes, _ := yaml.Marshal(meta)
yaml.Unmarshal(metaBytes, &wl.Metadata)
}
if spec, ok := raw["spec"]; ok {
specBytes, _ := yaml.Marshal(spec)
yaml.Unmarshal(specBytes, &wl.Spec)
}
}
resource = &wl
case "VirtualLoadBalancer":
var vlb pb.VirtualLoadBalancer
var raw map[string]interface{}
if err = yaml.Unmarshal(content, &raw); err == nil {
if meta, ok := raw["metadata"]; ok {
metaBytes, _ := yaml.Marshal(meta)
yaml.Unmarshal(metaBytes, &vlb.Metadata)
}
if spec, ok := raw["spec"]; ok {
specBytes, _ := yaml.Marshal(spec)
yaml.Unmarshal(specBytes, &vlb.Spec)
}
}
resource = &vlb
// Add cases for JobDefinition, BuildDefinition as they are defined
case "JobDefinition":
var jd pb.JobDefinition
// ... unmarshal logic ...
resource = &jd
case "BuildDefinition":
var bd pb.BuildDefinition
// ... unmarshal logic ...
resource = &bd
default:
return nil, fmt.Errorf("unknown Kind '%s' in file %s", base.Kind, fileName)
}
if err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML for Kind '%s' from %s: %w", base.Kind, fileName, err)
}
// TODO: Add basic validation for each parsed type (e.g., required fields like metadata.name)
return resource, nil
}
// ParseQuadletDirectory processes a map of file contents (from UntarQuadlets).
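// Illustrative use (the map keys are hypothetical file names; UntarQuadlets is
// assumed to produce this map, as noted above):
//
//	files := map[string][]byte{
//		"workload.kat": workloadYAML,
//		"vlb.kat":      vlbYAML,
//	}
//	bundle, err := ParseQuadletDirectory(files)
//	if err != nil {
//		return err
//	}
//	fmt.Println("workload:", bundle.Workload.Metadata.Name)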
func ParseQuadletDirectory(files map[string][]byte) (*ParsedQuadletFiles, error) {
	parsed := &ParsedQuadletFiles{RawFiles: files}

	for fileName, content := range files {
		obj, err := ParseQuadletFile(fileName, content)
		if err != nil {
			return nil, fmt.Errorf("error parsing quadlet file %s: %w", fileName, err)
		}
		switch v := obj.(type) {
		case *pb.Workload:
			if parsed.Workload != nil {
				return nil, fmt.Errorf("multiple Workload definitions found")
			}
			parsed.Workload = v
		case *pb.VirtualLoadBalancer:
			if parsed.VirtualLoadBalancer != nil {
				return nil, fmt.Errorf("multiple VirtualLoadBalancer definitions found")
			}
			parsed.VirtualLoadBalancer = v
			// Add cases for JobDefinition, BuildDefinition
		}
	}

	// TODO: Perform cross-Quadlet file validation (e.g., workload.kat must exist)
	if parsed.Workload == nil {
		return nil, fmt.Errorf("required Workload definition (workload.kat) not found in Quadlet bundle")
	}
	if parsed.Workload.Metadata == nil || parsed.Workload.Metadata.Name == "" {
		return nil, fmt.Errorf("workload.kat must have metadata.name defined")
	}

	return parsed, nil
}