Implement Milestone 1

This commit is contained in:
2025-10-09 19:19:31 -04:00
parent f7674cc2b0
commit 5e6ae2e429
12 changed files with 1671 additions and 5 deletions

View File

@ -0,0 +1,163 @@
package storage
import (
"encoding/binary"
"fmt"
"os"
"sync"
"git.dws.rip/DWS/onyx/internal/models"
)
// OplogWriter handles writing entries to the oplog file
type OplogWriter struct {
path string
file *os.File
mu sync.Mutex
nextID uint64
isClosed bool
}
// OpenOplog opens an existing oplog file or creates a new one
func OpenOplog(path string) (*OplogWriter, error) {
// Open file for append and read
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open oplog file: %w", err)
}
writer := &OplogWriter{
path: path,
file: file,
nextID: 1,
}
// Calculate next ID by reading existing entries
if err := writer.calculateNextID(); err != nil {
file.Close()
return nil, fmt.Errorf("failed to calculate next ID: %w", err)
}
return writer, nil
}
// calculateNextID scans the oplog to determine the next entry ID
func (w *OplogWriter) calculateNextID() error {
// Seek to the beginning
if _, err := w.file.Seek(0, 0); err != nil {
return fmt.Errorf("failed to seek to beginning: %w", err)
}
var maxID uint64 = 0
// Read through all entries to find the max ID
for {
// Read entry length (4 bytes)
var entryLen uint32
err := binary.Read(w.file, binary.LittleEndian, &entryLen)
if err != nil {
// EOF is expected at the end
if err.Error() == "EOF" {
break
}
return fmt.Errorf("failed to read entry length: %w", err)
}
// Read the entry data
entryData := make([]byte, entryLen)
n, err := w.file.Read(entryData)
if err != nil || n != int(entryLen) {
return fmt.Errorf("failed to read entry data: %w", err)
}
// Deserialize to get the ID
entry, err := models.DeserializeOplogEntry(entryData)
if err != nil {
return fmt.Errorf("failed to deserialize entry: %w", err)
}
if entry.ID > maxID {
maxID = entry.ID
}
}
w.nextID = maxID + 1
// Seek to the end for appending
if _, err := w.file.Seek(0, 2); err != nil {
return fmt.Errorf("failed to seek to end: %w", err)
}
return nil
}
// AppendEntry appends a new entry to the oplog
func (w *OplogWriter) AppendEntry(entry *models.OplogEntry) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.isClosed {
return fmt.Errorf("oplog writer is closed")
}
// Assign ID if not set
if entry.ID == 0 {
entry.ID = w.nextID
w.nextID++
}
// Serialize the entry
data, err := entry.Serialize()
if err != nil {
return fmt.Errorf("failed to serialize entry: %w", err)
}
// Write entry length (4 bytes) followed by entry data
entryLen := uint32(len(data))
if err := binary.Write(w.file, binary.LittleEndian, entryLen); err != nil {
return fmt.Errorf("failed to write entry length: %w", err)
}
if _, err := w.file.Write(data); err != nil {
return fmt.Errorf("failed to write entry data: %w", err)
}
// Sync to disk for durability
if err := w.file.Sync(); err != nil {
return fmt.Errorf("failed to sync file: %w", err)
}
return nil
}
// GetNextID returns the next entry ID that will be assigned
func (w *OplogWriter) GetNextID() uint64 {
w.mu.Lock()
defer w.mu.Unlock()
return w.nextID
}
// Close closes the oplog file
func (w *OplogWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.isClosed {
return nil
}
w.isClosed = true
return w.file.Close()
}
// Flush ensures all buffered data is written to disk
func (w *OplogWriter) Flush() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.isClosed {
return fmt.Errorf("oplog writer is closed")
}
return w.file.Sync()
}