wal_aof.go
wal_aof.go - Overview
This file implements the Append-Only File (AOF) WAL (Write-Ahead Logging) functionality for DiceDB. It handles writing commands to disk, segment rotation, and replaying the log for recovery.
Detailed Documentation
const
Definitions
- `segmentPrefix`: Defines the prefix for WAL segment files (e.g., "seg-").
- `segmentSuffix`: Defines the suffix for WAL segment files (e.g., ".wal").
- `defaultVersion`: Defines the default version string for WAL entries (e.g., "v0.0.1").
- `RotationModeTime`: Defines the rotation mode based on time.
- `RetentionModeTime`: Defines the retention mode based on time.
- `WALModeUnbuffered`: Defines the unbuffered WAL mode.
AOF
struct
// AOF is the append-only-file write-ahead log. It appends entries to numbered
// segment files inside logDir through a buffered writer, rotates to a new
// segment when the current one would exceed maxSegmentSize (or on a timer), and
// removes the oldest segment when the retention limits are exceeded.
type AOF struct {
	// logDir is the directory that holds the segment files.
	logDir string
	// currentSegmentFile is the segment currently being appended to.
	currentSegmentFile *os.File
	// walMode is compared against WALModeUnbuffered to decide whether every
	// write is followed by an immediate Sync.
	walMode string
	// writeMode selects durability on Sync; the value "fsync" makes Sync call
	// File.Sync after flushing the buffer.
	writeMode string
	// maxSegmentSize is the rotation threshold in bytes.
	maxSegmentSize int
	// maxSegmentCount is the number of segments kept before the oldest is deleted.
	maxSegmentCount int
	// currentSegmentIndex is the numeric index used in the active segment's file name.
	currentSegmentIndex int
	// oldestSegmentIndex is the numeric index of the oldest segment still tracked.
	oldestSegmentIndex int
	// byteOffset accumulates the sizes of entries written to the current
	// segment; it is reset to zero on rotation.
	byteOffset int
	// bufferSize is the size, in bytes, of the buffered writer.
	bufferSize int
	// retentionMode is compared against RetentionModeTime to decide whether the
	// periodic deletion goroutine is started.
	retentionMode string
	// recoveryMode is copied from configuration; it is not read by the code in
	// this file's visible section.
	recoveryMode string
	// rotationMode is compared against RotationModeTime to decide whether the
	// periodic rotation goroutine is started.
	rotationMode string
	// lastSequenceNo is the log sequence number given to the most recent entry.
	lastSequenceNo uint64
	// bufWriter buffers writes to currentSegmentFile.
	bufWriter *bufio.Writer
	// bufferSyncTicker drives the periodic flush performed by keepSyncingBuffer.
	bufferSyncTicker *time.Ticker
	// segmentRotationTicker drives time-based segment rotation.
	segmentRotationTicker *time.Ticker
	// segmentRetentionTicker drives time-based segment deletion.
	segmentRetentionTicker *time.Ticker
	// mu serializes writers and the background goroutines.
	mu sync.Mutex
	// ctx is cancelled by Close to stop the background goroutines.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
}
- Purpose: Represents the AOF WAL.
- `logDir`: The directory where WAL segment files are stored.
- `currentSegmentFile`: The currently active segment file.
- `walMode`: The WAL mode ("unbuffered", etc.).
- `writeMode`: The write mode ("fsync", etc.).
- `maxSegmentSize`: The maximum size of a segment file in bytes.
- `maxSegmentCount`: The maximum number of segment files to keep.
- `currentSegmentIndex`: The index of the current segment file.
- `oldestSegmentIndex`: The index of the oldest segment file.
- `byteOffset`: The current byte offset within the current segment file.
- `bufferSize`: The size of the buffer used for writing to the segment file.
- `retentionMode`: The WAL retention mode.
- `recoveryMode`: The WAL recovery mode.
- `rotationMode`: The WAL rotation mode.
- `lastSequenceNo`: The last sequence number assigned to a WAL entry.
- `bufWriter`: The buffered writer used to write to the segment file.
- `bufferSyncTicker`: Ticker for periodically syncing the buffer to disk.
- `segmentRotationTicker`: Ticker for periodically rotating log segments.
- `segmentRetentionTicker`: Ticker for periodically deleting old log segments.
- `mu`: Mutex to protect concurrent access to the WAL.
- `ctx`: Context for managing goroutines.
- `cancel`: Cancel function for the context.
NewAOFWAL
Function
// NewAOFWAL builds an AOF write-ahead log that stores its segments in
// directory. All tunables (modes, sizes, intervals) are read from
// config.Config. No file is opened and no goroutine is started here; call Init
// for that.
//
// It returns an error when any of the configured ticker intervals is not
// strictly positive, because time.NewTicker panics on such a duration.
func NewAOFWAL(directory string) (*AOF, error) {
	syncInterval := time.Duration(config.Config.WALBufferSyncIntervalMillis) * time.Millisecond
	rotationInterval := time.Duration(config.Config.WALMaxSegmentRotationTimeSec) * time.Second
	retentionInterval := time.Duration(config.Config.WALMaxSegmentRetentionDurationSec) * time.Second

	// Validate before creating anything so a bad configuration surfaces as an
	// error to the caller instead of a panic inside time.NewTicker.
	intervals := []struct {
		name  string
		value time.Duration
	}{
		{"buffer sync interval", syncInterval},
		{"segment rotation interval", rotationInterval},
		{"segment retention interval", retentionInterval},
	}
	for _, iv := range intervals {
		if iv.value <= 0 {
			return nil, fmt.Errorf("invalid wal %s %s: must be greater than zero", iv.name, iv.value)
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &AOF{
		logDir:                 directory,
		walMode:                config.Config.WALMode,
		bufferSyncTicker:       time.NewTicker(syncInterval),
		segmentRotationTicker:  time.NewTicker(rotationInterval),
		segmentRetentionTicker: time.NewTicker(retentionInterval),
		writeMode:              config.Config.WALWriteMode,
		maxSegmentSize:         config.Config.WALMaxSegmentSizeMB * 1024 * 1024,
		maxSegmentCount:        config.Config.WALMaxSegmentCount,
		bufferSize:             config.Config.WALBufferSizeMB * 1024 * 1024,
		retentionMode:          config.Config.WALRetentionMode,
		recoveryMode:           config.Config.WALRecoveryMode,
		rotationMode:           config.Config.WALRotationMode,
		ctx:                    ctx,
		cancel:                 cancel,
	}, nil
}
- Purpose: Creates a new AOF WAL instance.
- Parameters:
  - `directory`: The directory where WAL segment files will be stored.
- Returns: A pointer to the new `AOF` instance and an error, if any.
AOF.Init
Function
// Init prepares the WAL for writing. It creates the log directory if needed,
// looks for existing segment files (currently only logged), resets the
// sequence number and segment indices, opens segment 0 for appending, and
// starts the background goroutines for buffer syncing and, depending on the
// configured modes, time-based rotation and retention.
//
// The t parameter is not used by this function.
//
// It returns an error if the directory cannot be created, the segment listing
// fails, or the first segment cannot be opened or positioned.
func (wal *AOF) Init(t time.Time) error {
	// TODO - Restore existing checkpoints to memory

	// Create the directory if it doesn't exist. A failure here must be
	// reported: returning nil would make the caller believe the WAL is usable.
	if err := os.MkdirAll(wal.logDir, 0755); err != nil {
		return fmt.Errorf("creating wal directory %s: %w", wal.logDir, err)
	}

	// Get the list of log segment files in the directory
	files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*"+segmentSuffix))
	if err != nil {
		return fmt.Errorf("listing wal segments in %s: %w", wal.logDir, err)
	}

	if len(files) > 0 {
		slog.Info("Found existing log segments", slog.Any("files", files))
		// TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them
	}

	wal.lastSequenceNo = 0
	wal.currentSegmentIndex = 0
	wal.oldestSegmentIndex = 0
	// NOTE(review): byteOffset starts at 0 even when segment 0 already exists
	// and is opened in append mode below — confirm this is intended.
	wal.byteOffset = 0

	newFile, err := os.OpenFile(filepath.Join(wal.logDir, segmentPrefix+"0"+segmentSuffix), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}

	if _, err := newFile.Seek(0, io.SeekEnd); err != nil {
		// Do not leak the descriptor when initialization is abandoned.
		_ = newFile.Close()
		return err
	}
	wal.currentSegmentFile = newFile
	wal.bufWriter = bufio.NewWriterSize(wal.currentSegmentFile, wal.bufferSize)

	go wal.keepSyncingBuffer()

	if wal.rotationMode == RotationModeTime {
		go wal.rotateSegmentPeriodically()
	}

	if wal.retentionMode == RetentionModeTime {
		go wal.deleteSegmentPeriodically()
	}

	return nil
}
- Purpose: Initializes the AOF WAL. Creates the log directory if it doesn't exist, retrieves the list of log segments and sets up the initial segment file. Also starts the ticker goroutines for buffer syncing, segment rotation and segment deletion.
- Parameters:
  - `t`: A `time.Time` argument; it is not referenced anywhere in the function body.
- Returns: An error, if any.
AOF.LogCommand
Function
// LogCommand appends data to the WAL as a single entry. It is the public entry
// point and delegates all of the work to writeEntry.
func (wal *AOF) LogCommand(data []byte) error {
	err := wal.writeEntry(data)
	return err
}
- Purpose: Logs a command to the WAL.
- Parameters:
  - `data`: The byte slice to be logged.
- Returns: An error, if any.
AOF.writeEntry
Function
// writeEntry wraps data in a WALEntry and appends it to the current segment.
//
// Under wal.mu it assigns the next log sequence number, computes the entry
// checksum, rotates the segment first if the entry would push it past
// maxSegmentSize, writes the entry to the buffered writer, and, in unbuffered
// WAL mode, syncs immediately.
//
// It returns any error from rotation, buffering, or syncing.
func (wal *AOF) writeEntry(data []byte) error {
	wal.mu.Lock()
	defer wal.mu.Unlock()

	wal.lastSequenceNo++

	// The checksum covers data followed by the low byte of the sequence number.
	// crc32.Update produces the same value as hashing append(data, b) but never
	// writes into the caller's slice: append would store b in data's backing
	// array whenever data has spare capacity.
	checksum := crc32.Update(crc32.ChecksumIEEE(data), crc32.IEEETable, []byte{byte(wal.lastSequenceNo)})

	entry := &WALEntry{
		Version:           defaultVersion,
		LogSequenceNumber: wal.lastSequenceNo,
		Data:              data,
		Crc32:             checksum,
		Timestamp:         time.Now().UnixNano(),
	}

	entrySize := getEntrySize(data)
	if err := wal.rotateLogIfNeeded(entrySize); err != nil {
		return err
	}

	wal.byteOffset += entrySize

	if err := wal.writeEntryToBuffer(entry); err != nil {
		return err
	}

	// if wal-mode unbuffered immediately sync to disk
	if wal.walMode == WALModeUnbuffered {
		if err := wal.Sync(); err != nil {
			return err
		}
	}

	return nil
}
- Purpose: Writes a WAL entry to the current segment.
- Parameters:
  - `data`: The byte slice to be written as the entry's data.
- Returns: An error, if any.
AOF.writeEntryToBuffer
Function
// writeEntryToBuffer serializes entry and appends it to the buffered writer as
// a length-prefixed record: a 4-byte little-endian size followed by the
// marshaled bytes. Nothing is flushed here.
func (wal *AOF) writeEntryToBuffer(entry *WALEntry) error {
	payload := MustMarshal(entry)

	// Encode the int32 length prefix by hand; the bytes match what
	// binary.Write emits for an int32 in little-endian order.
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(int32(len(payload))))
	if _, err := wal.bufWriter.Write(header[:]); err != nil {
		return err
	}

	_, err := wal.bufWriter.Write(payload)
	return err
}
- Purpose: Writes a marshaled WAL entry to the buffer.
- Parameters:
  - `entry`: The `WALEntry` to write.
- Returns: An error, if any.
AOF.rotateLogIfNeeded
Function
// rotateLogIfNeeded rotates to a new segment when appending entrySize more
// bytes would push the current segment past maxSegmentSize. Otherwise it does
// nothing and returns nil.
func (wal *AOF) rotateLogIfNeeded(entrySize int) error {
	projected := wal.byteOffset + entrySize
	if projected <= wal.maxSegmentSize {
		return nil
	}
	return wal.rotateLog()
}
- Purpose: Checks if log rotation is needed based on the current byte offset and entry size.
- Parameters:
  - `entrySize`: The size of the entry to be written.
- Returns: An error, if any.
AOF.rotateLog
Function
// rotateLog seals the current segment and switches writing to a new one.
//
// It syncs and closes the current segment file, advances currentSegmentIndex,
// deletes the oldest segment if the number of tracked segments would exceed
// maxSegmentCount, then opens the next segment file and points the buffered
// writer at it with byteOffset reset to zero.
//
// Callers in this file invoke it while holding wal.mu. It returns an error if
// syncing, closing, deleting, or opening a file fails.
func (wal *AOF) rotateLog() error {
	if err := wal.Sync(); err != nil {
		return err
	}

	if err := wal.currentSegmentFile.Close(); err != nil {
		return err
	}

	wal.currentSegmentIndex++
	if wal.currentSegmentIndex-wal.oldestSegmentIndex+1 > wal.maxSegmentCount {
		// deleteOldestSegment advances oldestSegmentIndex itself after removing
		// the file. Incrementing it again here would skip the next segment,
		// which would then never be deleted.
		if err := wal.deleteOldestSegment(); err != nil {
			return err
		}
	}

	segmentPath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.currentSegmentIndex)+segmentSuffix)
	newFile, err := os.OpenFile(segmentPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		// Report the failure to the caller; terminating the whole process from
		// inside the WAL takes the decision away from the application.
		return fmt.Errorf("failed opening file: %w", err)
	}

	wal.byteOffset = 0
	wal.currentSegmentFile = newFile
	// Keep the configured buffer size, matching the writer created in Init.
	wal.bufWriter = bufio.NewWriterSize(newFile, wal.bufferSize)

	return nil
}
- Purpose: Rotates the log by closing the current segment, creating a new segment, and updating the relevant indices and file pointers.
- Returns: An error, if any.
AOF.deleteOldestSegment
Function
// deleteOldestSegment removes the segment file numbered oldestSegmentIndex from
// logDir and, only when the removal succeeds, advances oldestSegmentIndex by
// one. A removal error is returned unchanged and leaves the index untouched.
func (wal *AOF) deleteOldestSegment() error {
	name := segmentPrefix + strconv.Itoa(wal.oldestSegmentIndex) + segmentSuffix
	target := filepath.Join(wal.logDir, name)

	// TODO: checkpoint before deleting the file

	err := os.Remove(target)
	if err == nil {
		wal.oldestSegmentIndex++
	}
	return err
}
- Purpose: Deletes the oldest segment file.
- Returns: An error, if any.
AOF.Close
Function
// Close shuts the WAL down. It cancels the context so the background
// goroutines exit, stops the tickers, flushes buffered data through Sync, and
// closes the current segment file.
//
// The file is closed even when Sync fails so the descriptor is not leaked. The
// Sync error takes precedence in the return value; otherwise the Close error
// is returned.
func (wal *AOF) Close() error {
	wal.cancel()

	// The background goroutines call Sync and rotateLog under wal.mu; take the
	// same lock so the final flush cannot interleave with them.
	wal.mu.Lock()
	defer wal.mu.Unlock()

	// Release ticker resources; nil checks keep Close usable on a value whose
	// tickers were never set.
	for _, ticker := range []*time.Ticker{wal.bufferSyncTicker, wal.segmentRotationTicker, wal.segmentRetentionTicker} {
		if ticker != nil {
			ticker.Stop()
		}
	}

	syncErr := wal.Sync()
	closeErr := wal.currentSegmentFile.Close()
	if syncErr != nil {
		return syncErr
	}
	return closeErr
}
- Purpose: Closes the WAL file after syncing.
- Returns: An error, if any.
AOF.Sync
Function
// Sync flushes the buffered writer to the current segment file. When the write
// mode is "fsync" it additionally calls File.Sync on the segment file. The
// first error encountered is returned.
//
// Sync does not take wal.mu itself; the callers in this file lock around it.
func (wal *AOF) Sync() error {
	flushErr := wal.bufWriter.Flush()
	switch {
	case flushErr != nil:
		return flushErr
	case wal.writeMode != "fsync":
		return nil
	}
	return wal.currentSegmentFile.Sync()
}
- Purpose: Flushes the buffer and optionally performs an fsync on the current segment file.
- Returns: An error, if any.
AOF.keepSyncingBuffer
Function
// keepSyncingBuffer runs until the WAL context is cancelled. On every tick of
// bufferSyncTicker it calls Sync under wal.mu and logs a failure instead of
// stopping.
func (wal *AOF) keepSyncingBuffer() {
	stop := wal.ctx.Done()
	ticks := wal.bufferSyncTicker.C

	for {
		select {
		case <-stop:
			return
		case <-ticks:
		}

		wal.mu.Lock()
		syncErr := wal.Sync()
		wal.mu.Unlock()

		if syncErr == nil {
			continue
		}
		slog.Error("failed to sync buffer", slog.String("error", syncErr.Error()))
	}
}
- Purpose: Goroutine that periodically syncs the buffer to disk.
AOF.rotateSegmentPeriodically
Function
// rotateSegmentPeriodically runs until the WAL context is cancelled. On every
// tick of segmentRotationTicker it calls rotateLog under wal.mu and logs a
// failure instead of stopping.
func (wal *AOF) rotateSegmentPeriodically() {
	stop := wal.ctx.Done()
	ticks := wal.segmentRotationTicker.C

	for {
		select {
		case <-stop:
			return
		case <-ticks:
		}

		wal.mu.Lock()
		rotateErr := wal.rotateLog()
		wal.mu.Unlock()

		if rotateErr == nil {
			continue
		}
		slog.Error("failed to rotate segment", slog.String("error", rotateErr.Error()))
	}
}
- Purpose: Goroutine that periodically rotates the log segment.
AOF.deleteSegmentPeriodically
Function
// deleteSegmentPeriodically runs until the WAL context is cancelled. On every
// tick of segmentRetentionTicker it deletes the oldest segment under wal.mu,
// provided that segment is older than the one currently being written.
// Deletion failures are logged and the loop continues.
func (wal *AOF) deleteSegmentPeriodically() {
	for {
		select {
		case <-wal.segmentRetentionTicker.C:
			wal.mu.Lock()
			var err error
			// Never remove the active segment: doing so would delete the file
			// the buffered writer is still appending to and move
			// oldestSegmentIndex past currentSegmentIndex.
			if wal.oldestSegmentIndex < wal.currentSegmentIndex {
				err = wal.deleteOldestSegment()
			}
			wal.mu.Unlock()
			if err != nil {
				slog.Error("failed to delete segment", slog.String("error", err.Error()))
			}
		case <-wal.ctx.Done():
			return
		}
	}
}
- Purpose: Goroutine that periodically deletes old log segments.
AOF.segmentFiles
Function
// segmentFiles returns the paths of all segment files in logDir, ordered by
// the integer embedded between segmentPrefix and segmentSuffix in each file
// name. A name whose middle part is not a valid integer is ordered as 0.
func (wal *AOF) segmentFiles() ([]string, error) {
	pattern := filepath.Join(wal.logDir, segmentPrefix+"*"+segmentSuffix)
	paths, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}

	// indexOf extracts the numeric part of a segment file name.
	indexOf := func(path string) int64 {
		base := filepath.Base(path)
		digits := strings.TrimPrefix(strings.TrimSuffix(base, segmentSuffix), segmentPrefix)
		// The parse error is deliberately ignored: ParseInt yields 0 then.
		n, _ := strconv.ParseInt(digits, 10, 64)
		return n
	}

	sort.Slice(paths, func(a, b int) bool {
		return indexOf(paths[a]) < indexOf(paths[b])
	})

	return paths, nil
}
- Purpose: Returns a sorted list of segment files in the log directory.
- Returns: A slice of strings representing the file paths and an error, if any.
AOF.Replay
Function
// Replay reads every segment file in ascending segment-index order and passes
// each stored entry, after checksum validation by ForEachCommand, to callback.
//
// Replay stops at the first failure and returns it: a segment that cannot be
// listed or opened, a malformed or truncated record, a checksum mismatch, or an
// error returned by callback.
func (wal *AOF) Replay(callback func(*WALEntry) error) error {
	// Get list of segment files sorted by their numeric index
	segments, err := wal.segmentFiles()
	if err != nil {
		return fmt.Errorf("error getting wal-segment files: %w", err)
	}

	// Process each segment file in order
	for _, segment := range segments {
		if err := wal.replaySegment(segment, callback); err != nil {
			return err
		}
	}

	return nil
}

// replaySegment replays the length-prefixed records of a single segment file.
// The file is closed on every return path.
func (wal *AOF) replaySegment(segment string, callback func(*WALEntry) error) error {
	file, err := os.Open(segment)
	if err != nil {
		return fmt.Errorf("error opening wal-segment file %s: %w", segment, err)
	}
	defer file.Close()

	reader := bufio.NewReader(file)
	for {
		// Read entry size
		var entrySize int32
		if err := binary.Read(reader, binary.LittleEndian, &entrySize); err != nil {
			// A clean EOF at a record boundary marks the end of the segment.
			if err == io.EOF {
				return nil
			}
			return fmt.Errorf("error reading wal entry size: %w", err)
		}

		// A negative length can only come from a corrupt file; allocating
		// with it would panic, so report it as an error instead.
		if entrySize < 0 {
			return fmt.Errorf("error reading wal entry size: invalid size %d in %s", entrySize, segment)
		}

		// Read entry data
		entryData := make([]byte, entrySize)
		if _, err := io.ReadFull(reader, entryData); err != nil {
			return fmt.Errorf("error reading wal entry data: %w", err)
		}

		// Unmarshal entry
		var entry WALEntry
		MustUnmarshal(entryData, &entry)

		// Call provided replay function with parsed command
		if err := wal.ForEachCommand(&entry, callback); err != nil {
			return fmt.Errorf("error replaying command: %w", err)
		}
	}
}
- Purpose: Replays the WAL by reading each segment file and applying a callback function to each entry.
- Parameters:
  - `callback`: A function to be called for each `WALEntry` during replay.
- Returns: An error, if any.
AOF.ForEachCommand
Function
// ForEachCommand validates the checksum stored in entry and, if it matches,
// invokes callback with the entry.
//
// The checksum is the IEEE CRC-32 of entry.Data followed by the low byte of
// entry.LogSequenceNumber. A mismatch yields an error and callback is not
// called; otherwise the callback's result is returned.
func (wal *AOF) ForEachCommand(entry *WALEntry, callback func(*WALEntry) error) error {
	// Validate CRC. crc32.Update extends the checksum with the sequence byte
	// without appending to entry.Data, so the entry's backing array is never
	// written to and no temporary copy is allocated.
	expectedCRC := crc32.Update(crc32.ChecksumIEEE(entry.Data), crc32.IEEETable, []byte{byte(entry.LogSequenceNumber)})
	if entry.Crc32 != expectedCRC {
		return fmt.Errorf("checksum mismatch for log sequence %d: expected %d, got %d",
			entry.LogSequenceNumber, expectedCRC, entry.Crc32)
	}

	return callback(entry)
}
- Purpose: Verifies the CRC of a WAL entry and then executes a callback function with the entry.
- Parameters:
  - `entry`: A pointer to the `WALEntry` to be processed.
  - `callback`: A function to be called with the `WALEntry` if the CRC is valid.
- Returns: An error, if any.
Code Examples
None