164 lines
3.5 KiB
Go
164 lines
3.5 KiB
Go
package storage
|
|
|
|
import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"

	"git.dws.rip/DWS/onyx/internal/models"
)
|
|
|
|
// OplogWriter appends length-prefixed entries to an oplog file and hands out
// entry IDs. Its exported methods lock mu before touching any other field.
type OplogWriter struct {
	// path is the location the oplog file was opened from.
	path string
	// file is the open handle used for both scanning and appending.
	file *os.File

	// mu is held by AppendEntry, GetNextID, Close and Flush for their whole
	// duration.
	mu sync.Mutex
	// nextID is the ID given to the next entry that arrives with ID 0.
	nextID uint64
	// isClosed is set by Close; AppendEntry and Flush refuse to run once it
	// is true.
	isClosed bool
}
|
|
|
|
// OpenOplog opens an existing oplog file or creates a new one
|
|
func OpenOplog(path string) (*OplogWriter, error) {
|
|
// Open file for append and read
|
|
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open oplog file: %w", err)
|
|
}
|
|
|
|
writer := &OplogWriter{
|
|
path: path,
|
|
file: file,
|
|
nextID: 1,
|
|
}
|
|
|
|
// Calculate next ID by reading existing entries
|
|
if err := writer.calculateNextID(); err != nil {
|
|
file.Close()
|
|
return nil, fmt.Errorf("failed to calculate next ID: %w", err)
|
|
}
|
|
|
|
return writer, nil
|
|
}
|
|
|
|
// calculateNextID scans the oplog to determine the next entry ID
|
|
func (w *OplogWriter) calculateNextID() error {
|
|
// Seek to the beginning
|
|
if _, err := w.file.Seek(0, 0); err != nil {
|
|
return fmt.Errorf("failed to seek to beginning: %w", err)
|
|
}
|
|
|
|
var maxID uint64 = 0
|
|
|
|
// Read through all entries to find the max ID
|
|
for {
|
|
// Read entry length (4 bytes)
|
|
var entryLen uint32
|
|
err := binary.Read(w.file, binary.LittleEndian, &entryLen)
|
|
if err != nil {
|
|
// EOF is expected at the end
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
return fmt.Errorf("failed to read entry length: %w", err)
|
|
}
|
|
|
|
// Read the entry data
|
|
entryData := make([]byte, entryLen)
|
|
n, err := w.file.Read(entryData)
|
|
if err != nil || n != int(entryLen) {
|
|
return fmt.Errorf("failed to read entry data: %w", err)
|
|
}
|
|
|
|
// Deserialize to get the ID
|
|
entry, err := models.DeserializeOplogEntry(entryData)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to deserialize entry: %w", err)
|
|
}
|
|
|
|
if entry.ID > maxID {
|
|
maxID = entry.ID
|
|
}
|
|
}
|
|
|
|
w.nextID = maxID + 1
|
|
|
|
// Seek to the end for appending
|
|
if _, err := w.file.Seek(0, 2); err != nil {
|
|
return fmt.Errorf("failed to seek to end: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AppendEntry appends a new entry to the oplog
|
|
func (w *OplogWriter) AppendEntry(entry *models.OplogEntry) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.isClosed {
|
|
return fmt.Errorf("oplog writer is closed")
|
|
}
|
|
|
|
// Assign ID if not set
|
|
if entry.ID == 0 {
|
|
entry.ID = w.nextID
|
|
w.nextID++
|
|
}
|
|
|
|
// Serialize the entry
|
|
data, err := entry.Serialize()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to serialize entry: %w", err)
|
|
}
|
|
|
|
// Write entry length (4 bytes) followed by entry data
|
|
entryLen := uint32(len(data))
|
|
if err := binary.Write(w.file, binary.LittleEndian, entryLen); err != nil {
|
|
return fmt.Errorf("failed to write entry length: %w", err)
|
|
}
|
|
|
|
if _, err := w.file.Write(data); err != nil {
|
|
return fmt.Errorf("failed to write entry data: %w", err)
|
|
}
|
|
|
|
// Sync to disk for durability
|
|
if err := w.file.Sync(); err != nil {
|
|
return fmt.Errorf("failed to sync file: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetNextID returns the next entry ID that will be assigned
|
|
func (w *OplogWriter) GetNextID() uint64 {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.nextID
|
|
}
|
|
|
|
// Close closes the oplog file
|
|
func (w *OplogWriter) Close() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.isClosed {
|
|
return nil
|
|
}
|
|
|
|
w.isClosed = true
|
|
return w.file.Close()
|
|
}
|
|
|
|
// Flush ensures all buffered data is written to disk
|
|
func (w *OplogWriter) Flush() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.isClosed {
|
|
return fmt.Errorf("oplog writer is closed")
|
|
}
|
|
|
|
return w.file.Sync()
|
|
}
|