temp for tree extraction
This commit is contained in:
163
internal/storage/oplog_writer.go
Normal file
163
internal/storage/oplog_writer.go
Normal file
@ -0,0 +1,163 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"git.dws.rip/DWS/onyx/internal/models"
|
||||
)
|
||||
|
||||
// OplogWriter handles writing entries to the oplog file
|
||||
type OplogWriter struct {
|
||||
path string
|
||||
file *os.File
|
||||
mu sync.Mutex
|
||||
nextID uint64
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
// OpenOplog opens an existing oplog file or creates a new one
|
||||
func OpenOplog(path string) (*OplogWriter, error) {
|
||||
// Open file for append and read
|
||||
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open oplog file: %w", err)
|
||||
}
|
||||
|
||||
writer := &OplogWriter{
|
||||
path: path,
|
||||
file: file,
|
||||
nextID: 1,
|
||||
}
|
||||
|
||||
// Calculate next ID by reading existing entries
|
||||
if err := writer.calculateNextID(); err != nil {
|
||||
file.Close()
|
||||
return nil, fmt.Errorf("failed to calculate next ID: %w", err)
|
||||
}
|
||||
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
// calculateNextID scans the oplog to determine the next entry ID
|
||||
func (w *OplogWriter) calculateNextID() error {
|
||||
// Seek to the beginning
|
||||
if _, err := w.file.Seek(0, 0); err != nil {
|
||||
return fmt.Errorf("failed to seek to beginning: %w", err)
|
||||
}
|
||||
|
||||
var maxID uint64 = 0
|
||||
|
||||
// Read through all entries to find the max ID
|
||||
for {
|
||||
// Read entry length (4 bytes)
|
||||
var entryLen uint32
|
||||
err := binary.Read(w.file, binary.LittleEndian, &entryLen)
|
||||
if err != nil {
|
||||
// EOF is expected at the end
|
||||
if err.Error() == "EOF" {
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("failed to read entry length: %w", err)
|
||||
}
|
||||
|
||||
// Read the entry data
|
||||
entryData := make([]byte, entryLen)
|
||||
n, err := w.file.Read(entryData)
|
||||
if err != nil || n != int(entryLen) {
|
||||
return fmt.Errorf("failed to read entry data: %w", err)
|
||||
}
|
||||
|
||||
// Deserialize to get the ID
|
||||
entry, err := models.DeserializeOplogEntry(entryData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to deserialize entry: %w", err)
|
||||
}
|
||||
|
||||
if entry.ID > maxID {
|
||||
maxID = entry.ID
|
||||
}
|
||||
}
|
||||
|
||||
w.nextID = maxID + 1
|
||||
|
||||
// Seek to the end for appending
|
||||
if _, err := w.file.Seek(0, 2); err != nil {
|
||||
return fmt.Errorf("failed to seek to end: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AppendEntry appends a new entry to the oplog
|
||||
func (w *OplogWriter) AppendEntry(entry *models.OplogEntry) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.isClosed {
|
||||
return fmt.Errorf("oplog writer is closed")
|
||||
}
|
||||
|
||||
// Assign ID if not set
|
||||
if entry.ID == 0 {
|
||||
entry.ID = w.nextID
|
||||
w.nextID++
|
||||
}
|
||||
|
||||
// Serialize the entry
|
||||
data, err := entry.Serialize()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize entry: %w", err)
|
||||
}
|
||||
|
||||
// Write entry length (4 bytes) followed by entry data
|
||||
entryLen := uint32(len(data))
|
||||
if err := binary.Write(w.file, binary.LittleEndian, entryLen); err != nil {
|
||||
return fmt.Errorf("failed to write entry length: %w", err)
|
||||
}
|
||||
|
||||
if _, err := w.file.Write(data); err != nil {
|
||||
return fmt.Errorf("failed to write entry data: %w", err)
|
||||
}
|
||||
|
||||
// Sync to disk for durability
|
||||
if err := w.file.Sync(); err != nil {
|
||||
return fmt.Errorf("failed to sync file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetNextID returns the next entry ID that will be assigned
|
||||
func (w *OplogWriter) GetNextID() uint64 {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.nextID
|
||||
}
|
||||
|
||||
// Close closes the oplog file
|
||||
func (w *OplogWriter) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.isClosed {
|
||||
return nil
|
||||
}
|
||||
|
||||
w.isClosed = true
|
||||
return w.file.Close()
|
||||
}
|
||||
|
||||
// Flush ensures all buffered data is written to disk
|
||||
func (w *OplogWriter) Flush() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.isClosed {
|
||||
return fmt.Errorf("oplog writer is closed")
|
||||
}
|
||||
|
||||
return w.file.Sync()
|
||||
}
|
Reference in New Issue
Block a user