Compare commits

...

6 Commits

SHA1 Message Date
432a3fdbc4 Fix loading and some tests 2025-05-10 18:54:10 -04:00
1ae06781d6 [Aider] Phase 0 2025-05-10 18:18:58 -04:00
2f0debf608 feat: Add unit tests for cluster config parsing and tarball utility 2025-05-10 17:41:43 -04:00
b723a004f2 more docs 2025-05-10 13:53:29 -04:00
04042795c5 Update go version 2025-05-09 19:17:32 -04:00
e03e27270b Init Docs 2025-05-09 19:15:50 -04:00
23 changed files with 6950 additions and 1 deletion

5
.gitignore vendored

@@ -23,3 +23,8 @@ go.work.sum
# env file
.env
.DS_Store
kat-agent
katcall
.aider*

131
.voidrules Normal file

@@ -0,0 +1,131 @@
You are an AI Pair Programming Assistant with extensive expertise in backend software engineering. Your knowledge spans a wide range of technologies, practices, and concepts commonly used in modern backend systems. Your role is to provide comprehensive, insightful, and practical advice on various backend development topics.
Your areas of expertise include, but are not limited to:
1. Database Management (SQL, NoSQL, NewSQL)
2. API Development (REST, GraphQL, gRPC)
3. Server-Side Programming (Go, Rust, Java, Python, Node.js)
4. Performance Optimization
5. Scalability and Load Balancing
6. Security Best Practices
7. Caching Strategies
8. Data Modeling
9. Microservices Architecture
10. Testing and Debugging
11. Logging and Monitoring
12. Containerization and Orchestration
13. CI/CD Pipelines
14. Docker and Kubernetes
15. gRPC and Protocol Buffers
16. Git Version Control
17. Data Infrastructure (Kafka, RabbitMQ, Redis)
18. Cloud Platforms (AWS, GCP, Azure)
When responding to queries:
1. Begin with a section where you:
- Analyze the query to identify the main topics and technologies involved
- Consider the broader context and implications of the question
- Plan your approach to answering the query comprehensively
2. Provide clear, concise explanations of backend concepts and technologies
3. Offer practical advice and best practices for real-world scenarios
4. Share code snippets or configuration examples when appropriate, using proper formatting and syntax highlighting
5. Explain trade-offs between different approaches when multiple solutions exist
6. Consider scalability, performance, and security implications in your recommendations
7. Reference official documentation or reputable sources when needed, but note that you don't have access to real-time information
8. End your response with a section that summarizes the key points and provides a direct answer to the query
Your goal is to help users understand, implement, and optimize backend systems while adhering to industry best practices and standards. Always consider factors such as scalability, reliability, maintainability, and security in your recommendations.
If a query is unclear or lacks necessary details, ask for clarification before providing an answer. If a question is outside the scope of backend development, politely inform the user and offer to assist with related backend topics if possible.
Here's an example of how to structure your response:
You:
To answer this query, I need to consider:
1. The basics of gRPC and Protocol Buffers
2. Go programming for gRPC server implementation
3. Database interaction using Go's database/sql package or an ORM
4. Best practices for structuring a gRPC service
5. Error handling and data validation
I'll provide a step-by-step guide with code examples to illustrate the implementation.
1. Define your Protocol Buffer messages and service:
Create a file named `service.proto`:
```protobuf
syntax = "proto3";
package myservice;
option go_package = "./pb";
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

message GetUserRequest {
  int32 id = 1;
}

service UserService {
  rpc GetUser(GetUserRequest) returns (User) {}
}
```
2. Generate Go code from the Proto file:
```
protoc --go_out=. --go-grpc_out=. service.proto
```
3. Implement the gRPC server in Go:
```go
package main

import (
	"context"
	"database/sql"
	"log"
	"net"

	_ "github.com/lib/pq" // PostgreSQL driver, registered via blank import
	"google.golang.org/grpc"

	pb "your_project/pb"
)

type server struct {
	pb.UnimplementedUserServiceServer
	db *sql.DB
}

func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	var user pb.User
	err := s.db.QueryRow("SELECT id, name, email FROM users WHERE id = $1", req.Id).Scan(&user.Id, &user.Name, &user.Email)
	if err != nil {
		return nil, err
	}
	return &user, nil
}

func main() {
	// Connect to PostgreSQL
	db, err := sql.Open("postgres", "postgresql://username:password@localhost/dbname?sslmode=disable")
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()

	// Create gRPC server
	s := grpc.NewServer()
	pb.RegisterUserServiceServer(s, &server{db: db})

	// Start listening
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}
	log.Println("Server listening on :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}
```
This example demonstrates:
- Defining a simple gRPC service using Protocol Buffers
- Implementing the service in Go
- Connecting to a PostgreSQL database
- Handling a basic database query within a gRPC method
Remember to handle errors properly, implement proper validation, and consider using an ORM like GORM for more complex database interactions. Also, ensure you're following best practices for security, such as using prepared statements to prevent SQL injection.
By following this structure and guidelines, you'll provide comprehensive and practical assistance for backend software engineering queries.

33
Makefile Normal file

@@ -0,0 +1,33 @@
# File: Makefile
.PHONY: all generate clean test lint

# Variables
GOLANGCI_LINT_VERSION := v1.55.2 # Or your preferred version

all: generate test

generate:
	@echo "Generating Go code from Protobuf definitions..."
	@./scripts/gen-proto.sh

# Placeholder for future commands
clean:
	@echo "Cleaning up generated files and build artifacts..."
	@rm -f ./api/v1alpha1/*.pb.go
	@rm -f kat-agent katcall

test: generate
	@echo "Running tests..."
	@go test ./...

lint:
	@echo "Running linter..."
	@if ! command -v golangci-lint > /dev/null 2>&1; then \
		echo "golangci-lint not found. Installing..."; \
		go install github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION); \
	fi
	@golangci-lint run

# Add to go.mod if not already present by go install:
# go get google.golang.org/protobuf/cmd/protoc-gen-go
# go get google.golang.org/grpc/cmd/protoc-gen-go-grpc (if you plan to use gRPC services soon)

3458
api/v1alpha1/kat.pb.go Normal file

File diff suppressed because it is too large

345
api/v1alpha1/kat.proto Normal file

@@ -0,0 +1,345 @@
// File: api/v1alpha1/kat.proto
syntax = "proto3";

package v1alpha1;

option go_package = "git.dws.rip/dubey/kat"; // Adjust to your actual go module path

import "google/protobuf/timestamp.proto";

// Common Metadata (RFC 3.2, Phase 0 Docs)
message ObjectMeta {
  string name = 1;
  string namespace = 2;
  string uid = 3;
  int64 generation = 4;
  string resource_version = 5; // e.g., etcd ModRevision
  google.protobuf.Timestamp creation_timestamp = 6;
  map<string, string> labels = 7;
  map<string, string> annotations = 8;
}

// Workload (RFC 3.2)
enum WorkloadType {
  WORKLOAD_TYPE_UNSPECIFIED = 0;
  SERVICE = 1;
  JOB = 2;
  DAEMON_SERVICE = 3;
}

message GitSource {
  string repository = 1;
  string branch = 2;
  string tag = 3;
  string commit = 4;
}

message WorkloadSource {
  oneof source_type {
    string image = 1; // Direct image reference
    GitSource git = 2; // Build from Git
  }
  string cache_image = 3; // Optional: Registry path for build cache layers (used with git source)
}

enum UpdateStrategyType {
  UPDATE_STRATEGY_TYPE_UNSPECIFIED = 0;
  ROLLING = 1;
  SIMULTANEOUS = 2;
}

message RollingUpdateStrategy {
  string max_surge = 1; // Can be int or percentage string e.g., "1" or "10%"
}

message UpdateStrategy {
  UpdateStrategyType type = 1;
  RollingUpdateStrategy rolling = 2; // Relevant if type is ROLLING
}

enum RestartCondition {
  RESTART_CONDITION_UNSPECIFIED = 0;
  NEVER = 1;
  MAX_COUNT = 2;
  ALWAYS = 3;
}

message RestartPolicy {
  RestartCondition condition = 1;
  int32 max_restarts = 2; // Used if condition=MAX_COUNT
  int32 reset_seconds = 3; // Used if condition=MAX_COUNT
}

message Toleration {
  string key = 1;
  enum Operator {
    OPERATOR_UNSPECIFIED = 0;
    EXISTS = 1;
    EQUAL = 2;
  }
  Operator operator = 2;
  string value = 3; // Needed if operator=EQUAL
  enum Effect {
    EFFECT_UNSPECIFIED = 0;
    NO_SCHEDULE = 1;
    PREFER_NO_SCHEDULE = 2;
    // NO_EXECUTE (not in RFC v1 scope for tolerations, but common)
  }
  Effect effect = 4;
}

message EnvVar {
  string name = 1;
  string value = 2;
}

message VolumeMount {
  string name = 1; // Volume name from spec.volumes
  string mount_path = 2; // Path inside container
  string sub_path = 3; // Optional: Mount sub-directory
  bool read_only = 4; // Optional: Default false
}

message ResourceRequests {
  string cpu = 1; // e.g., "100m"
  string memory = 2; // e.g., "64Mi"
}

message ResourceLimits {
  string cpu = 1; // e.g., "1"
  string memory = 2; // e.g., "256Mi"
}

enum GPUDriver {
  GPU_DRIVER_UNSPECIFIED = 0;
  ANY = 1;
  NVIDIA = 2;
  AMD = 3;
}

message GPUSpec {
  GPUDriver driver = 1;
  int32 min_vram_mb = 2; // Minimum GPU memory required
}

message ContainerResources {
  ResourceRequests requests = 1;
  ResourceLimits limits = 2;
  GPUSpec gpu = 3;
}

message Container {
  string name = 1; // Optional: Informational name
  repeated string command = 2;
  repeated string args = 3;
  repeated EnvVar env = 4;
  repeated VolumeMount volume_mounts = 5;
  ContainerResources resources = 6;
}

message SimpleClusterStorageVolumeSource {
  // Empty, implies agent creates dir under volumeBasePath
}

enum HostPathType {
  HOST_PATH_TYPE_UNSPECIFIED = 0; // No check, mount whatever is there or fail
  DIRECTORY_OR_CREATE = 1;
  DIRECTORY = 2;
  FILE_OR_CREATE = 3;
  FILE = 4;
  SOCKET = 5;
}

message HostMountVolumeSource {
  string host_path = 1; // Absolute path on host
  HostPathType ensure_type = 2; // Optional: Type to ensure/check
}

message Volume {
  string name = 1; // Name referenced by volumeMounts
  oneof volume_source {
    SimpleClusterStorageVolumeSource simple_cluster_storage = 2;
    HostMountVolumeSource host_mount = 3;
  }
}

message WorkloadSpec {
  WorkloadType type = 1;
  WorkloadSource source = 2;
  int32 replicas = 3; // Required for SERVICE
  UpdateStrategy update_strategy = 4;
  RestartPolicy restart_policy = 5;
  map<string, string> node_selector = 6;
  repeated Toleration tolerations = 7;
  Container container = 8;
  repeated Volume volumes = 9;
}

message WorkloadStatus {
  // Placeholder for Phase 0. Will be expanded later.
  // Example fields:
  // int32 observed_generation = 1;
  // int32 ready_replicas = 2;
  // string condition = 3; // e.g., "Available", "Progressing", "Failed"
}

message Workload {
  ObjectMeta metadata = 1;
  WorkloadSpec spec = 2;
  WorkloadStatus status = 3;
}

// VirtualLoadBalancer (RFC 3.3)
message PortSpec {
  string name = 1; // Optional: e.g., "web", "grpc"
  int32 container_port = 2; // Port app listens on in container
  string protocol = 3; // Optional: TCP | UDP. Default TCP.
}

message ExecHealthCheck {
  repeated string command = 1; // Exit 0 = healthy
}

message HealthCheck {
  ExecHealthCheck exec = 1;
  int32 initial_delay_seconds = 2;
  int32 period_seconds = 3;
  int32 timeout_seconds = 4;
  int32 success_threshold = 5;
  int32 failure_threshold = 6;
}

message IngressRule {
  string host = 1;
  string path = 2;
  string service_port_name = 3; // Name of port from PortSpec
  int32 service_port = 4; // Port number from PortSpec (overrides name)
  bool tls = 5; // Signal for ACME
}

message VirtualLoadBalancerSpec {
  repeated PortSpec ports = 1;
  HealthCheck health_check = 2;
  repeated IngressRule ingress = 3;
}

message VirtualLoadBalancer {
  ObjectMeta metadata = 1; // Name likely matches Workload name
  VirtualLoadBalancerSpec spec = 2;
  // VirtualLoadBalancerStatus status = 3; // Placeholder
}

// JobDefinition (RFC 3.4)
message JobDefinitionSpec {
  string schedule = 1; // Optional: Cron schedule
  int32 completions = 2; // Optional: Default 1
  int32 parallelism = 3; // Optional: Default 1
  int32 active_deadline_seconds = 4; // Optional
  int32 backoff_limit = 5; // Optional: Default 3
}

message JobDefinition {
  ObjectMeta metadata = 1; // Name likely matches Workload name
  JobDefinitionSpec spec = 2;
  // JobDefinitionStatus status = 3; // Placeholder
}

// BuildDefinition (RFC 3.5)
message BuildCache {
  string registry_path = 1; // e.g., "myreg.com/cache/myapp"
}

message BuildDefinitionSpec {
  string build_context = 1; // Optional: Path relative to repo root. Defaults to "."
  string dockerfile_path = 2; // Optional: Path relative to buildContext. Defaults to "./Dockerfile"
  map<string, string> build_args = 3; // Optional
  string target_stage = 4; // Optional
  string platform = 5; // Optional: e.g., "linux/arm64"
  BuildCache cache = 6; // Optional
}

message BuildDefinition {
  ObjectMeta metadata = 1; // Name likely matches Workload name
  BuildDefinitionSpec spec = 2;
  // BuildDefinitionStatus status = 3; // Placeholder
}

// Namespace (RFC 3.7)
message NamespaceSpec {
  // Potentially finalizers or other future spec fields
}

message NamespaceStatus {
  // string phase = 1; // e.g., "Active", "Terminating"
}

message Namespace {
  ObjectMeta metadata = 1;
  NamespaceSpec spec = 2;
  NamespaceStatus status = 3;
}

// Node (Internal Representation - RFC 3.8)
message NodeResources {
  string cpu = 1; // e.g., "2000m"
  string memory = 2; // e.g., "4096Mi"
  map<string, string> custom_resources = 3; // e.g., for GPUs "nvidia.com/gpu: 2"
}

message NodeStatusDetails {
  NodeResources capacity = 1;
  NodeResources allocatable = 2;
  // repeated WorkloadInstanceStatus workload_instances = 3; // Define later
  // OverlayNetworkStatus overlay_network = 4; // Define later
  string condition = 5; // e.g., "Ready", "NotReady", "Draining"
  google.protobuf.Timestamp last_heartbeat_time = 6;
}

message Taint { // From RFC 1.5, used in NodeSpec
  string key = 1;
  string value = 2;
  enum Effect {
    EFFECT_UNSPECIFIED = 0;
    NO_SCHEDULE = 1;
    PREFER_NO_SCHEDULE = 2;
    // NO_EXECUTE (not in RFC v1 scope for taints, but common)
  }
  Effect effect = 3;
}

message NodeSpec {
  repeated Taint taints = 1;
  string overlay_subnet = 2; // Assigned by leader
  // string provider_id = 3; // Cloud provider specific ID
  // map<string, string> labels = 4; // Node labels, distinct from metadata.labels
  // map<string, string> annotations = 5; // Node annotations
}

message Node {
  ObjectMeta metadata = 1; // Name is the unique node name/ID
  NodeSpec spec = 2;
  NodeStatusDetails status = 3;
}

// ClusterConfiguration (RFC 3.9)
message ClusterConfigurationSpec {
  string cluster_cidr = 1;
  string service_cidr = 2;
  int32 node_subnet_bits = 3;
  string cluster_domain = 4;
  int32 agent_port = 5;
  int32 api_port = 6;
  int32 etcd_peer_port = 7;
  int32 etcd_client_port = 8;
  string volume_base_path = 9;
  string backup_path = 10;
  int32 backup_interval_minutes = 11;
  int32 agent_tick_seconds = 12;
  int32 node_loss_timeout_seconds = 13;
}

message ClusterConfiguration {
  ObjectMeta metadata = 1; // e.g., name of the cluster
  ClusterConfigurationSpec spec = 2;
  // ClusterConfigurationStatus status = 3; // Placeholder
}

134
docs/plan/filestructure.md Normal file

@@ -0,0 +1,134 @@
# Directory/File Structure
This structure assumes a Go-based project, as hinted by the Go interface definitions in the RFC.
```
kat-system/
├── README.md # Project overview, build instructions, contribution guide
├── LICENSE # Project license (e.g., Apache 2.0, MIT)
├── go.mod # Go modules definition
├── go.sum # Go modules checksums
├── Makefile # Build, test, lint, generate code, etc.
├── api/
│ └── v1alpha1/
│ ├── kat.proto # Protocol Buffer definitions for all KAT resources (Workload, Node, etc.)
│ └── generated/ # Generated Go code from .proto files (e.g., using protoc-gen-go)
│ # Potentially OpenAPI/Swagger specs generated from protos too.
├── cmd/
│ ├── kat-agent/
│ │ └── main.go # Entrypoint for the kat-agent binary
│ └── katcall/
│ └── main.go # Entrypoint for the katcall CLI binary
├── internal/
│ ├── agent/
│ │ ├── agent.go # Core agent logic, heartbeating, command processing
│ │ ├── runtime.go # Interface with ContainerRuntime (Podman)
│ │ ├── build.go # Git-native build process logic
│ │ └── dns_resolver.go # Embedded DNS server logic
│ │
│ ├── leader/
│ │ ├── leader.go # Core leader logic, reconciliation loops
│ │ ├── schedule.go # Scheduling algorithm implementation
│ │ ├── ipam.go # IP Address Management logic
│ │ ├── state_backup.go # etcd backup logic
│ │ └── api_handler.go # HTTP API request handlers (connects to api/v1alpha1)
│ │
│ ├── api/ # Server-side API implementation details
│ │ ├── server.go # HTTP server setup, middleware (auth, logging)
│ │ ├── router.go # API route definitions
│ │ └── auth.go # Authentication (mTLS, Bearer token) logic
│ │
│ ├── cli/
│ │ ├── commands/ # Subdirectories for each katcall command (apply, get, logs, etc.)
│ │ │ ├── apply.go
│ │ │ └── ...
│ │ ├── client.go # HTTP client for interacting with KAT API
│ │ └── utils.go # CLI helper functions
│ │
│ ├── config/
│ │ ├── types.go # Go structs for Quadlet file kinds if not directly from proto
│ │ ├── parse.go # Logic for parsing and validating *.kat files (Quadlets, cluster.kat)
│ │ └── defaults.go # Default values for configurations
│ │
│ ├── store/
│ │ ├── interface.go # Definition of StateStore interface (as in RFC 5.1)
│ │ └── etcd.go # etcd implementation of StateStore, embedded etcd setup
│ │
│ ├── runtime/
│ │ ├── interface.go # Definition of ContainerRuntime interface (as in RFC 6.1)
│ │ └── podman.go # Podman implementation of ContainerRuntime
│ │
│ ├── network/
│ │ ├── wireguard.go # WireGuard setup and peer management logic
│ │ └── types.go # Network related internal types
│ │
│ ├── pki/
│ │ ├── ca.go # Certificate Authority management (generation, signing)
│ │ └── certs.go # Certificate generation and handling utilities
│ │
│ ├── observability/
│ │ ├── logging.go # Logging setup for components
│ │ ├── metrics.go # Metrics collection and exposure logic
│ │ └── events.go # Event recording and retrieval logic
│ │
│ ├── types/ # Core internal data structures if not covered by API protos
│ │ ├── node.go
│ │ ├── workload.go
│ │ └── ...
│ │
│ ├── constants/
│ │ └── constants.go # Global constants (etcd key prefixes, default ports, etc.)
│ │
│ └── utils/
│ ├── utils.go # Common utility functions (error handling, string manipulation)
│ └── tar.go # Utilities for handling tar.gz Quadlet archives
├── docs/
│ ├── rfc/
│ │ └── RFC001-KAT.md # The source RFC document
│ ├── user-guide/ # User documentation (installation, getting started, tutorials)
│ │ ├── installation.md
│ │ └── basic_usage.md
│ └── api-guide/ # API usage documentation (perhaps generated)
├── examples/
│ ├── simple-service/ # Example Quadlet for a simple service
│ │ ├── workload.kat
│ │ └── VirtualLoadBalancer.kat
│ ├── git-build-service/ # Example Quadlet for a service built from Git
│ │ ├── workload.kat
│ │ └── build.kat
│ ├── job/ # Example Quadlet for a Job
│ │ ├── workload.kat
│ │ └── job.kat
│ └── cluster.kat # Example cluster configuration file
├── scripts/
│ ├── setup-dev-env.sh # Script to set up development environment
│ ├── lint.sh # Code linting script
│ ├── test.sh # Script to run all tests
│ └── gen-proto.sh # Script to generate Go code from .proto files
└── test/
├── unit/ # Unit tests (mirroring internal/ structure)
├── integration/ # Integration tests (e.g., agent-leader interaction)
└── e2e/ # End-to-end tests (testing full cluster operations via katcall)
├── fixtures/ # Test Quadlet files
└── e2e_test.go
```
**Description of Key Files/Directories and Relationships:**
* **`api/v1alpha1/kat.proto`**: The source of truth for all resource definitions. `make generate` (or `scripts/gen-proto.sh`) would convert this into Go structs in `api/v1alpha1/generated/`. These structs will be used across the `internal/` packages.
* **`cmd/kat-agent/main.go`**: Initializes and runs the `kat-agent`. It will instantiate components from `internal/store` (for etcd), `internal/agent`, `internal/leader`, `internal/pki`, `internal/network`, and `internal/api` (for the API server if elected leader).
* **`cmd/katcall/main.go`**: Entry point for the CLI. It uses `internal/cli` components to parse commands and interact with the KAT API via `internal/cli/client.go`.
* **`internal/config/parse.go`**: Used by the Leader to parse submitted Quadlet `tar.gz` archives and by `kat-agent init` to parse `cluster.kat`.
* **`internal/store/etcd.go`**: Implements `StateStore` and manages the embedded etcd instance. Used by both Agent (for watching) and Leader (for all state modifications, leader election).
* **`internal/runtime/podman.go`**: Implements `ContainerRuntime`. Used by `internal/agent/runtime.go` to manage containers based on Podman.
* **`internal/agent/agent.go`** and **`internal/leader/leader.go`**: Contain the core state machines and logic for the respective roles. The `kat-agent` binary decides which role's logic to activate based on leader election status.
* **`internal/pki/ca.go`**: Used by `kat-agent init` to create the CA, and by the Leader to sign CSRs from joining agents.
* **`internal/network/wireguard.go`**: Used by agents to configure their local WireGuard interface based on data synced from etcd (managed by the Leader).
* **`internal/leader/api_handler.go`**: Implements the HTTP handlers for the API, using other leader components (scheduler, IPAM, store) to fulfill requests.

183
docs/plan/overview.md Normal file

@@ -0,0 +1,183 @@
# Implementation Plan
This plan breaks down the implementation into manageable phases, each with a testable milestone.
**Phase 0: Project Setup & Core Types**
* **Goal**: Basic project structure, version control, build system, and core data type definitions.
* **Tasks**:
1. Initialize Git repository, `go.mod`.
2. Create initial directory structure (as above).
3. Define core Proto3 messages in `api/v1alpha1/kat.proto` for: `Workload`, `VirtualLoadBalancer`, `JobDefinition`, `BuildDefinition`, `Namespace`, `Node` (internal representation), `ClusterConfiguration`.
4. Set up `scripts/gen-proto.sh` and generate initial Go types.
5. Implement parsing and basic validation for `cluster.kat` (`internal/config/parse.go`).
6. Implement parsing and basic validation for Quadlet files (`workload.kat`, etc.) and their `tar.gz` packaging/unpackaging.
* **Milestone**:
* `make generate` successfully creates Go types from protos.
* Unit tests pass for parsing `cluster.kat` and a sample Quadlet directory (as `tar.gz`) into their respective Go structs.
**Phase 1: State Management & Leader Election**
* **Goal**: A functional embedded etcd and leader election mechanism.
* **Tasks**:
1. Implement the `StateStore` interface (RFC 5.1) with an etcd backend (`internal/store/etcd.go`).
2. Integrate embedded etcd server into `kat-agent` (RFC 2.2, 5.2), configurable via `cluster.kat` parameters.
3. Implement leader election using `go.etcd.io/etcd/client/v3/concurrency` (RFC 5.3).
4. Basic `kat-agent init` functionality:
* Parse `cluster.kat`.
* Start single-node embedded etcd.
* Campaign for and become leader.
* Store initial cluster configuration (UID, CIDRs from `cluster.kat`) in etcd.
* **Milestone**:
* A single `kat-agent init --config cluster.kat` process starts, initializes etcd, and logs that it has become the leader.
* The cluster configuration from `cluster.kat` can be verified in etcd using an etcd client.
* `StateStore` interface methods (`Put`, `Get`, `Delete`, `List`) are testable against the embedded etcd.
**Phase 2: Basic Agent & Node Lifecycle (Init, Join, PKI)**
* **Goal**: Initial Leader setup, a second Agent joining with mTLS, and heartbeating.
* **Tasks**:
1. Implement Internal PKI (RFC 10.6) in `internal/pki/`:
* CA key/cert generation on `kat-agent init`.
* CSR generation by agent on join.
* CSR signing by Leader.
2. Implement initial Node Communication Protocol (RFC 2.3) for join:
* Agent (`kat-agent join --leader-api <...> --advertise-address <...>`) sends CSR to Leader.
* Leader validates, signs, returns certs & CA. Stores node registration (name, UID, advertise addr, WG pubkey placeholder) in etcd.
3. Implement basic mTLS for this join communication.
4. Implement Node Heartbeat (`POST /v1alpha1/nodes/{nodeName}/status`) from Agent to Leader (RFC 4.1.3). Leader updates node status in etcd.
5. Leader implements basic failure detection (marks Node `NotReady` in etcd if heartbeats cease) (RFC 4.1.4).
* **Milestone**:
* `kat-agent init` establishes a Leader with a CA.
* `kat-agent join` allows a second agent to securely register with the Leader, obtain certificates, and store its info in etcd.
* Leader's API receives heartbeats from the joined Agent.
* If a joined Agent is stopped, the Leader marks its status as `NotReady` in etcd after `nodeLossTimeoutSeconds`.
**Phase 3: Container Runtime Interface & Local Podman Management**
* **Goal**: Agent can manage containers locally via Podman using the CRI.
* **Tasks**:
1. Define `ContainerRuntime` interface in `internal/runtime/interface.go` (RFC 6.1).
2. Implement the Podman backend for `ContainerRuntime` in `internal/runtime/podman.go` (RFC 6.2). Focus on: `CreateContainer`, `StartContainer`, `StopContainer`, `RemoveContainer`, `GetContainerStatus`, `PullImage`, `StreamContainerLogs`.
3. Implement rootless execution strategy (RFC 6.3):
* Mechanism to ensure dedicated user accounts (initially, assume pre-existing or manual creation for tests).
* Podman systemd unit generation (`podman generate systemd`).
* Managing units via `systemctl --user`.
* **Milestone**:
* Agent process (upon a mocked internal command) can pull a specified image (e.g., `nginx`) and run it rootlessly using Podman and systemd user services.
* Agent can stop, remove, and get the status/logs of this container.
* All operations are performed via the `ContainerRuntime` interface.
**Phase 4: Basic Workload Deployment (Single Node, Image Source Only, No Networking)**
* **Goal**: Leader can instruct an Agent to run a simple `Service` workload (single container, image source) on itself (if leader is also an agent) or a single joined agent.
* **Tasks**:
1. Implement basic API endpoints on Leader for Workload CRUD (`POST/PUT /v1alpha1/n/{ns}/workloads` accepting `tar.gz`) (RFC 8.3, 4.2). Leader stores Quadlet files in etcd.
2. Simplistic scheduling (RFC 4.4): If only one agent node, assign workload to it. Leader creates an "assignment" or "task" for the agent in etcd.
3. Agent watches for assigned tasks from etcd.
4. On receiving a task, Agent uses `ContainerRuntime` to deploy the container (image from `workload.kat`).
5. Agent reports container instance status in its heartbeat. Leader updates overall workload status in etcd.
6. Basic `katcall apply -f <dir>` and `katcall get workload <name>` functionality.
* **Milestone**:
* User can deploy a simple single-container `Service` (e.g., `nginx`) using `katcall apply`.
* The container runs on the designated Agent node.
* `katcall get workload my-service` shows its status as running.
* `katcall logs <instanceID>` streams container logs.
**Phase 5: Overlay Networking (WireGuard) & IPAM**
* **Goal**: Nodes establish a WireGuard overlay network. Leader allocates IPs for containers.
* **Tasks**:
1. Implement WireGuard setup on Agents (`internal/network/wireguard.go`) (RFC 7.1):
* Key generation, public key reporting to Leader during join/heartbeat.
* Leader stores Node WireGuard public keys and advertise endpoints in etcd.
* Agent configures its `kat0` interface and peers by watching etcd.
2. Implement IPAM in Leader (`internal/leader/ipam.go`) (RFC 7.2):
* Node subnet allocation from `clusterCIDR` (from `cluster.kat`).
* Container IP allocation from the node's subnet when a workload instance is scheduled.
3. Agent uses the Leader-assigned IP when creating the container network/container with Podman.
* **Milestone**:
* All joined KAT nodes form a WireGuard mesh; `wg show` on nodes confirms peer connections.
* Leader allocates a unique overlay IP for each container instance.
* Containers on different nodes can ping each other using their overlay IPs.
**Phase 6: Distributed Agent DNS & Service Discovery**
* **Goal**: Basic service discovery using agent-local DNS for deployed services.
* **Tasks**:
1. Implement Agent-local DNS server (`internal/agent/dns_resolver.go`) using `miekg/dns` (RFC 7.3).
2. Leader writes DNS `A` records to etcd (e.g., `<workloadName>.<namespace>.<clusterDomain> -> <containerOverlayIP>`) when service instances become healthy/active.
3. Agent DNS server watches etcd for DNS records and updates its local zones.
4. Agent configures `/etc/resolv.conf` in managed containers to use its `kat0` IP as nameserver.
* **Milestone**:
* A service (`service-a`) deployed on one node can be resolved by its DNS name (e.g., `service-a.default.kat.cluster.local`) by a container on another node.
* DNS resolution provides the correct overlay IP(s) of `service-a` instances.
**Phase 7: Advanced Workload Features & Full Scheduling**
* **Goal**: Implement `Job`, `DaemonService`, richer scheduling, health checks, volumes, and restart policies.
* **Tasks**:
1. Implement `Job` type (RFC 3.4, 4.8): scheduling, completion tracking, backoff.
2. Implement `DaemonService` type (RFC 3.2): ensures one instance per eligible node.
3. Implement full scheduling logic in Leader (RFC 4.4): resource requests (`cpu`, `memory`), `nodeSelector`, Taint/Toleration, GPU (basic), "most empty" scoring.
4. Implement `VirtualLoadBalancer.kat` parsing and Agent-side health checks (RFC 3.3, 4.6.3). Leader uses health status for service readiness and DNS.
5. Implement container `restartPolicy` (RFC 3.2, 4.6.4) via systemd unit configuration.
6. Implement `volumeMounts` and `volumes` (RFC 3.2, 4.7): `HostMount`, `SimpleClusterStorage`. Agent ensures paths are set up.
* **Milestone**:
* `Job`s run to completion and their status is tracked.
* `DaemonService`s run one instance on all eligible nodes.
* Services are scheduled according to resource requests, selectors, and taints.
* Unhealthy service instances are identified by health checks and reflected in status.
* Containers restart based on their policy.
* Workloads can mount host paths and simple cluster storage.
**Phase 8: Git-Native Builds & Workload Updates/Rollbacks**
* **Goal**: Enable on-agent builds from Git sources and implement workload update strategies.
* **Tasks**:
1. Implement `BuildDefinition.kat` parsing (RFC 3.5).
2. Implement Git-native build process on Agent (`internal/agent/build.go`) using Podman (RFC 4.3).
3. Implement `cacheImage` pull/push for build caching (Agent needs registry credentials configured locally).
4. Implement workload update strategies in Leader (RFC 4.5): `Simultaneous`, `Rolling` (with `maxSurge`).
5. Implement manual rollback mechanism (`katcall rollback workload <name>`) (RFC 4.5).
* **Milestone**:
* A workload can be successfully deployed from a Git repository source, with the image built on the agent.
* A deployed service can be updated using the `Rolling` strategy with observable incremental instance replacement.
* A workload can be rolled back to its previous version.
**Phase 9: Full API Implementation & CLI (`katcall`) Polish**
* **Goal**: A robust and comprehensive HTTP API and `katcall` CLI.
* **Tasks**:
1. Implement all remaining API endpoints and features as per RFC Section 8. Ensure Proto3/JSON contracts are met.
2. Implement API authentication: bearer token for `katcall` (RFC 8.1, 10.1).
3. Flesh out `katcall` with all necessary commands and options (RFC 1.5 Terminology - katcall, RFC 8.3 hints):
* `drain <nodeName>`, `get nodes/namespaces`, `describe <resource>`, etc.
4. Improve error reporting and user feedback in CLI and API.
* **Milestone**:
* All functionalities defined in the RFC can be managed and introspected via the `katcall` CLI interacting with the secure KAT API.
* API documentation (e.g., Swagger/OpenAPI generated from protos or code) is available.
**Phase 10: Observability, Backup/Restore, Advanced Features & Security**
* **Goal**: Implement observability features, state backup/restore, and other advanced functionalities.
* **Tasks**:
1. Implement Agent & Leader logging to systemd journal/files; API for streaming container logs already in Phase 4/Milestone (RFC 9.1).
2. Implement basic Metrics exposure (`/metrics` JSON endpoint on Leader/Agent) (RFC 9.2).
3. Implement Events system: Leader records significant events in etcd, API to query events (RFC 9.3).
4. Implement Leader-driven etcd state backup (`etcdctl snapshot save`) (RFC 5.4).
5. Document and test the etcd state restore procedure (RFC 5.5).
6. Implement Detached Node Operation and Rejoin (RFC 4.9).
7. Provide standard Quadlet files and documentation for the Traefik Ingress recipe (RFC 7.4).
8. Review and harden security aspects: API security, build security, network security, secrets handling (document current limitations as per RFC 10.5).
* **Milestone**:
* Container logs are streamable via `katcall logs`. Agent/Leader logs are accessible.
* Basic metrics are available via API. Cluster events can be listed.
* Automated etcd backups are created by the Leader. Restore procedure is tested.
* Detached node can operate locally and rejoin the main cluster.
* Traefik can be deployed using provided Quadlets to achieve ingress.
**Phase 11: Testing, Documentation, and Release Preparation**
* **Goal**: Ensure KAT v1.0 is robust, well-documented, and ready for release.
* **Tasks**:
1. Write comprehensive unit tests for all core logic.
2. Develop integration tests for component interactions (e.g., Leader-Agent, Agent-Podman).
3. Create an E2E test suite using `katcall` to simulate real user scenarios.
4. Write detailed user documentation: installation, configuration, tutorials for all features, troubleshooting.
5. Perform performance testing on key operations (e.g., deployment speed, agent density).
6. Conduct a thorough security review/audit against RFC security considerations.
7. Establish a release process: versioning, changelog, building release artifacts.
* **Milestone**:
* High test coverage.
* Comprehensive user and API documentation is complete.
* Known critical bugs are fixed.
* KAT v1.0 is packaged and ready for its first official release.

274
docs/plan/phase0.md Normal file

@@ -0,0 +1,274 @@
# **Phase 0: Project Setup & Core Types**
* **Goal**: Initialize the project structure, establish version control and build tooling, define the core data structures (primarily through Protocol Buffers as specified in the RFC), and ensure basic parsing/validation capabilities for initial configuration files.
* **RFC Sections Primarily Used**: Overall project understanding, Section 8.2 (Resource Representation Proto3 & JSON), Section 3 (Resource Model - for identifying initial protos), Section 3.9 (Cluster Configuration - for `cluster.kat`).
**Tasks & Sub-Tasks:**
1. **Initialize Git Repository & Go Module**
* **Purpose**: Establish version control and Go project identity.
* **Details**:
* Create the root project directory (e.g., `kat-system`).
* Navigate into the directory: `cd kat-system`.
* Initialize Git: `git init`.
* Create an initial `.gitignore` file. Add common Go and OS-specific ignores (e.g., `*.o`, `*.exe`, `*~`, `.DS_Store`, compiled binaries like `kat-agent`, `katcall`).
* Initialize Go module: `go mod init github.com/dws-llc/kat-system` (or your chosen module path).
* **Verification**:
* `.git` directory exists.
* `go.mod` file is created with the correct module path.
* Initial commit can be made.
2. **Create Initial Directory Structure**
* **Purpose**: Lay out the skeleton of the project for organizing code and artifacts.
* **Details**: Create the top-level directories as outlined in the proposed directory/file structure (`docs/plan/filestructure.md`):
```
kat-system/
├── api/
│ └── v1alpha1/
├── cmd/
│ ├── kat-agent/
│ └── katcall/
├── docs/
│ └── rfc/
├── examples/
├── internal/
├── pkg/ # (Optional, if you decide to have externally importable library code not part of 'internal')
├── scripts/
└── test/
```
* Place the `RFC001-KAT.md` into `docs/rfc/`.
* **Verification**: Directory structure matches the plan.
3. **Define Initial Protocol Buffer Messages (`api/v1alpha1/kat.proto`)**
* **Purpose**: Create the canonical definitions for KAT resources that will be used for API communication and internal state representation.
* **Details**:
* Create `api/v1alpha1/kat.proto`.
* Define initial messages based on RFC Section 3 and Section 8.2. Focus on data structures, not RPC service definitions yet.
* **Common Metadata**:
```protobuf
message ObjectMeta {
  string name = 1;
  string namespace = 2;
  string uid = 3;
  int64 generation = 4;
  string resource_version = 5; // e.g., etcd ModRevision
  google.protobuf.Timestamp creation_timestamp = 6;
  map<string, string> labels = 7;
  map<string, string> annotations = 8; // For future use
}

message Timestamp { // google.protobuf.Timestamp might be better
  int64 seconds = 1;
  int32 nanos = 2;
}
```
* **`Workload` (RFC 3.2)**:
```protobuf
enum WorkloadType {
  WORKLOAD_TYPE_UNSPECIFIED = 0;
  SERVICE = 1;
  JOB = 2;
  DAEMON_SERVICE = 3;
}

// ... (GitSource, UpdateStrategy, RestartPolicy, Container, VolumeMount, ResourceRequests, GPUSpec, Volume definitions)

message WorkloadSpec {
  WorkloadType type = 1;
  // Source source = 2; // Define GitSource, ImageSource, CacheImage
  int32 replicas = 3;
  // UpdateStrategy update_strategy = 4;
  // RestartPolicy restart_policy = 5;
  map<string, string> node_selector = 6;
  // repeated Toleration tolerations = 7;
  Container container = 8; // Define Container fully
  repeated Volume volumes = 9; // Define Volume fully (SimpleClusterStorage, HostMount)
  // ... other spec fields from workload.kat
}

message Workload {
  ObjectMeta metadata = 1;
  WorkloadSpec spec = 2;
  // WorkloadStatus status = 3; // Define later
}
```
*(Start with core fields and expand. For brevity, not all sub-messages are listed here, but they need to be defined based on `workload.kat` fields in RFC 3.2)*
* **`VirtualLoadBalancer` (RFC 3.3)**:
```protobuf
message VirtualLoadBalancerSpec {
  // repeated Port ports = 1;
  // HealthCheck health_check = 2;
  // repeated IngressRule ingress = 3;
}

message VirtualLoadBalancer { // This might be part of Workload or a separate resource
  ObjectMeta metadata = 1; // Name likely matches Workload name
  VirtualLoadBalancerSpec spec = 2;
}
```
*Consider whether this is embedded in `Workload.spec` or a truly separate resource associated by name.* The RFC shows it as a separate `*.kat` file, implying a separate resource.
* **`JobDefinition` (RFC 3.4)**: Similar structure, `JobDefinitionSpec` with fields like `schedule`, `completions`.
* **`BuildDefinition` (RFC 3.5)**: Similar structure, `BuildDefinitionSpec` with fields like `buildContext`, `dockerfilePath`.
* **`Namespace` (RFC 3.7)**:
```protobuf
message NamespaceSpec {
  // Potentially finalizers or other future spec fields
}

message Namespace {
  ObjectMeta metadata = 1;
  NamespaceSpec spec = 2;
  // NamespaceStatus status = 3; // Define later
}
```
* **`Node` (Internal Representation - RFC 3.8)**: (This is for Leader's internal state, not a user-defined Quadlet)
```protobuf
message NodeResources {
  string cpu = 1;
  string memory = 2;
  // map<string, string> custom_resources = 3; // e.g., for GPUs
}

message NodeStatusDetails { // For status reporting by agent
  NodeResources capacity = 1;
  NodeResources allocatable = 2;
  // repeated WorkloadInstanceStatus workload_instances = 3;
  // OverlayNetworkStatus overlay_network = 4;
  string condition = 5; // e.g., "Ready", "NotReady"
  google.protobuf.Timestamp last_heartbeat_time = 6;
}

message NodeSpec { // Configuration for a node, some set by leader
  // repeated Taint taints = 1;
  string overlay_subnet = 2; // Assigned by leader
}

message Node { // Represents a node in the cluster
  ObjectMeta metadata = 1; // Name is the unique node name
  NodeSpec spec = 2;
  NodeStatusDetails status = 3;
}
```
* **`ClusterConfiguration` (RFC 3.9)**:
```protobuf
message ClusterConfigurationSpec {
  string cluster_cidr = 1;
  string service_cidr = 2;
  int32 node_subnet_bits = 3;
  string cluster_domain = 4;
  int32 agent_port = 5;
  int32 api_port = 6;
  int32 etcd_peer_port = 7;
  int32 etcd_client_port = 8;
  string volume_base_path = 9;
  string backup_path = 10;
  int32 backup_interval_minutes = 11;
  int32 agent_tick_seconds = 12;
  int32 node_loss_timeout_seconds = 13;
}

message ClusterConfiguration {
  ObjectMeta metadata = 1; // e.g., name of the cluster
  ClusterConfigurationSpec spec = 2;
}
```
* Include `syntax = "proto3";` and appropriate `package` and `option go_package` statements.
* Import `google/protobuf/timestamp.proto` if used.
* **Potential Challenges**: Accurately translating all nested YAML structures from Quadlet definitions into Protobuf messages. Deciding on naming conventions.
* **Verification**: `kat.proto` file is syntactically correct. It includes initial definitions for the key resources.
4. **Set Up Protobuf Code Generation (`scripts/gen-proto.sh`, Makefile target)**
* **Purpose**: Automate the conversion of `.proto` definitions into Go code.
* **Details**:
* Install `protoc` (the protobuf compiler) and the `protoc-gen-go` plugin. Add the tool dependency to `go.mod` via `go get google.golang.org/protobuf/cmd/protoc-gen-go` and install the binary via `go install google.golang.org/protobuf/cmd/protoc-gen-go`.
* Create `scripts/gen-proto.sh`:
```bash
#!/bin/bash
set -e

# go install places binaries in GOBIN, falling back to GOPATH/bin.
GOBIN_DIR="$(go env GOBIN)"
if [ -z "$GOBIN_DIR" ]; then
  GOBIN_DIR="$(go env GOPATH)/bin"
fi
PROTOC_GEN_GO="${GOBIN_DIR}/protoc-gen-go"
if [ ! -f "$PROTOC_GEN_GO" ]; then
  echo "protoc-gen-go not found. Please run: go install google.golang.org/protobuf/cmd/protoc-gen-go"
  exit 1
fi

API_DIR="./api/v1alpha1"
OUT_DIR="${API_DIR}/generated" # Or directly into api/v1alpha1 if preferred
mkdir -p "$OUT_DIR"

protoc --proto_path="${API_DIR}" \
  --go_out="${OUT_DIR}" --go_opt=paths=source_relative \
  "${API_DIR}/kat.proto"

echo "Protobuf Go code generated in ${OUT_DIR}"
```
*(Adjust paths and options as needed. `paths=source_relative` is common.)*
* Make the script executable: `chmod +x scripts/gen-proto.sh`.
* (Optional) Add a Makefile target:
```makefile
.PHONY: generate
generate:
@echo "Generating Go code from Protobuf definitions..."
@./scripts/gen-proto.sh
```
* **Verification**:
* Running `scripts/gen-proto.sh` (or `make generate`) executes without errors.
* Go files (e.g., `kat.pb.go`) are generated in the specified output directory (`api/v1alpha1/generated/` or `api/v1alpha1/`).
* These generated files compile if included in a Go program.
5. **Implement Basic Parsing and Validation for `cluster.kat` (`internal/config/parse.go`, `internal/config/types.go`)**
* **Purpose**: Enable `kat-agent init` to read and understand its initial cluster-wide configuration.
* **Details**:
* In `internal/config/types.go` (or use generated proto types directly if preferred for consistency): Define Go structs that mirror `ClusterConfiguration` from `kat.proto`.
* If using proto types: the generated `ClusterConfiguration` struct can be used directly.
* In `internal/config/parse.go`:
* `ParseClusterConfiguration(filePath string) (*ClusterConfiguration, error)` (a minimal sketch follows this task):
1. Read the file content.
2. Unmarshal YAML into the Go struct (e.g., using `gopkg.in/yaml.v3`).
3. Perform basic validation:
* Check for required fields (e.g., `clusterCIDR`, `serviceCIDR`, ports).
* Validate CIDR formats.
* Ensure ports are within valid range.
* Ensure intervals are positive.
* `SetClusterConfigDefaults(config *ClusterConfiguration)`: Apply default values as per RFC 3.9 if fields are not set.
* **Potential Challenges**: Handling YAML unmarshalling intricacies, comprehensive validation logic.
* **Verification**:
* Unit tests for `ParseClusterConfiguration`:
* Test with a valid `examples/cluster.kat` file. Parsed struct should match expected values.
* Test with missing required fields; expect an error.
* Test with invalid field values (e.g., bad CIDR, invalid port); expect an error.
* Test with a file that includes some fields and omits optional ones; verify defaults are applied by `SetClusterConfigDefaults`.
* An example `examples/cluster.kat` file should be created for testing.
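A minimal sketch of the parser under the assumptions above; the struct fields and YAML tags shown are illustrative (the real implementation may use the generated proto types directly, and `SetClusterConfigDefaults` is omitted here):
```go
// internal/config/parse.go (sketch)
package config

import (
	"fmt"
	"net"
	"os"

	"gopkg.in/yaml.v3"
)

// ClusterConfiguration mirrors the proto message; field set and YAML tags
// here are illustrative, not the final schema.
type ClusterConfiguration struct {
	ClusterCIDR string `yaml:"clusterCIDR"`
	ServiceCIDR string `yaml:"serviceCIDR"`
	APIPort     int    `yaml:"apiPort"`
	AgentPort   int    `yaml:"agentPort"`
}

func ParseClusterConfiguration(filePath string) (*ClusterConfiguration, error) {
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("reading %s: %w", filePath, err)
	}
	var cfg ClusterConfiguration
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, fmt.Errorf("unmarshalling %s: %w", filePath, err)
	}
	// Basic validation: CIDR formats and port ranges.
	if _, _, err := net.ParseCIDR(cfg.ClusterCIDR); err != nil {
		return nil, fmt.Errorf("invalid clusterCIDR %q: %w", cfg.ClusterCIDR, err)
	}
	if _, _, err := net.ParseCIDR(cfg.ServiceCIDR); err != nil {
		return nil, fmt.Errorf("invalid serviceCIDR %q: %w", cfg.ServiceCIDR, err)
	}
	if cfg.APIPort <= 0 || cfg.APIPort > 65535 {
		return nil, fmt.Errorf("apiPort %d out of range", cfg.APIPort)
	}
	return &cfg, nil
}
```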
6. **Implement Basic Parsing/Validation for Quadlet Files (`internal/config/parse.go`, `internal/utils/tar.go`)**
* **Purpose**: Enable the Leader to understand submitted Workload definitions.
* **Details**:
* In `internal/utils/tar.go`:
* `UntarQuadlets(reader io.Reader) (map[string][]byte, error)`: Takes a `tar.gz` stream, unpacks it in memory (or temp dir), and returns a map of `fileName -> fileContent`. A sketch follows this task.
* In `internal/config/parse.go`:
* `ParseQuadletFile(fileName string, content []byte) (interface{}, error)`:
1. Unmarshal YAML content based on `kind` field (e.g., into `Workload`, `VirtualLoadBalancer` generated proto structs).
2. Perform basic validation on the specific Quadlet type (e.g., `Workload` must have `metadata.name`, `spec.type`).
* `ParseQuadletDirectory(files map[string][]byte) (*Workload, *VirtualLoadBalancer, ..., error)`:
1. Iterate through files from `UntarQuadlets`.
2. Use `ParseQuadletFile` for each.
3. Perform cross-Quadlet file validation (e.g., if `build.kat` exists, `workload.kat` must have `spec.source.git`). Placeholder for now, more in later phases.
* **Potential Challenges**: Handling different Quadlet `kind`s, managing inter-file dependencies.
* **Verification**:
* Unit tests for `UntarQuadlets` with a sample `tar.gz` archive containing example Quadlet files.
* Unit tests for `ParseQuadletFile` for each Quadlet type (`workload.kat`, `VirtualLoadBalancer.kat` etc.) with valid and invalid content.
* An example Quadlet directory (e.g., `examples/simple-service/`) should be created and tarred for testing.
* `ParseQuadletDirectory` successfully parses a valid collection of Quadlet files from the tar.
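A sketch of `UntarQuadlets` using only the standard library (`archive/tar`, `compress/gzip`); the fully in-memory approach is one of the two options mentioned above:
```go
// internal/utils/tar.go (sketch)
package utils

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
)

// UntarQuadlets unpacks a tar.gz stream in memory and returns fileName -> content.
func UntarQuadlets(reader io.Reader) (map[string][]byte, error) {
	gz, err := gzip.NewReader(reader)
	if err != nil {
		return nil, fmt.Errorf("opening gzip stream: %w", err)
	}
	defer gz.Close()

	files := make(map[string][]byte)
	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break // end of archive
		}
		if err != nil {
			return nil, fmt.Errorf("reading tar entry: %w", err)
		}
		if hdr.Typeflag != tar.TypeReg {
			continue // skip directories, symlinks, etc.
		}
		content, err := io.ReadAll(tr)
		if err != nil {
			return nil, fmt.Errorf("reading %s: %w", hdr.Name, err)
		}
		files[hdr.Name] = content
	}
	return files, nil
}
```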
* **Milestone Verification (Overall Phase 0)**:
1. Project repository is set up with Go modules and initial directory structure.
2. `make generate` (or `scripts/gen-proto.sh`) successfully compiles `api/v1alpha1/kat.proto` into Go source files without errors. The generated Go code includes structs for `Workload`, `VirtualLoadBalancer`, `JobDefinition`, `BuildDefinition`, `Namespace`, internal `Node`, and `ClusterConfiguration`.
3. Unit tests in `internal/config/parse_test.go` demonstrate:
* Successful parsing of a valid `cluster.kat` file into the `ClusterConfiguration` struct, including application of default values.
* Error handling for invalid or incomplete `cluster.kat` files.
4. Unit tests in `internal/config/parse_test.go` (and potentially `internal/utils/tar_test.go`) demonstrate:
* Successful untarring of a sample `tar.gz` Quadlet archive.
* Successful parsing of individual Quadlet files (e.g., `workload.kat`, `VirtualLoadBalancer.kat`) into their respective Go structs (using generated proto types).
* Basic validation of required fields within individual Quadlet files.
5. All code is committed to Git.
6. (Optional but good practice) A basic `README.md` is started.

81
docs/plan/phase1.md Normal file

@@ -0,0 +1,81 @@
# **Phase 1: State Management & Leader Election**
* **Goal**: Establish the foundational state layer using embedded etcd and implement a reliable leader election mechanism. A single `kat-agent` can initialize a cluster, become its leader, and store initial configuration.
* **RFC Sections Primarily Used**: 2.2 (Embedded etcd), 3.9 (ClusterConfiguration), 5.1 (State Store Interface), 5.2 (etcd Implementation Details), 5.3 (Leader Election).
**Tasks & Sub-Tasks:**
1. **Define `StateStore` Go Interface (`internal/store/interface.go`)**
* **Purpose**: Create the abstraction layer for all state operations, decoupling the rest of the system from direct etcd dependencies.
* **Details**: Transcribe the Go interface from RFC 5.1 verbatim. Include `KV`, `WatchEvent`, `EventType`, `Compare`, `Op`, `OpType` structs/constants.
* **Verification**: Code compiles. Interface definition matches RFC.
2. **Implement Embedded etcd Server Logic (`internal/store/etcd.go`)**
* **Purpose**: Allow `kat-agent` to run its own etcd instance for single-node clusters or as part of a multi-node quorum.
* **Details**:
* Use `go.etcd.io/etcd/server/v3/embed` (see the sketch after this task).
* Function to start an embedded etcd server:
* Input: configuration parameters (data directory, peer URLs, client URLs, name). These will come from `cluster.kat` or defaults.
* Output: a running `embed.Etcd` instance or an error.
* Graceful shutdown logic for the embedded etcd server.
* **Verification**: A test can start and stop an embedded etcd server. Data directory is created and used.
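A sketch of starting the embedded server, assuming etcd v3.5's `embed` package; for brevity it leaves the client/peer URLs at the package defaults (localhost:2379/2380), whereas the real implementation would set them from `cluster.kat`:
```go
// internal/store/etcd.go (sketch of the embedded-server half)
package store

import (
	"fmt"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

// StartEmbeddedEtcd starts a single-member embedded etcd server.
func StartEmbeddedEtcd(name, dataDir string) (*embed.Etcd, error) {
	cfg := embed.NewConfig()
	cfg.Name = name
	cfg.Dir = dataDir // data directory is created if it does not exist

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, err
	}
	select {
	case <-e.Server.ReadyNotify():
		return e, nil // ready to serve client requests
	case <-time.After(30 * time.Second):
		e.Close()
		return nil, fmt.Errorf("embedded etcd did not become ready in time")
	}
}
```
Graceful shutdown is then just a deferred `e.Close()` on the returned instance.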
3. **Implement `StateStore` with etcd Backend (`internal/store/etcd.go`)**
* **Purpose**: Provide the concrete implementation for interacting with an etcd cluster (embedded or external).
* **Details**:
* Create a struct that implements the `StateStore` interface and holds an `etcd/clientv3.Client`.
* Implement `Put(ctx, key, value)`: Use `client.Put()`.
* Implement `Get(ctx, key)`: Use `client.Get()`. Handle key-not-found. Populate `KV.Version` with `ModRevision`.
* Implement `Delete(ctx, key)`: Use `client.Delete()`.
* Implement `List(ctx, prefix)`: Use `client.Get()` with `clientv3.WithPrefix()`.
* Implement `Watch(ctx, keyOrPrefix, startRevision)`: Use `client.Watch()`. Translate etcd events to `WatchEvent`.
* Implement `Close()`: Close the `clientv3.Client`.
* Implement `Campaign(ctx, leaderID, leaseTTLSeconds)` (sketched after this task's verification list):
* Use `concurrency.NewSession()` to create a lease.
* Use `concurrency.NewElection()` and `election.Campaign()`.
* Return a context that is cancelled when leadership is lost (e.g., by watching the campaign context or session done channel).
* Implement `Resign(ctx)`: Use `election.Resign()`.
* Implement `GetLeader(ctx)`: Observe the election or query the leader key.
* Implement `DoTransaction(ctx, checks, onSuccess, onFailure)`: Use `client.Txn()` with `clientv3.Compare` and `clientv3.Op`.
* **Potential Challenges**: Correctly handling etcd transaction semantics, context propagation, and error translation. Efficiently managing watches.
* **Verification**:
* Unit tests for each `StateStore` method using a real embedded etcd instance (test-scoped).
* Verify `Put` then `Get` retrieves the correct value and version.
* Verify `List` with prefix.
* Verify `Delete` removes the key.
* Verify `Watch` receives correct events for puts/deletes.
* Verify `DoTransaction` commits on success and rolls back on failure.
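A sketch of `Campaign` built on the `concurrency` package; the struct name, election key prefix, and return convention are illustrative:
```go
// internal/store/etcd.go (sketch of Campaign)
package store

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

type etcdStore struct {
	client *clientv3.Client
}

// Campaign blocks until leadership is acquired, then returns a context that
// is cancelled when the underlying session (lease) expires or is closed.
func (s *etcdStore) Campaign(ctx context.Context, leaderID string, leaseTTLSeconds int) (context.Context, error) {
	session, err := concurrency.NewSession(s.client, concurrency.WithTTL(leaseTTLSeconds))
	if err != nil {
		return nil, err
	}
	election := concurrency.NewElection(session, "/kat/leader_election")
	if err := election.Campaign(ctx, leaderID); err != nil {
		session.Close()
		return nil, err
	}
	// Cancel the returned context when the session's lease is lost,
	// signalling loss of leadership to the caller.
	leaderCtx, cancel := context.WithCancel(ctx)
	go func() {
		<-session.Done()
		cancel()
	}()
	return leaderCtx, nil
}
```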
4. **Integrate Leader Election into `kat-agent` (`cmd/kat-agent/main.go`, `internal/leader/election.go` - new file maybe)**
* **Purpose**: Enable an agent instance to attempt to become the cluster leader.
* **Details**:
* `kat-agent` main function will initialize its `StateStore` client.
* A dedicated goroutine will call `StateStore.Campaign()`.
* The outcome of `Campaign` (e.g., leadership acquired, context for leadership duration) will determine if the agent activates its Leader-specific logic (Phase 2+).
* Leader ID could be `nodeName` or a UUID. Lease TTL from `cluster.kat`.
* **Verification**:
* Start one `kat-agent` with etcd enabled; it should log "became leader".
* Start a second `kat-agent` configured to connect to the first's etcd; it should log "observing leader <leaderID>" or similar, but not become leader itself.
* If the first agent (leader) is stopped, the second agent should eventually log "became leader".
5. **Implement Basic `kat-agent init` Command (`cmd/kat-agent/main.go`, `internal/config/parse.go`)**
* **Purpose**: Initialize a new KAT cluster (single node initially).
* **Details**:
* Define `init` subcommand in `kat-agent` using a CLI library (e.g., `cobra`); a skeleton follows this task.
* Flag: `--config <path_to_cluster.kat>`.
* Parse `cluster.kat` (from Phase 0, now used to extract etcd peer/client URLs, data dir, backup paths etc.).
* Generate a persistent Cluster UID and store it in etcd (e.g., `/kat/config/cluster_uid`).
* Store `cluster.kat` relevant parameters (or the whole sanitized config) into etcd (e.g., under `/kat/config/cluster_config`).
* Start the embedded etcd server using parsed configurations.
* Initiate leader election.
* **Potential Challenges**: Ensuring `cluster.kat` parsing is robust. Handling existing data directories.
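A skeleton of the `init` subcommand wiring, assuming `cobra`; the run body only outlines the steps above:
```go
// cmd/kat-agent/main.go (sketch)
package main

import (
	"log"

	"github.com/spf13/cobra"
)

func main() {
	var configPath string

	rootCmd := &cobra.Command{Use: "kat-agent"}

	initCmd := &cobra.Command{
		Use:   "init",
		Short: "Initialize a new KAT cluster on this node",
		Run: func(cmd *cobra.Command, args []string) {
			// 1. Parse and validate cluster.kat (Phase 0 parser).
			// 2. Start the embedded etcd server from the parsed config.
			// 3. Campaign for leadership and, once elected, store the
			//    cluster UID and sanitized config under /kat/config/.
			log.Printf("initializing cluster from %s", configPath)
		},
	}
	initCmd.Flags().StringVar(&configPath, "config", "cluster.kat", "path to cluster.kat")

	rootCmd.AddCommand(initCmd)
	if err := rootCmd.Execute(); err != nil {
		log.Fatal(err)
	}
}
```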
* **Milestone Verification**:
* Running `kat-agent init --config examples/cluster.kat` on a clean system:
* Starts the `kat-agent` process.
* Creates the etcd data directory.
* Logs "Successfully initialized etcd".
* Logs "Became leader: <nodeName>".
* Using `etcdctl` (or a simple `StateStore.Get` test client):
* Verify `/kat/config/cluster_uid` exists and has a UUID.
* Verify `/kat/config/cluster_config` (or similar keys) contains data from `cluster.kat` (e.g., `clusterCIDR`, `serviceCIDR`, `agentPort`, `apiPort`).
* Verify the leader election key exists for the current leader.

98
docs/plan/phase2.md Normal file

@@ -0,0 +1,98 @@
# **Phase 2: Basic Agent & Node Lifecycle (Init, Join, PKI)**
* **Goal**: Implement the secure registration of a new agent node to an existing leader, including PKI for mTLS, and establish periodic heartbeating for status updates and failure detection.
* **RFC Sections Primarily Used**: 2.3 (Node Communication Protocol), 4.1.1 (Initial Leader Setup - CA), 4.1.2 (Agent Node Join - CSR), 10.1 (API Security - mTLS), 10.6 (Internal PKI), 4.1.3 (Node Heartbeat), 4.1.4 (Node Departure and Failure Detection - basic).
**Tasks & Sub-Tasks:**
1. **Implement Internal PKI Utilities (`internal/pki/ca.go`, `internal/pki/certs.go`)**
* **Purpose**: Create and manage the Certificate Authority and sign certificates for mTLS.
* **Details**:
* `GenerateCA()`: Creates a new RSA key pair and a self-signed X.509 CA certificate. Saves to disk (e.g., `/var/lib/kat/pki/ca.key`, `/var/lib/kat/pki/ca.crt`). Path from `cluster.kat` `backupPath` parent dir, or a new `pkiPath`. A sketch follows this task.
* `GenerateCertificateRequest(commonName, keyOutPath, csrOutPath)`: Agent uses this. Generates RSA key, creates a CSR.
* `SignCertificateRequest(caKeyPath, caCertPath, csrData, certOutPath, duration)`: Leader uses this. Loads CA key/cert, parses CSR, issues a signed certificate.
* Helper functions to load keys and certs from disk.
* **Potential Challenges**: Handling cryptographic operations correctly and securely. Permissions for key storage.
* **Verification**: Unit tests for `GenerateCA`, `GenerateCertificateRequest`, `SignCertificateRequest`. Generated certs should be verifiable against the CA.
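A sketch of `GenerateCA` using the standard `crypto/x509` machinery; the key size, validity period, and subject are illustrative choices:
```go
// internal/pki/ca.go (sketch)
package pki

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"math/big"
	"os"
	"time"
)

// GenerateCA creates a self-signed CA and writes the PEM-encoded key and
// certificate to the given paths.
func GenerateCA(keyPath, certPath string) error {
	key, err := rsa.GenerateKey(rand.Reader, 4096)
	if err != nil {
		return err
	}
	template := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "kat-cluster-ca"},
		NotBefore:             time.Now(),
		NotAfter:              time.Now().AddDate(10, 0, 0), // 10-year CA
		IsCA:                  true,
		KeyUsage:              x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
		BasicConstraintsValid: true,
	}
	// Self-signed: template is both the certificate and its issuer.
	der, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key)
	if err != nil {
		return err
	}
	keyOut, err := os.OpenFile(keyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}
	defer keyOut.Close()
	if err := pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}); err != nil {
		return err
	}
	certOut, err := os.Create(certPath)
	if err != nil {
		return err
	}
	defer certOut.Close()
	return pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: der})
}
```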
2. **Leader: Initialize CA & Its Own mTLS Certs on `init` (`cmd/kat-agent/main.go`)**
* **Purpose**: The first leader needs to establish the PKI and secure its own API endpoint.
* **Details**:
* During `kat-agent init`, after etcd is up and leadership is confirmed:
* Call `pki.GenerateCA()` if CA files don't exist.
* Generate its own server key and CSR (e.g., for `leader.kat.cluster.local`).
* Sign its own CSR using the CA to get its server certificate.
* Configure its (future) API HTTP server to use these server key/cert for TLS and require client certs (mTLS).
* **Verification**: After `kat-agent init`, CA key/cert and leader's server key/cert exist in the configured PKI path.
3. **Implement Basic API Server with mTLS on Leader (`internal/api/server.go`, `internal/api/router.go`)**
* **Purpose**: Provide the initial HTTP endpoints required for agent join, secured with mTLS.
* **Details**:
* Setup `http.Server` with `tls.Config` (see the sketch after this task):
* `Certificates`: Leader's server key/cert.
* `ClientAuth: tls.RequireAndVerifyClientCert`.
* `ClientCAs`: Pool containing the cluster CA certificate.
* Minimal router (e.g., `gorilla/mux` or `http.ServeMux`) for:
* `POST /internal/v1alpha1/join`: Endpoint for agent to submit CSR. (Internal as it's part of bootstrap).
* **Verification**: An HTTPS client (e.g., `curl` with appropriate client certs) can connect to the leader's API port if it presents a cert signed by the cluster CA. Connection fails without a client cert or with a cert from a different CA.
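A sketch of the mTLS server setup; the constructor name and file paths are illustrative:
```go
// internal/api/server.go (sketch)
package api

import (
	"crypto/tls"
	"crypto/x509"
	"net/http"
	"os"
)

// NewMTLSServer returns an http.Server that presents the leader's cert and
// requires client certs signed by the cluster CA.
func NewMTLSServer(addr, serverCertPath, serverKeyPath, caCertPath string, handler http.Handler) (*http.Server, error) {
	serverCert, err := tls.LoadX509KeyPair(serverCertPath, serverKeyPath)
	if err != nil {
		return nil, err
	}
	caPEM, err := os.ReadFile(caCertPath)
	if err != nil {
		return nil, err
	}
	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(caPEM)

	return &http.Server{
		Addr:    addr,
		Handler: handler,
		TLSConfig: &tls.Config{
			Certificates: []tls.Certificate{serverCert},
			ClientAuth:   tls.RequireAndVerifyClientCert,
			ClientCAs:    caPool,
			MinVersion:   tls.VersionTLS12,
		},
	}, nil
}
```
The server would then be started with `srv.ListenAndServeTLS("", "")`, since the certificate is already supplied via `TLSConfig`.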
4. **Agent: `join` Command & CSR Submission (`cmd/kat-agent/main.go`, `internal/cli/join.go` - or similar for agent logic)**
* **Purpose**: Allow a new agent to request to join the cluster and obtain its mTLS credentials.
* **Details**:
* `kat-agent join` subcommand:
* Flags: `--leader-api <ip:port>`, `--advertise-address <ip_or_interface_name>`, `--node-name <name>` (optional, leader can generate).
* Generate its own key pair and CSR using `pki.GenerateCertificateRequest()`.
* Make an HTTP POST to Leader's `/internal/v1alpha1/join` endpoint:
* Payload: CSR data, advertise address, requested node name, initial WireGuard public key (placeholder for now).
* For this *initial* join, the agent may need to trust the leader's CA certificate via an out-of-band mechanism or a `--leader-ca-cert` flag, or use a token for initial auth if mTLS is strictly enforced from the start. The RFC implies mTLS is mandatory, so the agent needs the CA cert to trust the leader, and the leader needs some basis (perhaps a pre-shared token) for accepting the CSR before the agent has its own signed cert. For simplicity in V1, the initial join POST might happen over HTTPS where the agent trusts the leader's self-signed cert, or a pre-shared token authorizes the CSR signing. RFC 4.1.2 states "Leader, upon validating the join request (V1 has no strong token validation, relies on network trust)". This needs clarification. *Assume network trust for now: the agent connects, sends its CSR, and the leader signs it.*
* Receive signed certificate and CA certificate from Leader. Store them locally.
* **Potential Challenges**: Securely bootstrapping trust for the very first communication to the leader to submit the CSR.
* **Verification**: `kat-agent join` command:
* Generates key/CSR.
* Successfully POSTs CSR to leader.
* Receives and saves its signed certificate and the CA certificate.
5. **Leader: CSR Signing & Node Registration (Handler for `/internal/v1alpha1/join`)**
* **Purpose**: Validate joining agent, sign its CSR, and record its registration.
* **Details**:
* Handler for `/internal/v1alpha1/join`:
* Parse CSR, advertise address, WG pubkey from request.
* Validate (minimal for now).
* Generate a unique Node Name if not provided. Assign a Node UID.
* Sign the CSR using `pki.SignCertificateRequest()`.
* Store Node registration data in etcd via `StateStore` (`/kat/nodes/registration/{nodeName}`: UID, advertise address, WG pubkey placeholder, join timestamp).
* Return the signed agent certificate and the cluster CA certificate to the agent.
* **Verification**:
* After agent joins, its certificate is signed by the cluster CA.
* Node registration data appears correctly in etcd under `/kat/nodes/registration/{nodeName}`.
6. **Agent: Establish mTLS Client for Subsequent Comms & Implement Heartbeating (`internal/agent/agent.go`)**
* **Purpose**: Agent uses its new mTLS certs to communicate status to the Leader.
* **Details**:
* Agent configures its HTTP client to use its signed key/cert and the cluster CA cert for all future Leader communications.
* Periodic Heartbeat (RFC 4.1.3):
* Ticker (e.g., every `agentTickSeconds` from `cluster.kat`, default 15s).
* On tick, gather basic node status (node name, timestamp, initial resource capacity stubs).
* HTTP `POST` to Leader's `/v1alpha1/nodes/{nodeName}/status` endpoint using the mTLS-configured client.
* **Verification**: Agent logs successful heartbeat POSTs.
7. **Leader: Receive Heartbeats & Basic Failure Detection (Handler for `/v1alpha1/nodes/{nodeName}/status`, `internal/leader/leader.go`)**
* **Purpose**: Leader tracks agent status and detects failures.
* **Details**:
* API endpoint `/v1alpha1/nodes/{nodeName}/status` (mTLS required):
* Receives status update from agent.
* Updates node's actual state in etcd (`/kat/nodes/status/{nodeName}/heartbeat`: timestamp, reported status). Could use an etcd lease for this key, renewed by agent heartbeats.
* Failure Detection (RFC 4.1.4):
* Leader has a reconciliation loop or periodic check.
* Scans `/kat/nodes/status/` in etcd.
* If a node's last heartbeat timestamp is older than `nodeLossTimeoutSeconds` (from `cluster.kat`), update its status in etcd to `NotReady` (e.g., `/kat/nodes/status/{nodeName}/condition: NotReady`).
* **Potential Challenges**: Efficiently scanning for dead nodes without excessive etcd load.
* **Milestone Verification**:
* `kat-agent init` runs as Leader, CA created, its API is up with mTLS.
* A second `kat-agent join ...` process successfully:
* Generates CSR, gets it signed by Leader.
* Saves its cert and CA cert.
* Starts sending heartbeats to Leader using mTLS.
* Leader logs receipt of heartbeats from the joined Agent.
* Node status (last heartbeat time) is updated in etcd by the Leader.
* If the joined Agent process is stopped, after `nodeLossTimeoutSeconds`, the Leader updates the node's status in etcd to `NotReady`. This can be verified using `etcdctl` or a `StateStore.Get` call.
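For illustration, a minimal sketch of the leader's mTLS listener from Tasks 2–3, assuming the CA and the leader's key pair already exist on disk; the `pkiDir` layout and the handler body are placeholders, not the final implementation. Note the bootstrap tension discussed in Task 4: an agent submitting its first CSR has no cluster-signed client certificate yet, so the join endpoint may need a separate listener with relaxed `ClientAuth`.

```go
// Sketch: leader API server requiring client certs signed by the cluster CA.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"net/http"
	"os"
	"path/filepath"
)

func main() {
	pkiDir := "/var/lib/kat/pki" // illustrative path

	caPEM, err := os.ReadFile(filepath.Join(pkiDir, "ca.crt"))
	if err != nil {
		log.Fatalf("read CA cert: %v", err)
	}
	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEM) {
		log.Fatal("failed to add cluster CA to pool")
	}

	serverCert, err := tls.LoadX509KeyPair(
		filepath.Join(pkiDir, "leader.crt"),
		filepath.Join(pkiDir, "leader.key"),
	)
	if err != nil {
		log.Fatalf("load leader key pair: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/internal/v1alpha1/join", func(w http.ResponseWriter, r *http.Request) {
		// Task 5: parse the CSR, sign it with the CA, return cert + CA cert.
		w.WriteHeader(http.StatusNotImplemented)
	})

	srv := &http.Server{
		Addr:    ":9115", // default apiPort
		Handler: mux,
		TLSConfig: &tls.Config{
			Certificates: []tls.Certificate{serverCert},
			ClientAuth:   tls.RequireAndVerifyClientCert,
			ClientCAs:    caPool,
			MinVersion:   tls.VersionTLS12,
		},
	}
	// Cert and key already live in TLSConfig, so the file arguments are empty.
	log.Fatal(srv.ListenAndServeTLS("", ""))
}
```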

docs/plan/phase3.md Normal file

@ -0,0 +1,102 @@
# **Phase 3: Container Runtime Interface & Local Podman Management**
* **Goal**: Abstract container management operations behind a `ContainerRuntime` interface and implement it using Podman CLI, enabling an agent to manage containers rootlessly based on (mocked) instructions.
* **RFC Sections Primarily Used**: 6.1 (Runtime Interface Definition), 6.2 (Default Implementation: Podman), 6.3 (Rootless Execution Strategy).
**Tasks & Sub-Tasks:**
1. **Define `ContainerRuntime` Go Interface (`internal/runtime/interface.go`)**
* **Purpose**: Abstract all container operations (build, pull, run, stop, inspect, logs, etc.).
* **Details**: Transcribe the Go interface from RFC 6.1 precisely. Include all specified structs (`ImageSummary`, `ContainerStatus`, `BuildOptions`, `PortMapping`, `VolumeMount`, `ResourceSpec`, `ContainerCreateOptions`, `ContainerHealthCheck`) and enums (`ContainerState`, `HealthState`).
* **Verification**: Code compiles. Interface and type definitions match RFC.
2. **Implement Podman Backend for `ContainerRuntime` (`internal/runtime/podman.go`) - Core Lifecycle Methods**
* **Purpose**: Translate `ContainerRuntime` calls into `podman` CLI commands.
* **Details (for each method, focus on these first):**
* `PullImage(ctx, imageName, platform)`:
* Cmd: `podman pull {imageName}` (add `--platform` if specified).
* Parse output to get image ID (e.g., from `podman inspect {imageName} --format '{{.Id}}'`).
* `CreateContainer(ctx, opts ContainerCreateOptions)`:
* Cmd: `podman create ...`
* Translate `ContainerCreateOptions` into `podman create` flags:
* `--name {opts.InstanceID}` (KAT's unique ID for the instance).
* `--hostname {opts.Hostname}`.
* `--env` for `opts.Env`.
* `--label` for `opts.Labels` (include KAT ownership labels like `kat.dws.rip/workload-name`, `kat.dws.rip/namespace`, `kat.dws.rip/instance-id`).
* `--restart {opts.RestartPolicy}` (map to Podman's "no", "on-failure", "always").
* Resource mapping: `--cpus` (for quota), `--cpu-shares`, `--memory`.
* `--publish` for `opts.Ports`.
* `--volume` for `opts.Volumes` (source will be host path, destination is container path).
* `--network {opts.NetworkName}` and `--ip {opts.IPAddress}` if specified.
* `--user {opts.User}`.
* `--cap-add`, `--cap-drop`, `--security-opt`.
* Podman native healthcheck flags from `opts.HealthCheck`.
* `--systemd={opts.Systemd}`.
* Parse output for container ID.
* `StartContainer(ctx, containerID)`: Cmd: `podman start {containerID}`.
* `StopContainer(ctx, containerID, timeoutSeconds)`: Cmd: `podman stop -t {timeoutSeconds} {containerID}`.
* `RemoveContainer(ctx, containerID, force, removeVolumes)`: Cmd: `podman rm {containerID}` (add `--force`, `--volumes`).
* `GetContainerStatus(ctx, containerOrName)`:
* Cmd: `podman inspect {containerOrName}`.
* Parse JSON output to populate `ContainerStatus` struct (State, ExitCode, StartedAt, FinishedAt, Health, ImageID, ImageName, OverlayIP if available from inspect).
* Podman health status needs to be mapped to `HealthState`.
* `StreamContainerLogs(ctx, containerID, follow, since, stdout, stderr)`:
* Cmd: `podman logs {containerID}` (add `--follow`, `--since`).
* Stream `os/exec.Cmd.Stdout` and `os/exec.Cmd.Stderr` to the provided `io.Writer`s.
**Helper**: A utility function to run `podman` commands as a specific rootless user (see Rootless Execution below and the sketch at the end of this plan).
* **Potential Challenges**: Correctly mapping all `ContainerCreateOptions` to Podman flags. Parsing varied `podman inspect` output. Managing `os/exec` for logs. Robust error handling from CLI output.
* **Verification**:
* Unit tests for each implemented method, mocking `os/exec` calls to verify command construction and output parsing.
* *Requires Podman installed for integration-style unit tests*: Tests that actually execute `podman` commands (e.g., pull alpine, create, start, inspect, stop, rm) and verify state changes.
3. **Implement Rootless Execution Strategy (`internal/runtime/podman.go` helpers, `internal/agent/runtime.go`)**
* **Purpose**: Ensure containers are run by unprivileged users using systemd for supervision.
* **Details**:
* **User Assumption**: For Phase 3, *assume* the dedicated user (e.g., `kat_wl_mywebapp`) already exists on the system and `loginctl enable-linger <username>` has been run manually. The username could be passed in `ContainerCreateOptions.User` or derived.
* **Podman Command Execution Context**:
* The `kat-agent` process itself might run as root or a privileged user.
* When executing `podman` commands for a workload, it MUST run them as the target unprivileged user.
* This can be achieved using `sudo -u {username} podman ...` or more directly via `nsenter`/`setuid` if the agent has capabilities, or by setting `XDG_RUNTIME_DIR` and `DBUS_SESSION_BUS_ADDRESS` appropriately for the target user if invoking `podman` via systemd user session D-Bus API. *Simplest for now might be `sudo -u {username} podman ...` if agent is root, or ensuring agent itself runs as a user who can switch to other `kat_wl_*` users.*
* The RFC prefers "systemd user sessions". This usually means `systemctl --user ...`. To control another user's systemd session, the agent process (if root) can use `machinectl shell {username}@.host /bin/bash -c "systemctl --user ..."` or `systemd-run --user --machine={username}@.host ...`. If the agent is not root, it cannot directly control other users' systemd sessions. *This is a critical design point: how does the agent (potentially root) interact with user-level systemd?*
* RFC: "Agent uses `systemctl --user --machine={username}@.host ...`". This implies agent has permissions to do this (likely running as root or with specific polkit rules).
* **Systemd Unit Generation & Management**:
* After `podman create ...` (or instead of direct create, if `podman generate systemd` is used to create the definition), generate systemd unit:
`podman generate systemd --new --files --time 10 {opts.InstanceID}` (the positional argument is the container created in step 1; `--new` makes the unit recreate it on start). This creates a `{opts.InstanceID}.service` unit file.
* The `ContainerRuntime` implementation needs to:
1. Execute `podman create` to establish the container definition (this allows Podman to manage its internal state for the container ID).
2. Execute `podman generate systemd --new {containerID}` (using the ID from create) to get the unit file content.
3. Place this unit file in the target user's systemd path (e.g., `/home/{username}/.config/systemd/user/{opts.InstanceID}.service` or `/etc/systemd/user/{opts.InstanceID}.service` if agent is root and wants to enable for any user).
4. Run `systemctl --user --machine={username}@.host daemon-reload`.
5. Start/Enable: `systemctl --user --machine={username}@.host enable --now {opts.InstanceID}.service`.
* To stop: `systemctl --user --machine={username}@.host stop {opts.InstanceID}.service`.
* To remove: `systemctl --user --machine={username}@.host disable {opts.InstanceID}.service`, then `podman rm {opts.InstanceID}`, then remove the unit file.
* Status: `systemctl --user --machine={username}@.host status {opts.InstanceID}.service` (parse output), or rely on `podman inspect` which should reflect systemd-managed state.
* **Potential Challenges**: Managing permissions for interacting with other users' systemd sessions. Correctly placing and cleaning up systemd unit files. Ensuring `XDG_RUNTIME_DIR` is set correctly for rootless Podman if not using systemd units for direct `podman run`. Systemd unit generation nuances.
* **Verification**:
* A test in `internal/agent/runtime_test.go` (or similar) can take mock `ContainerCreateOptions`.
* It calls the (mocked or real) `ContainerRuntime` implementation.
* Verify:
* Podman commands are constructed to run as the target unprivileged user.
* A systemd unit file is generated for the container.
* `systemctl --user --machine...` commands are invoked correctly to manage the service.
* The container is actually started (verify with `podman ps -a --filter label=kat.dws.rip/instance-id={instanceID}` as the target user).
* Logs can be retrieved.
* The container can be stopped and removed, including its systemd unit.
* **Milestone Verification**:
* The `ContainerRuntime` Go interface is fully defined as per RFC 6.1.
* The Podman implementation for core lifecycle methods (`PullImage`, `CreateContainer` (leading to systemd unit generation), `StartContainer` (via systemd enable/start), `StopContainer` (via systemd stop), `RemoveContainer` (via systemd disable + podman rm + unit file removal), `GetContainerStatus`, `StreamContainerLogs`) is functional.
* An `internal/agent` test (or a temporary `main.go` test harness) can:
1. Define `ContainerCreateOptions` for a simple image like `docker.io/library/alpine` with a command like `sleep 30`.
2. Specify a (manually pre-created and linger-enabled) unprivileged username.
3. Call the `ContainerRuntime` methods.
4. **Result**:
* The alpine image is pulled (if not present).
* A systemd user service unit is generated and placed correctly for the specified user.
* The service is started using `systemctl --user --machine...`.
* `podman ps --all --filter label=kat.dws.rip/instance-id=...` (run as the target user or by root seeing all containers) shows the container running or having run.
* Logs can be retrieved using the `StreamContainerLogs` method.
* The container can be stopped and removed (including its systemd unit file).
* All container operations are verifiably performed by the specified unprivileged user.
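To make the rootless strategy concrete, a minimal sketch of the two helpers from Tasks 2–3, assuming the agent runs as root, the workload user already exists with lingering enabled, and the `sudo -u ... podman` and `systemctl --user --machine` forms quoted above are available; helper names like `podmanAsUser` are illustrative.

```go
// Sketch: run podman and systemctl on behalf of a workload's unprivileged user.
package runtime

import (
	"context"
	"fmt"
	"os/exec"
)

// podmanAsUser runs a podman command as the target user via sudo, the
// "simplest for now" option discussed above.
func podmanAsUser(ctx context.Context, username string, args ...string) ([]byte, error) {
	argv := append([]string{"-u", username, "podman"}, args...)
	out, err := exec.CommandContext(ctx, "sudo", argv...).CombinedOutput()
	if err != nil {
		return out, fmt.Errorf("podman %v as %s: %w: %s", args, username, err, out)
	}
	return out, nil
}

// systemctlUser drives the user's systemd instance from root, using the
// --machine form the RFC prescribes.
func systemctlUser(ctx context.Context, username string, args ...string) error {
	argv := append([]string{"--user", "--machine", username + "@.host"}, args...)
	if out, err := exec.CommandContext(ctx, "systemctl", argv...).CombinedOutput(); err != nil {
		return fmt.Errorf("systemctl %v for %s: %w: %s", args, username, err, out)
	}
	return nil
}

// startInstance reloads the user's units and enables the instance service.
func startInstance(ctx context.Context, username, instanceID string) error {
	if err := systemctlUser(ctx, username, "daemon-reload"); err != nil {
		return err
	}
	return systemctlUser(ctx, username, "enable", "--now", instanceID+".service")
}
```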
This detailed plan should provide a clearer path for implementing these initial crucial phases. Remember to keep testing iterative and focused on the RFC specifications.

docs/rfc/RFC001-KAT.md Normal file

File diff suppressed because it is too large

examples/cluster.kat Normal file

@ -0,0 +1,18 @@
apiVersion: kat.dws.rip/v1alpha1
kind: ClusterConfiguration
metadata:
name: my-kat-cluster
spec:
clusterCIDR: "10.100.0.0/16"
serviceCIDR: "10.200.0.0/16"
nodeSubnetBits: 7 # Results in /23 node subnets (e.g., 10.100.0.0/23, 10.100.2.0/23)
clusterDomain: "kat.example.local" # Overriding default
apiPort: 9115
agentPort: 9116
etcdPeerPort: 2380
etcdClientPort: 2379
volumeBasePath: "/opt/kat/volumes" # Overriding default
backupPath: "/opt/kat/backups" # Overriding default
backupIntervalMinutes: 60
agentTickSeconds: 10
nodeLossTimeoutSeconds: 45
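For reference, a short sketch of loading this example with `ParseClusterConfiguration` from `internal/config/parse.go` (shown later in this diff): explicitly set fields survive, and unset fields receive defaults.

```go
// Sketch: parse and validate examples/cluster.kat.
package main

import (
	"fmt"
	"log"

	"git.dws.rip/dubey/kat/internal/config"
)

func main() {
	cfg, err := config.ParseClusterConfiguration("examples/cluster.kat")
	if err != nil {
		log.Fatalf("parse cluster.kat: %v", err)
	}
	// clusterDomain and apiPort are overridden above; anything omitted
	// would fall back to the defaults in internal/config.
	fmt.Println(cfg.Metadata.Name, cfg.Spec.ClusterDomain, cfg.Spec.ApiPort)
}
```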


@ -0,0 +1,15 @@
apiVersion: kat.dws.rip/v1alpha1
kind: VirtualLoadBalancer
metadata:
name: my-simple-nginx # Should match workload name
namespace: default
spec:
ports:
- name: http
containerPort: 80
protocol: TCP
healthCheck:
exec:
command: ["curl", "-f", "http://localhost/"] # Nginx doesn't have curl by default, this is illustrative
initialDelaySeconds: 5
periodSeconds: 10


@ -0,0 +1,21 @@
apiVersion: kat.dws.rip/v1alpha1
kind: Workload
metadata:
name: my-simple-nginx
namespace: default
spec:
type: SERVICE
source:
image: "nginx:latest"
replicas: 2
restartPolicy:
condition: ALWAYS
container:
name: nginx-container
resources:
requests:
cpu: "50m"
memory: "64Mi"
limits:
cpu: "100m"
memory: "128Mi"
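Quadlet files like the two above are dispatched by `kind` in `ParseQuadletFile` (in `internal/config/parse.go`, later in this diff). A brief sketch of that call; the on-disk path is an assumption for illustration.

```go
// Sketch: parse a single quadlet file and branch on the resulting type.
package main

import (
	"fmt"
	"log"
	"os"

	pb "git.dws.rip/dubey/kat/api/v1alpha1"
	"git.dws.rip/dubey/kat/internal/config"
)

func main() {
	content, err := os.ReadFile("examples/workload.kat") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	obj, err := config.ParseQuadletFile("workload.kat", content)
	if err != nil {
		log.Fatalf("parse quadlet: %v", err)
	}
	if wl, ok := obj.(*pb.Workload); ok {
		fmt.Println("workload:", wl.Metadata.Name)
	}
}
```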

go.mod

@ -1,3 +1,10 @@
 module git.dws.rip/dubey/kat
 
-go 1.23.2
+go 1.22
+
+toolchain go1.24.2
+
+require (
+	google.golang.org/protobuf v1.36.6
+	gopkg.in/yaml.v3 v3.0.1
+)

go.sum Normal file

@ -0,0 +1,10 @@
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/config/parse.go Normal file

@ -0,0 +1,324 @@
// File: internal/config/parse.go
package config
import (
"encoding/json"
"fmt"
"net"
"os"

pb "git.dws.rip/dubey/kat/api/v1alpha1"
"gopkg.in/yaml.v3"
)
// ParseClusterConfiguration reads, unmarshals, and validates a cluster.kat file.
func ParseClusterConfiguration(filePath string) (*pb.ClusterConfiguration, error) {
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil, fmt.Errorf("cluster configuration file not found: %s", filePath)
}
yamlFile, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read cluster configuration file %s: %w", filePath, err)
}
var config pb.ClusterConfiguration
// We expect the YAML to have top-level keys like 'apiVersion', 'kind', 'metadata', 'spec'
// but our proto is just the ClusterConfiguration message.
// So, we'll unmarshal into a temporary map to extract the 'spec' and 'metadata'.
var rawConfigMap map[string]interface{}
if err = yaml.Unmarshal(yamlFile, &rawConfigMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML from %s: %w", filePath, err)
}
// Quick check for kind
kind, ok := rawConfigMap["kind"].(string)
if !ok || kind != "ClusterConfiguration" {
return nil, fmt.Errorf("invalid kind in %s: expected ClusterConfiguration, got %v", filePath, rawConfigMap["kind"])
}
metadataMap, ok := rawConfigMap["metadata"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("metadata section not found or invalid in %s", filePath)
}
metadataBytes, err := json.Marshal(metadataMap)
if err != nil {
return nil, fmt.Errorf("failed to re-marshal metadata: %w", err)
}
if err = json.Unmarshal(metadataBytes, &config.Metadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata into proto: %w", err)
}
specMap, ok := rawConfigMap["spec"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("spec section not found or invalid in %s", filePath)
}
specBytes, err := json.Marshal(specMap)
if err != nil {
return nil, fmt.Errorf("failed to re-marshal spec: %w", err)
}
if err = json.Unmarshal(specBytes, &config.Spec); err != nil {
return nil, fmt.Errorf("failed to unmarshal spec into proto: %w", err)
}
SetClusterConfigDefaults(&config)
if err := ValidateClusterConfiguration(&config); err != nil {
return nil, fmt.Errorf("invalid cluster configuration in %s: %w", filePath, err)
}
return &config, nil
}
// SetClusterConfigDefaults applies default values to the ClusterConfiguration spec.
func SetClusterConfigDefaults(config *pb.ClusterConfiguration) {
if config.Spec == nil {
config.Spec = &pb.ClusterConfigurationSpec{}
}
s := config.Spec
if s.ClusterDomain == "" {
s.ClusterDomain = DefaultClusterDomain
}
if s.AgentPort == 0 {
s.AgentPort = DefaultAgentPort
}
if s.ApiPort == 0 {
s.ApiPort = DefaultApiPort
}
if s.EtcdPeerPort == 0 {
s.EtcdPeerPort = DefaultEtcdPeerPort
}
if s.EtcdClientPort == 0 {
s.EtcdClientPort = DefaultEtcdClientPort
}
if s.VolumeBasePath == "" {
s.VolumeBasePath = DefaultVolumeBasePath
}
if s.BackupPath == "" {
s.BackupPath = DefaultBackupPath
}
if s.BackupIntervalMinutes == 0 {
s.BackupIntervalMinutes = DefaultBackupIntervalMins
}
if s.AgentTickSeconds == 0 {
s.AgentTickSeconds = DefaultAgentTickSeconds
}
if s.NodeLossTimeoutSeconds == 0 {
s.NodeLossTimeoutSeconds = DefaultNodeLossTimeoutSec
if s.AgentTickSeconds > 0 { // derive from the (already defaulted) agent tick: four missed ticks
s.NodeLossTimeoutSeconds = s.AgentTickSeconds * 4
}
}
if s.NodeSubnetBits == 0 {
s.NodeSubnetBits = DefaultNodeSubnetBits
}
}
// ValidateClusterConfiguration performs basic validation on the ClusterConfiguration.
func ValidateClusterConfiguration(config *pb.ClusterConfiguration) error {
if config.Metadata == nil || config.Metadata.Name == "" {
return fmt.Errorf("metadata.name is required")
}
if config.Spec == nil {
return fmt.Errorf("spec is required")
}
s := config.Spec
if s.ClusterCidr == "" {
return fmt.Errorf("spec.clusterCIDR is required")
}
if _, _, err := net.ParseCIDR(s.ClusterCidr); err != nil {
return fmt.Errorf("invalid spec.clusterCIDR %s: %w", s.ClusterCidr, err)
}
if s.ServiceCidr == "" {
return fmt.Errorf("spec.serviceCIDR is required")
}
if _, _, err := net.ParseCIDR(s.ServiceCidr); err != nil {
return fmt.Errorf("invalid spec.serviceCIDR %s: %w", s.ServiceCidr, err)
}
// Validate ports
ports := []struct {
name string
port int32
}{
{"agentPort", s.AgentPort}, {"apiPort", s.ApiPort},
{"etcdPeerPort", s.EtcdPeerPort}, {"etcdClientPort", s.EtcdClientPort},
}
for _, p := range ports {
if p.port <= 0 || p.port > 65535 {
return fmt.Errorf("invalid port for %s: %d. Must be between 1 and 65535", p.name, p.port)
}
}
// Check for port conflicts among configured ports
portSet := make(map[int32]string)
for _, p := range ports {
if existing, found := portSet[p.port]; found {
return fmt.Errorf("port conflict: %s (%d) and %s (%d) use the same port", p.name, p.port, existing, p.port)
}
portSet[p.port] = p.name
}
if s.NodeSubnetBits <= 0 || s.NodeSubnetBits >= 32 {
return fmt.Errorf("invalid spec.nodeSubnetBits: %d. Must be > 0 and < 32", s.NodeSubnetBits)
}
// Validate nodeSubnetBits against the clusterCIDR prefix length.
// nodeSubnetBits is the number of additional bits carved out for each node's
// subnet within the cluster prefix, so the resulting node subnet prefix
// length is clusterPrefixLen + nodeSubnetBits. Per the RFC, the default of 7
// yields /23 node subnets from a /16 clusterCIDR. The combined prefix must
// stay below /32 to leave room for host addresses.
_, clusterNet, _ := net.ParseCIDR(s.ClusterCidr)
clusterPrefixLen, _ := clusterNet.Mask.Size()
if (clusterPrefixLen + int(s.NodeSubnetBits)) >= 32 {
return fmt.Errorf("spec.nodeSubnetBits (%d) combined with clusterCIDR prefix (%d) results in an invalid subnet size (>= /32)", s.NodeSubnetBits, clusterPrefixLen)
}
if s.BackupIntervalMinutes < 0 { // 0 could mean disabled; only negative values are rejected here
return fmt.Errorf("spec.backupIntervalMinutes must be non-negative")
}
if s.AgentTickSeconds <= 0 {
return fmt.Errorf("spec.agentTickSeconds must be positive")
}
if s.NodeLossTimeoutSeconds <= 0 {
return fmt.Errorf("spec.nodeLossTimeoutSeconds must be positive")
}
if s.NodeLossTimeoutSeconds < s.AgentTickSeconds {
return fmt.Errorf("spec.nodeLossTimeoutSeconds must be greater than or equal to spec.agentTickSeconds")
}
// volumeBasePath and backupPath should be absolute paths; a robust check
// (filepath.IsAbs) is deferred for now, and the defaults cover the empty case.
return nil
}
// ParsedQuadletFiles holds the structured data from a Quadlet directory.
type ParsedQuadletFiles struct {
Workload *pb.Workload
VirtualLoadBalancer *pb.VirtualLoadBalancer
JobDefinition *pb.JobDefinition
BuildDefinition *pb.BuildDefinition
// Namespace is typically a cluster-level resource, not part of a workload quadlet bundle.
// If it were, it would be: Namespace *pb.Namespace
// Store raw file contents for potential future use (e.g. annotations, original source)
RawFiles map[string][]byte
}
// ParseQuadletFile unmarshals a single Quadlet file content based on its kind.
// It returns the specific protobuf message.
func ParseQuadletFile(fileName string, content []byte) (interface{}, error) {
var base struct {
ApiVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
}
if err := yaml.Unmarshal(content, &base); err != nil {
return nil, fmt.Errorf("failed to unmarshal base YAML from %s to determine kind: %w", fileName, err)
}
// TODO: Check apiVersion, e.g., base.ApiVersion == "kat.dws.rip/v1alpha1"
var resource interface{}
var err error
switch base.Kind {
case "Workload":
var wl pb.Workload
// As with ClusterConfiguration, metadata and spec are unmarshaled separately
// via a temporary map, since the proto message does not mirror the full
// YAML document structure (apiVersion/kind/metadata/spec at the top level).
var raw map[string]interface{}
if err = yaml.Unmarshal(content, &raw); err == nil {
if meta, ok := raw["metadata"]; ok {
metaBytes, _ := yaml.Marshal(meta)
yaml.Unmarshal(metaBytes, &wl.Metadata)
}
if spec, ok := raw["spec"]; ok {
specBytes, _ := yaml.Marshal(spec)
yaml.Unmarshal(specBytes, &wl.Spec)
}
}
resource = &wl
case "VirtualLoadBalancer":
var vlb pb.VirtualLoadBalancer
var raw map[string]interface{}
if err = yaml.Unmarshal(content, &raw); err == nil {
if meta, ok := raw["metadata"]; ok {
metaBytes, _ := yaml.Marshal(meta)
yaml.Unmarshal(metaBytes, &vlb.Metadata)
}
if spec, ok := raw["spec"]; ok {
specBytes, _ := yaml.Marshal(spec)
yaml.Unmarshal(specBytes, &vlb.Spec)
}
}
resource = &vlb
// Add cases for JobDefinition, BuildDefinition as they are defined
case "JobDefinition":
var jd pb.JobDefinition
// ... unmarshal logic ...
resource = &jd
case "BuildDefinition":
var bd pb.BuildDefinition
// ... unmarshal logic ...
resource = &bd
default:
return nil, fmt.Errorf("unknown Kind '%s' in file %s", base.Kind, fileName)
}
if err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML for Kind '%s' from %s: %w", base.Kind, fileName, err)
}
// TODO: Add basic validation for each parsed type (e.g., required fields like metadata.name)
return resource, nil
}
// ParseQuadletDirectory processes a map of file contents (from UntarQuadlets).
func ParseQuadletDirectory(files map[string][]byte) (*ParsedQuadletFiles, error) {
parsed := &ParsedQuadletFiles{RawFiles: files}
for fileName, content := range files {
obj, err := ParseQuadletFile(fileName, content)
if err != nil {
return nil, fmt.Errorf("error parsing quadlet file %s: %w", fileName, err)
}
switch v := obj.(type) {
case *pb.Workload:
if parsed.Workload != nil {
return nil, fmt.Errorf("multiple Workload definitions found")
}
parsed.Workload = v
case *pb.VirtualLoadBalancer:
if parsed.VirtualLoadBalancer != nil {
return nil, fmt.Errorf("multiple VirtualLoadBalancer definitions found")
}
parsed.VirtualLoadBalancer = v
// Add cases for JobDefinition, BuildDefinition
}
}
// TODO: Perform cross-Quadlet file validation (e.g., workload.kat must exist)
if parsed.Workload == nil {
return nil, fmt.Errorf("required Workload definition (workload.kat) not found in Quadlet bundle")
}
if parsed.Workload.Metadata == nil || parsed.Workload.Metadata.Name == "" {
return nil, fmt.Errorf("workload.kat must have metadata.name defined")
}
return parsed, nil
}
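Tying the pieces together, a sketch of how `UntarQuadlets` (in `internal/utils/tar.go`, below) feeds `ParseQuadletDirectory` when a quadlet bundle is uploaded; the handler name and HTTP framing are assumptions, not the final API surface.

```go
// Sketch: accept a tar.gz quadlet bundle and parse it into typed resources.
package api

import (
	"net/http"

	"git.dws.rip/dubey/kat/internal/config"
	"git.dws.rip/dubey/kat/internal/utils"
)

func handleWorkloadUpload(w http.ResponseWriter, r *http.Request) {
	files, err := utils.UntarQuadlets(r.Body) // size/type limits enforced here
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	parsed, err := config.ParseQuadletDirectory(files)
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	// ParseQuadletDirectory guarantees a named Workload is present.
	w.Write([]byte("accepted workload " + parsed.Workload.Metadata.Name + "\n"))
}
```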


@ -0,0 +1,334 @@
package config
import (
"os"
"strings"
"testing"
pb "git.dws.rip/dubey/kat/api/v1alpha1"
)
func createTestClusterKatFile(t *testing.T, content string) string {
t.Helper()
tmpFile, err := os.CreateTemp(t.TempDir(), "cluster.*.kat")
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}
if _, err := tmpFile.WriteString(content); err != nil {
tmpFile.Close()
t.Fatalf("Failed to write to temp file: %v", err)
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("Failed to close temp file: %v", err)
}
return tmpFile.Name()
}
func TestParseClusterConfiguration_Valid(t *testing.T) {
yamlContent := `
apiVersion: kat.dws.rip/v1alpha1
kind: ClusterConfiguration
metadata:
name: test-cluster
spec:
cluster_cidr: "10.0.0.0/16"
service_cidr: "10.1.0.0/16"
node_subnet_bits: 8 # /24 for nodes
api_port: 8080 # Non-default
`
filePath := createTestClusterKatFile(t, yamlContent)
config, err := ParseClusterConfiguration(filePath)
if err != nil {
t.Fatalf("ParseClusterConfiguration() error = %v, wantErr %v", err, false)
}
if config.Metadata.Name != "test-cluster" {
t.Errorf("Expected metadata.name 'test-cluster', got '%s'", config.Metadata.Name)
}
if config.Spec.ClusterCidr != "10.0.0.0/16" {
t.Errorf("Expected spec.clusterCIDR '10.0.0.0/16', got '%s'", config.Spec.ClusterCidr)
}
if config.Spec.ApiPort != 8080 {
t.Errorf("Expected spec.apiPort 8080, got %d", config.Spec.ApiPort)
}
// Check a default value
if config.Spec.ClusterDomain != DefaultClusterDomain {
t.Errorf("Expected default spec.clusterDomain '%s', got '%s'", DefaultClusterDomain, config.Spec.ClusterDomain)
}
if config.Spec.NodeSubnetBits != 8 {
t.Errorf("Expected spec.nodeSubnetBits 8, got %d", config.Spec.NodeSubnetBits)
}
}
func TestParseClusterConfiguration_FileNotFound(t *testing.T) {
_, err := ParseClusterConfiguration("nonexistent.kat")
if err == nil {
t.Fatalf("ParseClusterConfiguration() with non-existent file did not return an error")
}
if !strings.Contains(err.Error(), "file not found") {
t.Errorf("Expected 'file not found' error, got: %v", err)
}
}
func TestParseClusterConfiguration_InvalidYAML(t *testing.T) {
filePath := createTestClusterKatFile(t, "this: is: not: valid: yaml")
_, err := ParseClusterConfiguration(filePath)
if err == nil {
t.Fatalf("ParseClusterConfiguration() with invalid YAML did not return an error")
}
if !strings.Contains(err.Error(), "unmarshal YAML") {
t.Errorf("Expected 'unmarshal YAML' error, got: %v", err)
}
}
func TestParseClusterConfiguration_MissingRequiredFields(t *testing.T) {
tests := []struct {
name string
content string
wantErr string
}{
{
name: "missing metadata name",
content: `
apiVersion: kat.dws.rip/v1alpha1
kind: ClusterConfiguration
spec:
clusterCIDR: "10.0.0.0/16"
serviceCIDR: "10.1.0.0/16"
`,
wantErr: "metadata section not found",
},
{
name: "missing clusterCIDR",
content: `
apiVersion: kat.dws.rip/v1alpha1
kind: ClusterConfiguration
metadata:
name: test-cluster
spec:
serviceCIDR: "10.1.0.0/16"
`,
wantErr: "spec.clusterCIDR is required",
},
{
name: "invalid kind",
content: `
apiVersion: kat.dws.rip/v1alpha1
kind: WrongKind
metadata:
name: test-cluster
spec:
clusterCIDR: "10.0.0.0/16"
serviceCIDR: "10.1.0.0/16"
`,
wantErr: "invalid kind",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filePath := createTestClusterKatFile(t, tt.content)
_, err := ParseClusterConfiguration(filePath)
if err == nil {
t.Fatalf("ParseClusterConfiguration() did not return an error for %s", tt.name)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("Expected error containing '%s', got: %v", tt.wantErr, err)
}
})
}
}
func TestSetClusterConfigDefaults(t *testing.T) {
config := &pb.ClusterConfiguration{
Spec: &pb.ClusterConfigurationSpec{},
}
SetClusterConfigDefaults(config)
if config.Spec.ClusterDomain != DefaultClusterDomain {
t.Errorf("DefaultClusterDomain: got %s, want %s", config.Spec.ClusterDomain, DefaultClusterDomain)
}
if config.Spec.ApiPort != DefaultApiPort {
t.Errorf("DefaultApiPort: got %d, want %d", config.Spec.ApiPort, DefaultApiPort)
}
if config.Spec.AgentPort != DefaultAgentPort {
t.Errorf("DefaultAgentPort: got %d, want %d", config.Spec.AgentPort, DefaultAgentPort)
}
if config.Spec.EtcdClientPort != DefaultEtcdClientPort {
t.Errorf("DefaultEtcdClientPort: got %d, want %d", config.Spec.EtcdClientPort, DefaultEtcdClientPort)
}
if config.Spec.EtcdPeerPort != DefaultEtcdPeerPort {
t.Errorf("DefaultEtcdPeerPort: got %d, want %d", config.Spec.EtcdPeerPort, DefaultEtcdPeerPort)
}
if config.Spec.VolumeBasePath != DefaultVolumeBasePath {
t.Errorf("DefaultVolumeBasePath: got %s, want %s", config.Spec.VolumeBasePath, DefaultVolumeBasePath)
}
if config.Spec.BackupPath != DefaultBackupPath {
t.Errorf("DefaultBackupPath: got %s, want %s", config.Spec.BackupPath, DefaultBackupPath)
}
if config.Spec.BackupIntervalMinutes != DefaultBackupIntervalMins {
t.Errorf("DefaultBackupIntervalMins: got %d, want %d", config.Spec.BackupIntervalMinutes, DefaultBackupIntervalMins)
}
if config.Spec.AgentTickSeconds != DefaultAgentTickSeconds {
t.Errorf("DefaultAgentTickSeconds: got %d, want %d", config.Spec.AgentTickSeconds, DefaultAgentTickSeconds)
}
if config.Spec.NodeLossTimeoutSeconds != DefaultNodeLossTimeoutSec {
t.Errorf("DefaultNodeLossTimeoutSec: got %d, want %d", config.Spec.NodeLossTimeoutSeconds, DefaultNodeLossTimeoutSec)
}
if config.Spec.NodeSubnetBits != DefaultNodeSubnetBits {
t.Errorf("DefaultNodeSubnetBits: got %d, want %d", config.Spec.NodeSubnetBits, DefaultNodeSubnetBits)
}
// Test NodeLossTimeoutSeconds derivation
configWithTick := &pb.ClusterConfiguration{
Spec: &pb.ClusterConfigurationSpec{AgentTickSeconds: 10},
}
SetClusterConfigDefaults(configWithTick)
if configWithTick.Spec.NodeLossTimeoutSeconds != 40 { // 10 * 4
t.Errorf("Derived NodeLossTimeoutSeconds: got %d, want %d", configWithTick.Spec.NodeLossTimeoutSeconds, 40)
}
}
func TestValidateClusterConfiguration_InvalidValues(t *testing.T) {
baseValidSpec := func() *pb.ClusterConfigurationSpec {
return &pb.ClusterConfigurationSpec{
ClusterCidr: "10.0.0.0/16",
ServiceCidr: "10.1.0.0/16",
NodeSubnetBits: 8,
ClusterDomain: "test.local",
AgentPort: 10250,
ApiPort: 10251,
EtcdPeerPort: 2380,
EtcdClientPort: 2379,
VolumeBasePath: "/var/lib/kat/volumes",
BackupPath: "/var/lib/kat/backups",
BackupIntervalMinutes: 30,
AgentTickSeconds: 15,
NodeLossTimeoutSeconds: 60,
}
}
baseValidMetadata := func() *pb.ObjectMeta {
return &pb.ObjectMeta{Name: "test"}
}
tests := []struct {
name string
mutator func(cfg *pb.ClusterConfiguration)
wantErr string
}{
{"invalid clusterCIDR", func(cfg *pb.ClusterConfiguration) { cfg.Spec.ClusterCidr = "invalid" }, "invalid spec.clusterCIDR"},
{"invalid serviceCIDR", func(cfg *pb.ClusterConfiguration) { cfg.Spec.ServiceCidr = "invalid" }, "invalid spec.serviceCIDR"},
{"invalid agentPort low", func(cfg *pb.ClusterConfiguration) { cfg.Spec.AgentPort = 0 }, "invalid port for agentPort"},
{"invalid agentPort high", func(cfg *pb.ClusterConfiguration) { cfg.Spec.AgentPort = 70000 }, "invalid port for agentPort"},
{"port conflict", func(cfg *pb.ClusterConfiguration) { cfg.Spec.ApiPort = cfg.Spec.AgentPort }, "port conflict"},
{"invalid nodeSubnetBits low", func(cfg *pb.ClusterConfiguration) { cfg.Spec.NodeSubnetBits = 0 }, "invalid spec.nodeSubnetBits"},
{"invalid nodeSubnetBits high", func(cfg *pb.ClusterConfiguration) { cfg.Spec.NodeSubnetBits = 32 }, "invalid spec.nodeSubnetBits"},
{"invalid nodeSubnetBits vs clusterCIDR", func(cfg *pb.ClusterConfiguration) {
cfg.Spec.ClusterCidr = "10.0.0.0/28"
cfg.Spec.NodeSubnetBits = 8
}, "results in an invalid subnet size"},
{"invalid agentTickSeconds", func(cfg *pb.ClusterConfiguration) { cfg.Spec.AgentTickSeconds = 0 }, "agentTickSeconds must be positive"},
{"invalid nodeLossTimeoutSeconds", func(cfg *pb.ClusterConfiguration) { cfg.Spec.NodeLossTimeoutSeconds = 0 }, "nodeLossTimeoutSeconds must be positive"},
{"nodeLoss < agentTick", func(cfg *pb.ClusterConfiguration) {
cfg.Spec.NodeLossTimeoutSeconds = cfg.Spec.AgentTickSeconds - 1
}, "nodeLossTimeoutSeconds must be greater"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := &pb.ClusterConfiguration{Metadata: baseValidMetadata(), Spec: baseValidSpec()}
tt.mutator(config)
err := ValidateClusterConfiguration(config)
if err == nil {
t.Fatalf("ValidateClusterConfiguration() did not return an error for %s", tt.name)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("Expected error containing '%s', got: %v", tt.wantErr, err)
}
})
}
}
func TestParseQuadletDirectory_ValidSimple(t *testing.T) {
files := map[string][]byte{
"workload.kat": []byte(`
apiVersion: kat.dws.rip/v1alpha1
kind: Workload
metadata:
name: test-workload
spec:
type: SERVICE
source:
image: "nginx:latest"
`),
"vlb.kat": []byte(`
apiVersion: kat.dws.rip/v1alpha1
kind: VirtualLoadBalancer
metadata:
name: test-workload # Assumed to match workload name
spec:
ports:
- containerPort: 80
`),
}
parsed, err := ParseQuadletDirectory(files)
if err != nil {
t.Fatalf("ParseQuadletDirectory() error = %v", err)
}
if parsed.Workload == nil {
t.Fatal("Parsed Workload is nil")
}
if parsed.Workload.Metadata.Name != "test-workload" {
t.Errorf("Expected Workload name 'test-workload', got '%s'", parsed.Workload.Metadata.Name)
}
if parsed.VirtualLoadBalancer == nil {
t.Fatal("Parsed VirtualLoadBalancer is nil")
}
if parsed.VirtualLoadBalancer.Metadata.Name != "test-workload" {
t.Errorf("Expected VLB name 'test-workload', got '%s'", parsed.VirtualLoadBalancer.Metadata.Name)
}
}
func TestParseQuadletDirectory_MissingWorkload(t *testing.T) {
files := map[string][]byte{
"vlb.kat": []byte(`kind: VirtualLoadBalancer`),
}
_, err := ParseQuadletDirectory(files)
if err == nil {
t.Fatal("ParseQuadletDirectory() with missing workload.kat did not return an error")
}
if !strings.Contains(err.Error(), "required Workload definition (workload.kat) not found") {
t.Errorf("Expected 'required Workload' error, got: %v", err)
}
}
func TestParseQuadletDirectory_MultipleWorkloads(t *testing.T) {
files := map[string][]byte{
"workload1.kat": []byte(`
apiVersion: kat.dws.rip/v1alpha1
kind: Workload
metadata:
name: wl1
spec:
type: SERVICE
source: {image: "img1"}`),
"workload2.kat": []byte(`
apiVersion: kat.dws.rip/v1alpha1
kind: Workload
metadata:
name: wl2
spec:
type: SERVICE
source: {image: "img2"}`),
}
_, err := ParseQuadletDirectory(files)
if err == nil {
t.Fatal("ParseQuadletDirectory() with multiple workload.kat did not return an error")
}
if !strings.Contains(err.Error(), "multiple Workload definitions found") {
t.Errorf("Expected 'multiple Workload' error, got: %v", err)
}
}

internal/config/types.go Normal file

@ -0,0 +1,23 @@
// File: internal/config/types.go
package config
// For Phase 0, we will primarily use the generated protobuf types
// (e.g., *v1alpha1.ClusterConfiguration) directly.
// This file can hold auxiliary types or constants related to config parsing if needed later.
const (
DefaultClusterDomain = "kat.cluster.local"
DefaultAgentPort = 9116
DefaultApiPort = 9115
DefaultEtcdPeerPort = 2380
DefaultEtcdClientPort = 2379
DefaultVolumeBasePath = "/var/lib/kat/volumes"
DefaultBackupPath = "/var/lib/kat/backups"
DefaultBackupIntervalMins = 30
DefaultAgentTickSeconds = 15
DefaultNodeLossTimeoutSec = 60 // = DefaultAgentTickSeconds * 4 missed ticks
DefaultNodeSubnetBits = 7 // additional subnet bits carved out within clusterCIDR
// Per the RFC, 7 bits within a /16 clusterCIDR yields /23 node subnets,
// i.e. 2^(32-16-7) = 2^9 = 512 addresses per node subnet.
)
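A worked example of the subnet arithmetic behind `DefaultNodeSubnetBits`, matching the 2^9 = 512 figure in the comment; standard library only.

```go
// Sketch: node subnet math for DefaultNodeSubnetBits (7) over a /16 cluster CIDR.
package main

import (
	"fmt"
	"net"
)

func main() {
	_, clusterNet, _ := net.ParseCIDR("10.100.0.0/16")
	clusterPrefix, _ := clusterNet.Mask.Size() // 16
	nodeSubnetBits := 7

	nodePrefix := clusterPrefix + nodeSubnetBits // /23 node subnets
	subnets := 1 << nodeSubnetBits               // 128 node subnets available
	addrsPerNode := 1 << (32 - nodePrefix)       // 2^9 = 512 addresses each

	fmt.Printf("/%d subnets: %d available, %d addresses each\n",
		nodePrefix, subnets, addrsPerNode)
}
```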

internal/utils/tar.go Normal file

@ -0,0 +1,87 @@
package utils
import (
"archive/tar"
"compress/gzip"
"fmt"
"io"
"path/filepath"
"strings"
)
const maxQuadletFileSize = 1 * 1024 * 1024 // 1MB limit per file in tarball
const maxTotalQuadletSize = 5 * 1024 * 1024 // 5MB limit for total uncompressed size
const maxQuadletFiles = 20 // Max number of files in a quadlet bundle
// UntarQuadlets unpacks a tar.gz stream in memory and returns a map of fileName -> fileContent.
// It performs basic validation on file names and sizes.
func UntarQuadlets(reader io.Reader) (map[string][]byte, error) {
gzr, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
defer gzr.Close()
tr := tar.NewReader(gzr)
files := make(map[string][]byte)
var totalSize int64
fileCount := 0
for {
header, err := tr.Next()
if err == io.EOF {
break // End of archive
}
if err != nil {
return nil, fmt.Errorf("failed to read tar header: %w", err)
}
// Skip directory entries up front; the archive must otherwise contain only
// regular, top-level .kat files.
if header.Typeflag == tar.TypeDir {
continue
}
// Basic security checks
if strings.Contains(header.Name, "..") {
return nil, fmt.Errorf("invalid file path in tar: %s (contains '..')", header.Name)
}
// Ensure files are *.kat and are not in subdirectories within the tarball
// The Quadlet concept implies a flat directory of *.kat files.
if filepath.Dir(header.Name) != "." && filepath.Dir(header.Name) != "" {
return nil, fmt.Errorf("invalid file path in tar: %s (subdirectories are not allowed for Quadlet files)", header.Name)
}
if !strings.HasSuffix(strings.ToLower(header.Name), ".kat") {
return nil, fmt.Errorf("invalid file type in tar: %s (only .kat files are allowed)", header.Name)
}
switch header.Typeflag {
case tar.TypeReg: // Regular file
fileCount++
if fileCount > maxQuadletFiles {
return nil, fmt.Errorf("too many files in quadlet bundle; limit %d", maxQuadletFiles)
}
if header.Size > maxQuadletFileSize {
return nil, fmt.Errorf("file %s in tar is too large: %d bytes (max %d)", header.Name, header.Size, maxQuadletFileSize)
}
totalSize += header.Size
if totalSize > maxTotalQuadletSize {
return nil, fmt.Errorf("total size of files in tar is too large (max %d MB)", maxTotalQuadletSize/(1024*1024))
}
content, err := io.ReadAll(tr)
if err != nil {
return nil, fmt.Errorf("failed to read file content for %s from tar: %w", header.Name, err)
}
if int64(len(content)) != header.Size {
return nil, fmt.Errorf("file %s in tar has inconsistent size: header %d, read %d", header.Name, header.Size, len(content))
}
files[header.Name] = content
default:
// Symlinks, char devices, etc. are not allowed.
return nil, fmt.Errorf("unsupported file type in tar for %s: typeflag %c", header.Name, header.Typeflag)
}
}
if len(files) == 0 {
return nil, fmt.Errorf("no .kat files found in the provided archive")
}
return files, nil
}

internal/utils/tar_test.go Normal file

@ -0,0 +1,205 @@
package utils
import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
"testing"
)
func createTestTarGz(t *testing.T, files map[string]string, modifyHeader func(hdr *tar.Header)) io.Reader {
t.Helper()
var buf bytes.Buffer
gzw := gzip.NewWriter(&buf)
tw := tar.NewWriter(gzw)
for name, content := range files {
hdr := &tar.Header{
Name: name,
Mode: 0644,
Size: int64(len(content)),
}
if modifyHeader != nil {
modifyHeader(hdr)
}
if err := tw.WriteHeader(hdr); err != nil {
t.Fatalf("Failed to write tar header for %s: %v", name, err)
}
if _, err := tw.Write([]byte(content)); err != nil {
t.Fatalf("Failed to write tar content for %s: %v", name, err)
}
}
if err := tw.Close(); err != nil {
t.Fatalf("Failed to close tar writer: %v", err)
}
if err := gzw.Close(); err != nil {
t.Fatalf("Failed to close gzip writer: %v", err)
}
return &buf
}
func TestUntarQuadlets_Valid(t *testing.T) {
inputFiles := map[string]string{
"workload.kat": "kind: Workload",
"vlb.kat": "kind: VirtualLoadBalancer",
}
reader := createTestTarGz(t, inputFiles, nil)
outputFiles, err := UntarQuadlets(reader)
if err != nil {
t.Fatalf("UntarQuadlets() error = %v, wantErr %v", err, false)
}
if len(outputFiles) != len(inputFiles) {
t.Errorf("Expected %d files, got %d", len(inputFiles), len(outputFiles))
}
for name, content := range inputFiles {
outContent, ok := outputFiles[name]
if !ok {
t.Errorf("Expected file %s not found in output", name)
}
if string(outContent) != content {
t.Errorf("Content mismatch for %s: got '%s', want '%s'", name, string(outContent), content)
}
}
}
func TestUntarQuadlets_EmptyArchive(t *testing.T) {
reader := createTestTarGz(t, map[string]string{}, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with empty archive did not return an error")
}
if !strings.Contains(err.Error(), "no .kat files found") {
t.Errorf("Expected 'no .kat files found' error, got: %v", err)
}
}
func TestUntarQuadlets_NonKatFile(t *testing.T) {
inputFiles := map[string]string{"config.txt": "some data"}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with non-.kat file did not return an error")
}
if !strings.Contains(err.Error(), "only .kat files are allowed") {
t.Errorf("Expected 'only .kat files are allowed' error, got: %v", err)
}
}
func TestUntarQuadlets_FileInSubdirectory(t *testing.T) {
inputFiles := map[string]string{"subdir/workload.kat": "kind: Workload"}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with file in subdirectory did not return an error")
}
if !strings.Contains(err.Error(), "subdirectories are not allowed") {
t.Errorf("Expected 'subdirectories are not allowed' error, got: %v", err)
}
}
func TestUntarQuadlets_PathTraversal(t *testing.T) {
inputFiles := map[string]string{"../workload.kat": "kind: Workload"}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with path traversal did not return an error")
}
if !strings.Contains(err.Error(), "contains '..'") {
t.Errorf("Expected 'contains ..' error, got: %v", err)
}
}
func TestUntarQuadlets_FileTooLarge(t *testing.T) {
largeContent := strings.Repeat("a", int(maxQuadletFileSize)+1)
inputFiles := map[string]string{"large.kat": largeContent}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with large file did not return an error")
}
if !strings.Contains(err.Error(), "file large.kat in tar is too large") {
t.Errorf("Expected 'file ... too large' error, got: %v", err)
}
}
func TestUntarQuadlets_TotalSizeTooLarge(t *testing.T) {
numFiles := (maxTotalQuadletSize / maxQuadletFileSize) * 4
fileSize := maxQuadletFileSize / 2
inputFiles := make(map[string]string)
content := strings.Repeat("a", int(fileSize))
for i := 0; i < int(numFiles); i++ {
inputFiles[fmt.Sprintf("file%d.kat", i)] = content
}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with total large size did not return an error")
}
if !strings.Contains(err.Error(), "total size of files in tar is too large") {
t.Errorf("Expected 'total size ... too large' error, got: %v", err)
}
}
func TestUntarQuadlets_TooManyFiles(t *testing.T) {
inputFiles := make(map[string]string)
for i := 0; i <= maxQuadletFiles; i++ {
inputFiles[fmt.Sprintf("file%d.kat", i)] = "content"
}
reader := createTestTarGz(t, inputFiles, nil)
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with too many files did not return an error")
}
if !strings.Contains(err.Error(), "too many files in quadlet bundle") {
t.Errorf("Expected 'too many files' error, got: %v", err)
}
}
func TestUntarQuadlets_UnsupportedFileType(t *testing.T) {
reader := createTestTarGz(t, map[string]string{"link.kat": ""}, func(hdr *tar.Header) {
hdr.Typeflag = tar.TypeSymlink
hdr.Linkname = "target.kat"
hdr.Size = 0
})
_, err := UntarQuadlets(reader)
if err == nil {
t.Fatal("UntarQuadlets() with symlink did not return an error")
}
if !strings.Contains(err.Error(), "unsupported file type") {
t.Errorf("Expected 'unsupported file type' error, got: %v", err)
}
}
func TestUntarQuadlets_CorruptedGzip(t *testing.T) {
corruptedInput := bytes.NewBufferString("this is not a valid gzip stream")
_, err := UntarQuadlets(corruptedInput)
if err == nil {
t.Fatal("UntarQuadlets() with corrupted gzip did not return an error")
}
if !strings.Contains(err.Error(), "failed to create gzip reader") && !strings.Contains(err.Error(), "gzip: invalid header") {
t.Errorf("Expected 'gzip format' or 'invalid header' error, got: %v", err)
}
}
func TestUntarQuadlets_CorruptedTar(t *testing.T) {
var buf bytes.Buffer
gzw := gzip.NewWriter(&buf)
_, _ = gzw.Write([]byte("this is not a valid tar stream but inside gzip"))
_ = gzw.Close()
_, err := UntarQuadlets(&buf)
if err == nil {
t.Fatal("UntarQuadlets() with corrupted tar did not return an error")
}
if !strings.Contains(err.Error(), "tar") {
t.Errorf("Expected error related to 'tar' format, got: %v", err)
}
}

scripts/gen-proto.sh Executable file

@ -0,0 +1,47 @@
#!/bin/bash
# File: scripts/gen-proto.sh
set -xe
# Find protoc-gen-go
PROTOC_GEN_GO_PATH=""
if command -v protoc-gen-go &> /dev/null; then
PROTOC_GEN_GO_PATH=$(command -v protoc-gen-go)
elif [ -f "$(go env GOBIN)/protoc-gen-go" ]; then
PROTOC_GEN_GO_PATH="$(go env GOBIN)/protoc-gen-go"
elif [ -f "$(go env GOPATH)/bin/protoc-gen-go" ]; then
PROTOC_GEN_GO_PATH="$(go env GOPATH)/bin/protoc-gen-go"
else
echo "protoc-gen-go not found. Please run:"
echo "go install google.golang.org/protobuf/cmd/protoc-gen-go"
echo "And ensure GOBIN or GOPATH/bin is in your PATH."
exit 1
fi
# Project root assumed to be parent of 'scripts' directory
PROJECT_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd )"
API_DIR="${PROJECT_ROOT}/api/v1alpha1"
# Output generated code directly into the api/v1alpha1 directory, alongside kat.proto
# This is a common pattern and simplifies imports.
# The go_package option in kat.proto already points here.
OUT_DIR="${API_DIR}"
# Ensure output directory exists (it should, it's the same as API_DIR)
mkdir -p "$OUT_DIR"
echo "Generating Go code from Protobuf definitions..."
protoc --proto_path="${API_DIR}" \
--plugin="protoc-gen-go=${PROTOC_GEN_GO_PATH}" \
--go_out="${OUT_DIR}" --go_opt=paths=source_relative \
"${API_DIR}/kat.proto"
echo "Protobuf Go code generated in ${OUT_DIR}"
# Optional: Generate gRPC stubs if/when you add services
# PROTOC_GEN_GO_GRPC_PATH="" # Similar logic to find protoc-gen-go-grpc
# go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# protoc --proto_path="${API_DIR}" \
# --plugin="protoc-gen-go=${PROTOC_GEN_GO_PATH}" \
# --plugin="protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_PATH}" \
# --go_out="${OUT_DIR}" --go_opt=paths=source_relative \
# --go-grpc_out="${OUT_DIR}" --go-grpc_opt=paths=source_relative \
# "${API_DIR}/kat.proto"