Files
onyx-prebootstrap/internal/core/transaction.go
Tanishq Dubey a0f80c5c7d
Some checks failed
CI / Test (pull_request) Failing after 6s
CI / Build (pull_request) Failing after 7s
CI / Lint (pull_request) Failing after 13s
milestone 2 complete
2025-10-10 19:03:31 -04:00

187 lines
5.1 KiB
Go

package core
import (
"fmt"
"path/filepath"
"git.dws.rip/DWS/onyx/internal/models"
"git.dws.rip/DWS/onyx/internal/storage"
)
// Transaction represents a transactional operation with oplog support
type Transaction struct {
repo *OnyxRepository
oplogWriter *storage.OplogWriter
stateCapture *storage.StateCapture
}
// NewTransaction creates a new transaction for the given repository
func NewTransaction(repo *OnyxRepository) (*Transaction, error) {
oplogPath := filepath.Join(repo.GetOnyxPath(), "oplog")
oplogWriter, err := storage.OpenOplog(oplogPath)
if err != nil {
return nil, fmt.Errorf("failed to open oplog: %w", err)
}
stateCapture := storage.NewStateCapture(repo.GetGitRepo())
return &Transaction{
repo: repo,
oplogWriter: oplogWriter,
stateCapture: stateCapture,
}, nil
}
// ExecuteWithTransaction executes a function within a transaction context
// It captures the state before and after the operation and logs it to the oplog
func (t *Transaction) ExecuteWithTransaction(operation, description string, fn func() error) error {
// 1. Capture state_before
stateBefore, err := t.stateCapture.CaptureState()
if err != nil {
return fmt.Errorf("failed to capture state before: %w", err)
}
// 2. Execute the function
err = fn()
if err != nil {
// On error, we don't log to oplog since the operation failed
return fmt.Errorf("operation failed: %w", err)
}
// 3. Capture state_after
stateAfter, err := t.stateCapture.CaptureState()
if err != nil {
// Even if we can't capture the after state, we should try to log what we can
// This is a warning situation rather than a failure
fmt.Printf("Warning: failed to capture state after: %v\n", err)
stateAfter = stateBefore // Use the before state as a fallback
}
// 4. Create oplog entry
entry := models.NewOplogEntry(0, operation, description, stateBefore, stateAfter)
// 5. Write to oplog
err = t.oplogWriter.AppendEntry(entry)
if err != nil {
return fmt.Errorf("failed to write to oplog: %w", err)
}
return nil
}
// Close closes the transaction and releases resources
func (t *Transaction) Close() error {
return t.oplogWriter.Close()
}
// ExecuteWithTransactionAndMetadata executes a function with custom metadata
func (t *Transaction) ExecuteWithTransactionAndMetadata(
operation, description string,
metadata map[string]string,
fn func() error,
) error {
// Capture state_before
stateBefore, err := t.stateCapture.CaptureState()
if err != nil {
return fmt.Errorf("failed to capture state before: %w", err)
}
// Execute the function
err = fn()
if err != nil {
return fmt.Errorf("operation failed: %w", err)
}
// Capture state_after
stateAfter, err := t.stateCapture.CaptureState()
if err != nil {
fmt.Printf("Warning: failed to capture state after: %v\n", err)
stateAfter = stateBefore
}
// Create oplog entry with metadata
entry := models.NewOplogEntry(0, operation, description, stateBefore, stateAfter)
entry.Metadata = metadata
// Write to oplog
err = t.oplogWriter.AppendEntry(entry)
if err != nil {
return fmt.Errorf("failed to write to oplog: %w", err)
}
return nil
}
// Rollback attempts to rollback to a previous state
func (t *Transaction) Rollback(entryID uint64) error {
// Read the oplog entry
oplogPath := filepath.Join(t.repo.GetOnyxPath(), "oplog")
reader := storage.NewOplogReader(oplogPath)
entry, err := reader.ReadEntry(entryID)
if err != nil {
return fmt.Errorf("failed to read entry %d: %w", entryID, err)
}
// Restore the state_before from that entry
if entry.StateBefore == nil {
return fmt.Errorf("entry %d has no state_before to restore", entryID)
}
err = t.stateCapture.RestoreState(entry.StateBefore)
if err != nil {
return fmt.Errorf("failed to restore state: %w", err)
}
// Log the rollback operation
stateAfter, _ := t.stateCapture.CaptureState()
rollbackEntry := models.NewOplogEntry(
0,
"rollback",
fmt.Sprintf("Rolled back to entry %d", entryID),
stateAfter, // The current state becomes the "before"
entry.StateBefore, // The restored state becomes the "after"
)
rollbackEntry.Metadata = map[string]string{
"rollback_to_entry_id": fmt.Sprintf("%d", entryID),
}
err = t.oplogWriter.AppendEntry(rollbackEntry)
if err != nil {
// Don't fail the rollback if we can't log it
fmt.Printf("Warning: failed to log rollback: %v\n", err)
}
return nil
}
// Commit captures the final state and writes to oplog
func (t *Transaction) Commit(operation, description string) error {
// Capture state_after
stateAfter, err := t.stateCapture.CaptureState()
if err != nil {
return fmt.Errorf("failed to capture state: %w", err)
}
// Create oplog entry
entry := models.NewOplogEntry(0, operation, description, nil, stateAfter)
// Write to oplog
if err := t.oplogWriter.AppendEntry(entry); err != nil {
return fmt.Errorf("failed to write to oplog: %w", err)
}
return t.Close()
}
// Helper function to execute a transaction on a repository
func ExecuteWithTransaction(repo *OnyxRepository, operation, description string, fn func() error) error {
txn, err := NewTransaction(repo)
if err != nil {
return err
}
defer txn.Close()
return txn.ExecuteWithTransaction(operation, description, fn)
}