package storage

import (
	"bufio"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"sync"

	"git.dws.rip/DWS/onyx/internal/models"
)

// OplogWriter appends length-prefixed entries to an oplog file.
//
// On disk every record is a 4-byte little-endian length followed by that many
// bytes of serialized entry data. Every exported method locks mu before
// touching the writer's state.
type OplogWriter struct {
	path     string     // path the file was opened with
	file     *os.File   // underlying file; positioned at its end once OpenOplog succeeds
	mu       sync.Mutex // guards file, nextID and isClosed
	nextID   uint64     // ID given to the next entry that arrives with ID == 0
	isClosed bool       // set by Close; AppendEntry and Flush fail afterwards
}

// OpenOplog opens the oplog file at path, creating it if it does not exist,
// and scans the existing entries so that the returned writer continues
// numbering after the highest ID already stored.
//
// It returns an error if the file cannot be opened or if the existing
// contents cannot be read and deserialized.
func OpenOplog(path string) (*OplogWriter, error) {
	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, fmt.Errorf("failed to open oplog file: %w", err)
	}

	writer := &OplogWriter{
		path:   path,
		file:   file,
		nextID: 1,
	}

	if err := writer.calculateNextID(); err != nil {
		// The scan error is the one worth reporting; nothing has been written
		// through this handle, so a close failure adds no information.
		_ = file.Close()
		return nil, fmt.Errorf("failed to calculate next ID: %w", err)
	}

	return writer, nil
}

// calculateNextID scans the whole oplog from the start, sets w.nextID to one
// past the largest entry ID found (1 for an empty file), and leaves the file
// offset at the end so subsequent writes append.
//
// It is called from OpenOplog before the writer is returned, so it does not
// take w.mu.
func (w *OplogWriter) calculateNextID() error {
	if _, err := w.file.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("failed to seek to beginning: %w", err)
	}

	// The file size bounds every length prefix, so a corrupt prefix cannot
	// trigger an allocation of up to 4 GiB.
	info, err := w.file.Stat()
	if err != nil {
		return fmt.Errorf("failed to stat oplog file: %w", err)
	}
	remaining := info.Size()

	// Buffer the scan: each record would otherwise cost two read syscalls.
	reader := bufio.NewReader(w.file)

	var (
		header [4]byte
		maxID  uint64
	)
	for {
		if _, err := io.ReadFull(reader, header[:]); err != nil {
			// A clean io.EOF means the previous record was the last one. A
			// partial header surfaces as io.ErrUnexpectedEOF and is an error.
			if errors.Is(err, io.EOF) {
				break
			}
			return fmt.Errorf("failed to read entry length: %w", err)
		}
		remaining -= int64(len(header))

		entryLen := binary.LittleEndian.Uint32(header[:])
		if int64(entryLen) > remaining {
			return fmt.Errorf("failed to read entry data: entry length %d exceeds remaining %d bytes", entryLen, remaining)
		}

		// io.ReadFull keeps reading until the buffer is full; a bare Read may
		// legitimately return fewer bytes than requested with a nil error.
		entryData := make([]byte, entryLen)
		if _, err := io.ReadFull(reader, entryData); err != nil {
			return fmt.Errorf("failed to read entry data: %w", err)
		}
		remaining -= int64(entryLen)

		entry, err := models.DeserializeOplogEntry(entryData)
		if err != nil {
			return fmt.Errorf("failed to deserialize entry: %w", err)
		}
		if entry.ID > maxID {
			maxID = entry.ID
		}
	}

	w.nextID = maxID + 1

	// The buffered reader may have read ahead, and writes must append, so
	// position the file explicitly at its end.
	if _, err := w.file.Seek(0, io.SeekEnd); err != nil {
		return fmt.Errorf("failed to seek to end: %w", err)
	}

	return nil
}

// AppendEntry serializes entry, writes it to the oplog as one length-prefixed
// record, and syncs the file to disk.
//
// If entry.ID is zero it is set to the writer's next ID. If the caller
// supplied an ID at or above the writer's next ID, the counter is advanced
// past it so that later auto-assigned IDs cannot collide with it.
//
// It returns an error if the writer is closed, if serialization fails, if the
// serialized entry does not fit in the 32-bit length prefix, or if the write
// or sync fails.
func (w *OplogWriter) AppendEntry(entry *models.OplogEntry) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.isClosed {
		return fmt.Errorf("oplog writer is closed")
	}

	if entry.ID == 0 {
		entry.ID = w.nextID
		w.nextID++
	} else if entry.ID >= w.nextID {
		// Keep the in-memory counter consistent with what a reopen would
		// compute (largest stored ID + 1).
		w.nextID = entry.ID + 1
	}

	data, err := entry.Serialize()
	if err != nil {
		return fmt.Errorf("failed to serialize entry: %w", err)
	}

	// The length prefix is a uint32; refuse payloads it cannot represent
	// instead of silently truncating the length.
	if uint64(len(data)) > math.MaxUint32 {
		return fmt.Errorf("failed to write entry length: entry size %d exceeds uint32 limit", len(data))
	}

	// Assemble prefix and payload in one buffer and issue a single Write, so
	// that a failure cannot leave a length prefix without its payload.
	const prefixSize = 4
	record := make([]byte, prefixSize+len(data))
	binary.LittleEndian.PutUint32(record, uint32(len(data)))
	copy(record[prefixSize:], data)

	if _, err := w.file.Write(record); err != nil {
		return fmt.Errorf("failed to write entry data: %w", err)
	}

	if err := w.file.Sync(); err != nil {
		return fmt.Errorf("failed to sync file: %w", err)
	}

	return nil
}

// GetNextID returns the ID that the next entry appended with ID == 0 will
// receive.
func (w *OplogWriter) GetNextID() uint64 {
	w.mu.Lock()
	defer w.mu.Unlock()

	return w.nextID
}

// Close marks the writer closed and closes the underlying file. Calling Close
// again after the first call is a no-op that returns nil.
func (w *OplogWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.isClosed {
		return nil
	}

	w.isClosed = true
	return w.file.Close()
}

// Flush syncs the underlying file to disk. It returns an error if the writer
// has already been closed.
func (w *OplogWriter) Flush() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.isClosed {
		return fmt.Errorf("oplog writer is closed")
	}

	return w.file.Sync()
}