

stream_writer.go - Overview

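This file defines the StreamWriter type and its methods for writing sorted data streams directly into a BadgerDB instance. It is intended for bootstrapping a new database (for example, when restoring from a backup) or for replicating data across servers. When the input is pre-sorted, it is significantly faster than transactions or batched writes, because it builds SSTables and places them directly into LSM tree levels without causing compactions. Keys must be sorted within each stream, and different streams must not have overlapping key ranges.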
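StreamWriter relies on two properties of its input: keys inside each stream are sorted, and no two streams cover overlapping key ranges. The standalone sketch below checks both properties on in-memory data before it would be handed to a writer. The `validateStreams` function and its `map[uint32][][]byte` input shape are hypothetical; keys are compared as plain bytes, whereas Badger also appends a version timestamp to each key.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// validateStreams is a hypothetical pre-flight check (not part of Badger).
// It verifies that keys within each stream are strictly ascending and that
// the key ranges of different streams do not overlap.
func validateStreams(streams map[uint32][][]byte) error {
	type span struct {
		id       uint32
		min, max []byte
	}
	var spans []span
	for id, keys := range streams {
		if len(keys) == 0 {
			continue
		}
		for i := 1; i < len(keys); i++ {
			if bytes.Compare(keys[i-1], keys[i]) >= 0 {
				return fmt.Errorf("stream %d: key %q is not after %q", id, keys[i], keys[i-1])
			}
		}
		spans = append(spans, span{id, keys[0], keys[len(keys)-1]})
	}
	// Sort ranges by first key; any overlap then appears between neighbours.
	sort.Slice(spans, func(i, j int) bool { return bytes.Compare(spans[i].min, spans[j].min) < 0 })
	for i := 1; i < len(spans); i++ {
		if bytes.Compare(spans[i-1].max, spans[i].min) >= 0 {
			return fmt.Errorf("streams %d and %d overlap", spans[i-1].id, spans[i].id)
		}
	}
	return nil
}

func main() {
	ok := map[uint32][][]byte{
		1: {[]byte("a"), []byte("b")},
		2: {[]byte("c"), []byte("d")},
	}
	fmt.Println(validateStreams(ok)) // <nil>

	bad := map[uint32][][]byte{
		1: {[]byte("a"), []byte("c")},
		2: {[]byte("b"), []byte("d")},
	}
	fmt.Println(validateStreams(bad) != nil) // true
}
```

Sorting the ranges by their first key means a single pass over neighbouring pairs is enough: if any two ranges overlap, some adjacent pair in that order overlaps too.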

Detailed Documentation

StreamWriter

type StreamWriter struct {
	writeLock  sync.Mutex
	db         *DB
	done       func()
	throttle   *y.Throttle
	maxVersion uint64
	writers    map[uint32]*sortedWriter
	prevLevel  int
}
  • Purpose: StreamWriter writes data from multiple streams into a BadgerDB instance, routing each entry by its stream ID. It is optimized for pre-sorted data and is typically used for bootstrapping or replication.
  • Fields:
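    • writeLock: A mutex that guards the writer's shared state (writers, maxVersion, prevLevel), allowing Write to be called from multiple goroutines.
    • db: A pointer to the BadgerDB instance.
    • done: A cleanup callback set by Prepare or PrepareIncremental and run by Flush or Cancel; it resumes the compactions and writes that preparation stopped.
    • throttle: A y.Throttle that limits how many tables are built concurrently.
    • maxVersion: The highest version number seen across all written entries; Flush uses it to update the oracle.
    • writers: A map of sortedWriter instances, one per stream ID.
    • prevLevel: The LSM tree level at which previously written data begins (or the number of levels when the DB is empty). New SSTables are written one level above it, at prevLevel - 1.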
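The done callback can be reached from more than one path, for example from Flush and again from a Cancel that the caller deferred, so it must run at most once. A minimal sketch of that guarantee wraps the callback in `sync.Once`; the `onceFunc` helper and the counter are illustrative only, not Badger's code.

```go
package main

import (
	"fmt"
	"sync"
)

// onceFunc is a hypothetical helper: it wraps cleanup so that repeated calls
// (for example, from both Flush and a deferred Cancel) run it only once.
func onceFunc(cleanup func()) func() {
	var once sync.Once
	return func() { once.Do(cleanup) }
}

func main() {
	resumed := 0
	// Stand-in for "resume compactions and writes".
	done := onceFunc(func() { resumed++ })

	done()               // Flush finishes successfully and calls done
	done()               // the deferred Cancel calls done again: a no-op
	fmt.Println(resumed) // 1
}
```

Go 1.21 and later also provide `sync.OnceFunc`, which gives the same at-most-once guarantee without a hand-written wrapper.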

(*DB) NewStreamWriter

func (db *DB) NewStreamWriter() *StreamWriter
  • Purpose: Creates a new StreamWriter for db. Prepare or PrepareIncremental must be called on it before any Write.
  • Parameters: None (db is the method receiver).
  • Returns: A pointer to the newly created StreamWriter.

(*StreamWriter) Prepare

func (sw *StreamWriter) Prepare() error
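  • Purpose: Prepares the StreamWriter for writing by deleting all existing data in the database, stopping compactions, and blocking writes from other sources. The deletion is permanent, so call it only on a database you intend to rebuild from the stream; use PrepareIncremental to keep existing data.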
  • Parameters: None
  • Returns: An error if any operation fails.

(*StreamWriter) PrepareIncremental

func (sw *StreamWriter) PrepareIncremental() error
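  • Purpose: Prepares the StreamWriter for incremental writing without deleting existing data. It stops compactions and returns an error if any memtable still contains data. New tables are written one level above the current base level (the topmost level that already contains tables).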
  • Parameters: None
  • Returns: An error if preparation fails.
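The level arithmetic behind prevLevel can be sketched as follows, assuming levels are numbered from 0 (top) to numLevels-1 (bottom) and that new tables go to prevLevel - 1. For an empty DB, treating prevLevel as numLevels places tables at the bottom level. The `targetLevel` function and its per-level table counts are hypothetical simplifications; the case where L0 already holds tables (so no level above is free) is simply reported as an error here and is not a model of how Badger handles it.

```go
package main

import (
	"errors"
	"fmt"
)

// targetLevel is a hypothetical model of where a stream writer places new
// tables. tablesPerLevel[i] is the number of tables at level i (0 = top).
func targetLevel(tablesPerLevel []int) (int, error) {
	// Empty DB: pretend existing data starts just below the last level.
	prevLevel := len(tablesPerLevel)
	for lvl, n := range tablesPerLevel {
		if n > 0 {
			prevLevel = lvl // topmost level that already holds tables
			break
		}
	}
	if prevLevel == 0 {
		return 0, errors.New("L0 already holds tables; no level above it is free")
	}
	return prevLevel - 1, nil // write one level above the existing data
}

func main() {
	fmt.Println(targetLevel([]int{0, 0, 0, 0, 0, 0, 0}))  // 6 <nil>
	fmt.Println(targetLevel([]int{0, 0, 0, 0, 3, 9, 20})) // 3 <nil>
}
```

Writing above the existing data keeps the new tables from overlapping any level that already has tables, which is why no compaction is needed at write time.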

(*StreamWriter) Write

func (sw *StreamWriter) Write(buf *z.Buffer) error
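  • Purpose: Writes a buffer of key-value entries to the database, demultiplexing them by the stream ID carried in each KV entry so that every stream is handled by its own sortedWriter. An entry with StreamDone set closes its stream. Write is safe to call concurrently from multiple goroutines.
  • Parameters:
    • buf: A pointer to a z.Buffer holding the marshaled pb.KV entries to write.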
  • Returns: An error if any write operation fails.
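The demultiplexing step can be pictured as grouping a batch of entries by stream ID, tracking the highest version seen, and noting which streams were marked done. The `kv` struct below is a hypothetical, simplified stand-in loosely modeled on Badger's `pb.KV` (it borrows the field names StreamId, Version and StreamDone), and the sketch skips unmarshaling entries from a z.Buffer.

```go
package main

import "fmt"

// kv is a hypothetical, simplified stand-in for Badger's pb.KV message.
type kv struct {
	Key        string
	Version    uint64
	StreamId   uint32
	StreamDone bool
}

// demux groups entries by stream ID, returns the highest version seen, and
// records which streams were marked done in this batch.
func demux(batch []kv) (map[uint32][]kv, uint64, map[uint32]bool) {
	perStream := make(map[uint32][]kv)
	closed := make(map[uint32]bool)
	var maxVersion uint64
	for _, e := range batch {
		if e.StreamDone {
			closed[e.StreamId] = true // end-of-stream marker carries no data
			continue
		}
		if maxVersion < e.Version {
			maxVersion = e.Version
		}
		perStream[e.StreamId] = append(perStream[e.StreamId], e)
	}
	return perStream, maxVersion, closed
}

func main() {
	batch := []kv{
		{Key: "a", Version: 3, StreamId: 1},
		{Key: "m", Version: 7, StreamId: 2},
		{Key: "b", Version: 5, StreamId: 1},
		{StreamId: 1, StreamDone: true},
	}
	groups, maxV, closed := demux(batch)
	fmt.Println(len(groups[1]), len(groups[2]), maxV, closed[1]) // 2 1 7 true
}
```

Each group would then be sent to the sortedWriter for its stream, created on first use, and a closed stream's writer would be finished once its remaining entries are queued.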

(*StreamWriter) Flush

func (sw *StreamWriter) Flush() error
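  • Purpose: Completes the write: waits for every stream's writer to finish, writes out the remaining tables, syncs the database directories, and, unless the DB uses managed transactions, updates the oracle with the maximum version so that later transactions see all written data. Call it once, after the last Write.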
  • Parameters: None
  • Returns: An error if any operation fails.
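Updating the oracle matters because, outside managed mode, Badger assigns transaction timestamps itself: transactions started after the import must read at a timestamp that sees every imported version, and must commit at a timestamp above it. The invariant can be sketched as taking the larger of the highest version written and the oracle's current read timestamp. `nextTimestamps` is a hypothetical helper; the oracle's internal bookkeeping is not modeled.

```go
package main

import "fmt"

// nextTimestamps is a hypothetical helper modeling the invariant Flush must
// restore: new reads see every imported version, and new commits sort after them.
func nextTimestamps(maxVersion, currentReadTs uint64) (readTs, nextCommitTs uint64) {
	m := maxVersion
	if currentReadTs > m {
		m = currentReadTs // never move the oracle backwards
	}
	return m, m + 1
}

func main() {
	r, c := nextTimestamps(120, 45)
	fmt.Println(r, c) // 120 121
	r, c = nextTimestamps(10, 45)
	fmt.Println(r, c) // 45 46
}
```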

(*StreamWriter) Cancel

func (sw *StreamWriter) Cancel()
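  • Purpose: Cancels the StreamWriter, signaling all of its goroutines to exit and waiting for them. It does not erase partially written data. Deferring Cancel immediately after creating the StreamWriter ensures the DB's writes are unblocked even if the caller returns early.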
  • Parameters: None
  • Returns: None
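Cancel follows a signal-then-wait pattern: first tell every worker goroutine to stop, then wait for all of them to exit. The stdlib-only sketch below shows that pattern with a closed channel and a `sync.WaitGroup`; the `closer` type is a hypothetical, simplified analogue of ristretto's z.Closer, not its implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical, minimal analogue of a signal-and-wait closer.
type closer struct {
	stop chan struct{}
	once sync.Once
	wg   sync.WaitGroup
}

func newCloser() *closer { return &closer{stop: make(chan struct{})} }

// Signal asks all workers to stop; calling it more than once is harmless.
func (c *closer) Signal() { c.once.Do(func() { close(c.stop) }) }

// Wait blocks until every worker has called wg.Done.
func (c *closer) Wait() { c.wg.Wait() }

func main() {
	c := newCloser()
	var mu sync.Mutex
	exited := 0
	for i := 0; i < 4; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			<-c.stop // a real worker would also select on its request channel
			mu.Lock()
			exited++
			mu.Unlock()
		}()
	}
	// Like Cancel: signal all workers first, then wait for all of them.
	c.Signal()
	c.Wait()
	fmt.Println(exited) // 4
}
```

Signaling everyone before waiting on anyone lets the workers shut down in parallel instead of one at a time.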

sortedWriter

type sortedWriter struct {
	db       *DB
	throttle *y.Throttle
	opts     table.Options

	builder  *table.Builder
	lastKey  []byte
	level    int
	streamID uint32
	reqCh    chan *request
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
  • Purpose: sortedWriter writes one stream of sorted key-value pairs into SSTables, starting a new table whenever the current one fills up.
  • Fields:
    • db: A pointer to the DB instance.
    • throttle: A y.Throttle to manage the rate of table creation.
    • opts: Options for building the table.
    • builder: A table.Builder for constructing the SSTable.
    • lastKey: The last key written, used to ensure sorted order.
    • level: The level in the LSM tree where the table will be written.
    • streamID: The ID of the stream being written.
    • reqCh: A channel for receiving write requests.
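    • closer: A per-writer z.Closer used to signal the handleRequests goroutine to stop and to wait for it to exit. Each writer has its own closer because a stream can be closed at any time.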

(*StreamWriter) newWriter

func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error)
  • Purpose: Creates a new sortedWriter instance for a given stream ID.
  • Parameters:
    • streamID: The ID of the stream.
  • Returns: A pointer to the new sortedWriter and an error, if any.

(*sortedWriter) handleRequests

func (w *sortedWriter) handleRequests()
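  • Purpose: Runs as a goroutine that receives requests from reqCh and adds each entry to the current table. When the closer is signaled, it drains any requests still queued before exiting.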
  • Parameters: None
  • Returns: None

(*sortedWriter) Add

func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error
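  • Purpose: Adds a key-value pair to the current table. If the table has reached capacity, it is sent for creation and a new builder is started; all versions of the same key are kept in the same table.
  • Parameters:
    • key: The key to add.
    • vs: The value struct to add.
  • Returns: An error if the key does not sort strictly after the previous key, or if sending the full table fails.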
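Add has two jobs: reject out-of-order keys, and roll over to a new table when the current one is full without splitting the versions of one key across tables (Badger's sortedWriter keeps same-key versions together). A simplified model under stated assumptions: keys are hypothetical `vkey` pairs ordered by user key ascending, then version descending, and capacity is a hypothetical entry count rather than Badger's size-based check.

```go
package main

import (
	"fmt"
	"strings"
)

// vkey is a hypothetical versioned key: user key plus version.
type vkey struct {
	user    string
	version uint64
}

// less orders keys by user key ascending, then by version descending
// (newest version first).
func less(a, b vkey) bool {
	if c := strings.Compare(a.user, b.user); c != 0 {
		return c < 0
	}
	return a.version > b.version
}

// writer is a hypothetical sorted writer that cuts a new "table" once the
// current one holds maxPerTable entries, but never between versions of a key.
type writer struct {
	maxPerTable int
	last        *vkey
	current     []vkey
	tables      [][]vkey
}

func (w *writer) Add(k vkey) error {
	if w.last != nil && !less(*w.last, k) {
		return fmt.Errorf("keys not in sorted order: %v after %v", k, *w.last)
	}
	sameKey := w.last != nil && w.last.user == k.user
	if !sameKey && len(w.current) >= w.maxPerTable {
		w.tables = append(w.tables, w.current) // "send" the full table
		w.current = nil
	}
	w.last = &k
	w.current = append(w.current, k)
	return nil
}

func main() {
	w := &writer{maxPerTable: 2}
	for _, k := range []vkey{{"a", 1}, {"b", 9}, {"b", 4}, {"c", 2}} {
		if err := w.Add(k); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(w.tables), len(w.current)) // 1 1
	fmt.Println(w.Add(vkey{"a", 5}) != nil)    // true: out of order
}
```

In the run above, the table is already full when `{"b", 4}` arrives, but it is not cut until the user key changes to "c", so both versions of "b" land in the same table.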

(*sortedWriter) send

func (w *sortedWriter) send(done bool) error
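  • Purpose: Hands the current table builder to a background goroutine that builds the table, with the throttle limiting how many builds run concurrently.
  • Parameters:
    • done: If true, this is the stream's final table and no new builder is created; otherwise a fresh builder replaces the one just sent.
  • Returns: An error if the throttle reports a failure from an earlier table creation.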

(*sortedWriter) Done

func (w *sortedWriter) Done() error
  • Purpose: Finishes the stream: sends the final table for creation or, if the builder is empty, releases it without writing a table.
  • Parameters: None
  • Returns: An error, if any.

(*sortedWriter) createTable

func (w *sortedWriter) createTable(builder *table.Builder) error
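  • Purpose: Builds an SSTable from the builder (as a file on disk, or in memory when the DB runs in InMemory mode), records it in the MANIFEST, and adds it to the writer's target level.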
  • Parameters:
    • builder: The table builder.
  • Returns: An error, if any.

Code Examples

The StreamWriter is typically used as follows:

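package main

import (
	"log"

	// Import paths assume Badger v4; the z package must be the same one
	// your Badger version depends on (check your go.mod).
	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
)

// getNextBuffer is a placeholder for your own data source (it is not part of
// Badger). It should return a *z.Buffer of marshaled pb.KV entries, sorted by
// key within each stream, or nil once the input is exhausted.
func getNextBuffer() (*z.Buffer, error) {
	return nil, nil
}

func run() error {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger")) // example path
	if err != nil {
		return err
	}
	defer db.Close()

	streamWriter := db.NewStreamWriter()
	// Deferring Cancel right away unblocks the DB even on an early return.
	defer streamWriter.Cancel()

	// Prepare deletes all existing data; use PrepareIncremental to keep it.
	if err := streamWriter.Prepare(); err != nil {
		return err
	}

	for {
		buf, err := getNextBuffer()
		if err != nil {
			return err
		}
		if buf == nil {
			break // end of stream
		}
		if err := streamWriter.Write(buf); err != nil {
			return err
		}
	}

	return streamWriter.Flush()
}

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}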

Getting Started Relevance