kat/internal/store/etcd_test.go
2025-05-16 22:13:42 -04:00

396 lines
10 KiB
Go

package store
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestEtcdStore is an integration test for the basic EtcdStore operations:
// Put/Get, List, Delete, Watch and DoTransaction. It starts an embedded etcd
// server on OS-assigned local ports, backed by a throwaway data directory.
func TestEtcdStore(t *testing.T) {
	// Data directory for the embedded etcd. Its removal is registered with
	// t.Cleanup *before* the server-shutdown cleanup below: cleanups run
	// last-in-first-out and only after the test's deferred calls, so the
	// directory is deleted after the server has been closed. (A plain defer
	// would run before any cleanup, i.e. while the server could still be up.)
	tempDir, err := os.MkdirTemp("", "etcd-test-*")
	require.NoError(t, err)
	t.Cleanup(func() {
		if err := os.RemoveAll(tempDir); err != nil {
			t.Logf("removing etcd data dir %s: %v", tempDir, err)
		}
	})

	etcdConfig := EtcdEmbedConfig{
		Name:       "test-node",
		DataDir:    tempDir,
		ClientURLs: []string{"http://localhost:0"}, // port 0: let the OS pick a free port
		PeerURLs:   []string{"http://localhost:0"},
	}
	etcdServer, err := StartEmbeddedEtcd(etcdConfig)
	require.NoError(t, err)

	// The server is also handed to NewEtcdStore below. Closing the store may
	// already have closed it, and a second Close can panic ("close of closed
	// channel"), so such a panic is recovered and only logged.
	t.Cleanup(func() {
		if etcdServer == nil {
			return
		}
		defer func() {
			if r := recover(); r != nil {
				t.Logf("Recovered from panic while closing etcd server: %v", r)
			}
		}()
		etcdServer.Close()
	})

	// The configured URLs use port 0, so read back the address that the first
	// client listener actually bound.
	clientURL := etcdServer.Clients[0].Addr().String()

	store, err := NewEtcdStore([]string{clientURL}, etcdServer)
	require.NoError(t, err)
	defer store.Close()

	// One deadline shared by all subtests below.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	t.Run("PutAndGet", func(t *testing.T) {
		key := "/test/key1"
		value := []byte("test-value-1")

		require.NoError(t, store.Put(ctx, key, value))

		kv, err := store.Get(ctx, key)
		require.NoError(t, err)
		assert.Equal(t, key, kv.Key)
		assert.Equal(t, value, kv.Value)
		assert.Greater(t, kv.Version, int64(0))
	})

	t.Run("List", func(t *testing.T) {
		prefix := "/test/list/"
		for i := 0; i < 3; i++ {
			key := fmt.Sprintf("%s%d", prefix, i)
			value := []byte(fmt.Sprintf("value-%d", i))
			require.NoError(t, store.Put(ctx, key, value))
		}

		kvs, err := store.List(ctx, prefix)
		require.NoError(t, err)
		assert.Len(t, kvs, 3)

		for _, kv := range kvs {
			// require (not assert): a key shorter than the prefix must stop
			// here rather than panic on the slice expression below.
			require.Greater(t, len(kv.Key), len(prefix), "key %q is not under prefix %q", kv.Key, prefix)
			assert.Equal(t, prefix, kv.Key[:len(prefix)])
		}
	})

	t.Run("Delete", func(t *testing.T) {
		key := "/test/delete-key"
		value := []byte("delete-me")

		require.NoError(t, store.Put(ctx, key, value))

		_, err := store.Get(ctx, key)
		require.NoError(t, err, "key should exist before delete")

		require.NoError(t, store.Delete(ctx, key))

		_, err = store.Get(ctx, key)
		require.Error(t, err, "key should be gone after delete")
	})

	t.Run("Watch", func(t *testing.T) {
		prefix := "/test/watch/"
		key := prefix + "key1"

		// Start watching before any changes so no event is missed.
		watchCh, err := store.Watch(ctx, prefix, 0)
		require.NoError(t, err)

		// Produce put, put, delete from a separate goroutine. Its result goes
		// through a buffered channel (the single send never blocks), and the
		// deferred wg.Wait guarantees the goroutine has finished before this
		// subtest returns, including when it returns via t.Fatal.
		producerResult := make(chan error, 1)
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			producerResult <- func() error {
				time.Sleep(100 * time.Millisecond)
				if err := store.Put(ctx, key, []byte("watch-value-1")); err != nil {
					return fmt.Errorf("put watch-value-1: %w", err)
				}
				time.Sleep(100 * time.Millisecond)
				if err := store.Put(ctx, key, []byte("watch-value-2")); err != nil {
					return fmt.Errorf("put watch-value-2: %w", err)
				}
				time.Sleep(100 * time.Millisecond)
				if err := store.Delete(ctx, key); err != nil {
					return fmt.Errorf("delete: %w", err)
				}
				return nil
			}()
		}()
		defer wg.Wait()

		// pending is set to nil once the producer's result has been consumed;
		// a nil channel is never selected, which disables that case.
		pending := producerResult
		var events []WatchEvent
		timeout := time.After(2 * time.Second)
		for len(events) < 3 {
			select {
			case event, ok := <-watchCh:
				require.True(t, ok, "watch channel closed after %d of 3 events", len(events))
				events = append(events, event)
			case err := <-pending:
				require.NoError(t, err, "producing watch events")
				pending = nil
			case <-timeout:
				t.Fatal("Timed out waiting for watch events")
			}
		}
		if pending != nil {
			require.NoError(t, <-pending, "producing watch events")
		}

		require.Len(t, events, 3)

		// First event: Put watch-value-1
		assert.Equal(t, EventTypePut, events[0].Type)
		assert.Equal(t, key, events[0].KV.Key)
		assert.Equal(t, []byte("watch-value-1"), events[0].KV.Value)

		// Second event: Put watch-value-2
		assert.Equal(t, EventTypePut, events[1].Type)
		assert.Equal(t, key, events[1].KV.Key)
		assert.Equal(t, []byte("watch-value-2"), events[1].KV.Value)

		// Third event: Delete
		assert.Equal(t, EventTypeDelete, events[2].Type)
		assert.Equal(t, key, events[2].KV.Key)
	})

	t.Run("DoTransaction", func(t *testing.T) {
		key1 := "/test/txn/key1"
		key2 := "/test/txn/key2"

		require.NoError(t, store.Put(ctx, key1, []byte("txn-value-1")))

		// Read key1 back to learn its current version for the comparison.
		kv, err := store.Get(ctx, key1)
		require.NoError(t, err)
		version := kv.Version

		// If key1 still has the observed version, create key2.
		checks := []Compare{
			{Key: key1, ExpectedVersion: version},
		}
		onSuccess := []Op{
			{Type: OpPut, Key: key2, Value: []byte("txn-value-2")},
		}
		onFailure := []Op{} // no fallback operations in this test

		committed, err := store.DoTransaction(ctx, checks, onSuccess, onFailure)
		require.NoError(t, err)
		assert.True(t, committed)

		kv2, err := store.Get(ctx, key2)
		require.NoError(t, err)
		assert.Equal(t, []byte("txn-value-2"), kv2.Value)

		// A comparison against a version key1 does not have must not commit.
		checks = []Compare{
			{Key: key1, ExpectedVersion: version + 100},
		}
		committed, err = store.DoTransaction(ctx, checks, onSuccess, onFailure)
		require.NoError(t, err)
		assert.False(t, committed)
	})
}
// TestLeaderElection is an integration test for the Campaign, Resign and
// GetLeader methods, run against an embedded etcd server.
//
// The tests assume leadership changes may be observed asynchronously. They
// therefore poll (Eventually) or wait with a bounded timeout, rather than
// doing a single non-blocking check right after a fixed sleep.
func TestLeaderElection(t *testing.T) {
	// Data directory for the embedded etcd. Its removal is registered with
	// t.Cleanup *before* the server-shutdown cleanup below: cleanups run
	// last-in-first-out and only after the test's deferred calls, so the
	// directory is deleted after the server has been closed. (A plain defer
	// would run before any cleanup, i.e. while the server could still be up.)
	tempDir, err := os.MkdirTemp("", "etcd-election-test-*")
	require.NoError(t, err)
	t.Cleanup(func() {
		if err := os.RemoveAll(tempDir); err != nil {
			t.Logf("removing etcd data dir %s: %v", tempDir, err)
		}
	})

	etcdConfig := EtcdEmbedConfig{
		Name:       "election-test-node",
		DataDir:    tempDir,
		ClientURLs: []string{"http://localhost:0"}, // port 0: let the OS pick a free port
		PeerURLs:   []string{"http://localhost:0"},
	}
	etcdServer, err := StartEmbeddedEtcd(etcdConfig)
	require.NoError(t, err)

	// The server is also handed to NewEtcdStore below. Closing the store may
	// already have closed it, and a second Close can panic ("close of closed
	// channel"), so such a panic is recovered and only logged.
	t.Cleanup(func() {
		if etcdServer == nil {
			return
		}
		defer func() {
			if r := recover(); r != nil {
				t.Logf("Recovered from panic while closing etcd server: %v", r)
			}
		}()
		etcdServer.Close()
	})

	// The configured URLs use port 0, so read back the address that the first
	// client listener actually bound.
	clientURL := etcdServer.Clients[0].Addr().String()

	store, err := NewEtcdStore([]string{clientURL}, etcdServer)
	require.NoError(t, err)
	defer store.Close()

	// One deadline shared by all subtests below.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Upper bound and polling interval for waits on leadership changes.
	const (
		settle = 3 * time.Second
		tick   = 50 * time.Millisecond
	)

	t.Run("CampaignAndGetLeader", func(t *testing.T) {
		leaderID := "test-leader-" + uuid.New().String()[:8]

		leadershipCtx, err := store.Campaign(ctx, leaderID, 5)
		require.NoError(t, err)
		require.NotNil(t, leadershipCtx)

		require.Eventually(t, func() bool {
			leader, err := store.GetLeader(ctx)
			return err == nil && leader == leaderID
		}, settle, tick, "%s should become leader", leaderID)

		require.NoError(t, store.Resign(ctx))

		select {
		case <-leadershipCtx.Done():
			// Expected
		case <-time.After(settle):
			t.Fatal("Leadership context should be cancelled after resign")
		}

		// Either no leader or a different one is acceptable.
		assert.Eventually(t, func() bool {
			leader, err := store.GetLeader(ctx)
			return err == nil && leader != leaderID
		}, settle, tick, "Should not still be leader after resigning")
	})

	t.Run("MultipleLeaderCandidates", func(t *testing.T) {
		// A second client on the same server, without an embedded server of its own.
		store2, err := NewEtcdStore([]string{clientURL}, nil)
		require.NoError(t, err)
		defer store2.Close()

		leaderID1 := "leader1-" + uuid.New().String()[:8]
		leaderID2 := "leader2-" + uuid.New().String()[:8]

		leadershipCtx1, err := store.Campaign(ctx, leaderID1, 5)
		require.NoError(t, err)

		require.Eventually(t, func() bool {
			leader, err := store.GetLeader(ctx)
			return err == nil && leader == leaderID1
		}, settle, tick, "%s should become leader", leaderID1)

		// The second candidate campaigns but must not take over while the
		// first one still holds leadership.
		leadershipCtx2, err := store2.Campaign(ctx, leaderID2, 5)
		require.NoError(t, err)

		// Checking that something does *not* happen needs a short grace period.
		time.Sleep(100 * time.Millisecond)
		currentLeader, err := store.GetLeader(ctx)
		require.NoError(t, err)
		assert.Equal(t, leaderID1, currentLeader)

		require.NoError(t, store.Resign(ctx))

		require.Eventually(t, func() bool {
			leader, err := store2.GetLeader(ctx)
			return err == nil && leader == leaderID2
		}, settle, tick, "Second candidate should have become leader")

		select {
		case <-leadershipCtx1.Done():
			// Expected
		case <-time.After(settle):
			t.Fatal("First leadership context should be cancelled after resign")
		}

		require.NoError(t, store2.Resign(ctx))

		select {
		case <-leadershipCtx2.Done():
			// Expected
		case <-time.After(settle):
			t.Fatal("Second leadership context should be cancelled after resign")
		}
	})
}
// TestEtcdStoreWithMockEmbeddedEtcd is a placeholder for a mock-based unit
// test of EtcdStore that would run without a real etcd server. It is not
// implemented yet, so it logs the reason and skips unconditionally.
func TestEtcdStoreWithMockEmbeddedEtcd(t *testing.T) {
	// Log followed by SkipNow is exactly what t.Skip does.
	t.Log("Mock-based unit test not implemented")
	t.SkipNow()
}