stream_writer.go
stream_writer.go - Overview
This file defines the StreamWriter struct and its associated methods for efficiently writing sorted data streams to a BadgerDB instance. It is designed for bootstrapping new databases or replicating data. When the data is already sorted, it is significantly faster than the standard write methods because it builds SSTables and places them directly into the levels of the LSM tree instead of routing every write through the normal write path.
Detailed Documentation
StreamWriter
type StreamWriter struct {
	writeLock  sync.Mutex
	db         *DB
	done       func()
	throttle   *y.Throttle
	maxVersion uint64
	writers    map[uint32]*sortedWriter
	prevLevel  int
}
- Purpose: StreamWriter writes data streams to a BadgerDB instance. It is optimized for pre-sorted data and is typically used for bootstrapping or replication.
- Fields:
  - writeLock: A mutex that serializes access to the StreamWriter's shared state so that writes are thread-safe.
  - db: A pointer to the BadgerDB instance being written to.
  - done: A cleanup function, run on completion (Flush) or cancellation (Cancel), that resumes compactions and unblocks regular writes.
  - throttle: A y.Throttle that limits how many tables are created concurrently.
  - maxVersion: The highest version number seen across all streams.
  - writers: A map of sortedWriter instances, keyed by stream ID.
  - prevLevel: The LSM level used to choose where new SSTables go; each new sortedWriter writes one level above it (level prevLevel - 1).
(*DB) NewStreamWriter
func (db *DB) NewStreamWriter() *StreamWriter
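- Purpose: Creates a new StreamWriter instance. Prepare or PrepareIncremental must be called before the first Write.
- Parameters: None. The receiver db is the DB instance the StreamWriter will write to.
- Returns: A pointer to the newly created StreamWriter.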
(*StreamWriter) Prepare
func (sw *StreamWriter) Prepare() error
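- Purpose: Prepares the StreamWriter for writing by dropping all existing data in the database, stopping compactions, and blocking writes made by other means. It must be called before the first Write. Because the existing data is deleted permanently, use PrepareIncremental to add data to a database whose contents must be kept.
- Parameters: None
- Returns: An error if any operation fails.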
(*StreamWriter) PrepareIncremental
func (sw *StreamWriter) PrepareIncremental() error
- Purpose: Prepares the StreamWriter for incremental writing. Unlike Prepare, it keeps the existing data: it stops compactions and returns an error if any memtable still holds data. New tables are written one level above the current base level.
- Parameters: None
- Returns: An error if preparation fails, for example because a memtable is not empty.
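The level choice can be modeled in a few lines. This is a stdlib-only sketch, not Badger's code: targetLevel and its per-level table counts are hypothetical, the "base level" is assumed to be the topmost level that already holds tables, and the case where L0 already holds tables is simplified to an error (the real writer may handle it differently).

```go
package main

import "errors"

// targetLevel is a hypothetical model of where a StreamWriter places new tables.
// levelTables[i] is the number of tables at level i (0 is L0).
func targetLevel(levelTables []int, incremental bool) (int, error) {
	// Fresh load (or empty DB): prevLevel starts one past the last level,
	// so new tables land in the bottom level.
	prevLevel := len(levelTables)
	if incremental {
		// Assumed base level: the topmost level that already holds tables.
		for i, n := range levelTables {
			if n > 0 {
				prevLevel = i
				break
			}
		}
		if prevLevel == 0 {
			// Simplification: there is no level above L0 to write into.
			return -1, errors.New("L0 holds tables; no level above it is free")
		}
	}
	// New tables go one level above prevLevel.
	return prevLevel - 1, nil
}
```

With seven levels, a fresh load targets L6, while an incremental load into a tree whose topmost non-empty level is L4 targets L3. Placing incremental tables above the existing data follows the usual LSM convention that upper levels hold newer data.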
(*StreamWriter) Write
func (sw *StreamWriter) Write(buf *z.Buffer) error
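- Purpose: Writes a buffer of key-value pairs to the database. Write demultiplexes the entries by the stream ID in each KV entry and hands each stream's entries to that stream's sortedWriter, creating the writer on first use. Keys must be sorted within each stream, and different streams must cover non-overlapping key ranges.
- Parameters:
  - buf: A pointer to a z.Buffer containing the serialized KV entries to write.
- Returns: An error if any write operation fails.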
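The demultiplexing step can be sketched without Badger's types. The kv struct and demux function below are hypothetical simplifications; the real Write method decodes pb.KV entries from the z.Buffer and feeds each stream's batch to the matching sortedWriter.

```go
package main

// kv is a simplified, hypothetical stand-in for Badger's pb.KV message.
type kv struct {
	StreamID uint32
	Key      []byte
	Version  uint64
}

// demux groups entries by stream ID, preserving their order within each
// stream, and returns the highest version seen across all streams.
func demux(entries []kv) (map[uint32][]kv, uint64) {
	batches := make(map[uint32][]kv)
	var maxVersion uint64
	for _, e := range entries {
		if e.Version > maxVersion {
			maxVersion = e.Version
		}
		batches[e.StreamID] = append(batches[e.StreamID], e)
	}
	return batches, maxVersion
}
```

Preserving the input order inside each batch matters because each sortedWriter rejects keys that arrive out of order. The returned maximum plays the role of the maxVersion field that Flush later uses to update the oracle.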
(*StreamWriter) Flush
func (sw *StreamWriter) Flush() error
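- Purpose: Completes the write: flushes all remaining data, syncs directories, and, when the database does not use managed transactions, updates the oracle with the maximum version seen. Call it once, after the last Write.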
- Parameters: None
- Returns: An error if any operation fails.
(*StreamWriter) Cancel
func (sw *StreamWriter) Cancel()
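- Purpose: Cancels the StreamWriter operation, signaling all of its goroutines to exit. It does not erase partially written data. Deferring Cancel right after creating the StreamWriter ensures that writes are unblocked even on an early return.
- Parameters: None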
- Returns: None
sortedWriter
type sortedWriter struct {
	db       *DB
	throttle *y.Throttle
	opts     table.Options
	builder  *table.Builder
	lastKey  []byte
	level    int
	streamID uint32
	reqCh    chan *request
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
- Purpose: sortedWriter writes a single stream of sorted key-value pairs into tables.
- Fields:
  - db: A pointer to the DB instance.
  - throttle: A y.Throttle, shared with the StreamWriter, that manages the rate of table creation.
  - opts: Options for building the table.
  - builder: A table.Builder for constructing the current SSTable.
  - lastKey: The last key written, used to enforce sorted order.
  - level: The level in the LSM tree where the tables will be written.
  - streamID: The ID of the stream being written.
  - reqCh: A channel for receiving write requests.
  - closer: Used to signal the writer's goroutine to exit and to wait for it to finish.
(*StreamWriter) newWriter
func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error)
- Purpose: Creates a new sortedWriter for the given stream ID and starts its handleRequests goroutine.
- Parameters:
  - streamID: The ID of the stream.
- Returns: A pointer to the new sortedWriter and an error, if any.
(*sortedWriter) handleRequests
func (w *sortedWriter) handleRequests()
- Purpose: Runs as a goroutine that receives write requests on reqCh and adds their entries to the sortedWriter until the closer signals it to stop.
- Parameters: None
- Returns: None
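The goroutine-per-writer pattern, together with the signal-then-wait shutdown that Cancel performs across all writers, can be sketched with channels and a WaitGroup. This is a hypothetical model, not Badger's code: worker, stop, and shutdown are illustrative names, requests are plain string slices, and draining buffered requests after the stop signal is a design choice of this sketch. It assumes nothing is sent on reqCh once shutdown has started.

```go
package main

import "sync"

// worker is a hypothetical model of a per-stream writer goroutine.
type worker struct {
	reqCh   chan []string
	stop    chan struct{}
	wg      sync.WaitGroup
	written []string // owned by the goroutine until wg.Wait returns
}

func newWorker() *worker {
	w := &worker{reqCh: make(chan []string, 3), stop: make(chan struct{})}
	w.wg.Add(1)
	go w.handleRequests()
	return w
}

// handleRequests processes requests until stop is closed, then drains
// whatever is still buffered in reqCh before returning.
func (w *worker) handleRequests() {
	defer w.wg.Done()
	for {
		select {
		case req := <-w.reqCh:
			w.written = append(w.written, req...)
		case <-w.stop:
			for {
				select {
				case req := <-w.reqCh:
					w.written = append(w.written, req...)
				default:
					return
				}
			}
		}
	}
}

// shutdown signals every worker first and only then waits, so the workers
// wind down in parallel instead of one after another.
func shutdown(workers []*worker) {
	for _, w := range workers {
		close(w.stop)
	}
	for _, w := range workers {
		w.wg.Wait()
	}
}
```

Signaling all workers before waiting on any of them keeps the total shutdown time close to that of the slowest worker rather than the sum of all of them.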
(*sortedWriter) Add
func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error
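- Purpose: Adds a key-value pair to the sortedWriter. When the current table builder has reached capacity, Add first sends it off to become a table, but it does not split different versions of the same key across tables.
- Parameters:
  - key: The key to add.
  - vs: The value struct to add.
- Returns: An error if adding fails, for example when keys are not in sorted order.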
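The ordering check and the rule about where tables are cut can be modeled with the standard library. This sketch assumes Badger's ordering of versioned keys (user key ascending, then version descending) and counts table capacity in entries rather than bytes; entry, splitter, and maxPerTable are hypothetical names, and appending to tables stands in for send.

```go
package main

import (
	"bytes"
	"fmt"
)

// entry is a hypothetical (user key, version) pair.
type entry struct {
	Key     []byte
	Version uint64
}

// compareEntries orders by user key ascending, then version descending,
// so newer versions of a key come first.
func compareEntries(a, b entry) int {
	if c := bytes.Compare(a.Key, b.Key); c != 0 {
		return c
	}
	switch {
	case a.Version > b.Version:
		return -1
	case a.Version < b.Version:
		return 1
	}
	return 0
}

// splitter checks ordering and cuts tables only at user-key boundaries.
type splitter struct {
	maxPerTable int       // hypothetical capacity, counted in entries
	tables      [][]entry // finished tables (stand-in for send)
	cur         []entry   // table under construction (stand-in for builder)
	last        entry
	hasLast     bool
}

func (s *splitter) add(e entry) error {
	if s.hasLast && compareEntries(e, s.last) <= 0 {
		return fmt.Errorf("keys not in sorted order (last: %s@%d, key: %s@%d)",
			s.last.Key, s.last.Version, e.Key, e.Version)
	}
	sameKey := s.hasLast && bytes.Equal(e.Key, s.last.Key)
	// Start a new table only between different user keys, so every version
	// of a key stays in the same table.
	if !sameKey && len(s.cur) >= s.maxPerTable {
		s.tables = append(s.tables, s.cur)
		s.cur = nil
	}
	e.Key = append([]byte(nil), e.Key...) // copy: the caller may reuse its buffer
	s.cur = append(s.cur, e)
	s.last, s.hasLast = e, true
	return nil
}
```

With maxPerTable set to 2, the sequence a@2, a@1, b@5, b@3, b@1, c@1 yields the tables [a@2 a@1] and [b@5 b@3 b@1] plus an open table holding c@1: the second table exceeds the capacity rather than splitting the versions of b.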
(*sortedWriter) send
func (w *sortedWriter) send(done bool) error
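- Purpose: Hands the current table builder to a background goroutine, subject to the throttle, which turns it into a table. Unless this is the final send, a fresh builder replaces the one that was sent.
- Parameters:
  - done: Indicates whether this is the last send; if true, no new builder is created.
- Returns: An error if sending fails.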
(*sortedWriter) Done
func (w *sortedWriter) Done() error
- Purpose: Completes writing to the sortedWriter by sending any data left in the builder as a final table.
- Parameters: None
- Returns: An error, if any.
(*sortedWriter) createTable
func (w *sortedWriter) createTable(builder *table.Builder) error
- Purpose: Builds a table from the provided builder, records it in the MANIFEST, and adds it to the writer's level.
- Parameters:
  - builder: The table builder.
- Returns: An error, if any.
Code Examples
The StreamWriter is typically used as follows:
// restore bulk-loads pre-sorted data into the database described by opts.
// getNextBuffer is a hypothetical placeholder for your data source; it is
// not part of Badger.
func restore(opts badger.Options) error {
	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	defer db.Close()

	streamWriter := db.NewStreamWriter()
	// Defer Cancel right away so writes are unblocked even on an early return.
	defer streamWriter.Cancel()

	// Prepare deletes ALL existing data; use PrepareIncremental to keep it.
	if err := streamWriter.Prepare(); err != nil {
		return err
	}

	// Each *z.Buffer holds serialized pb.KV entries, sorted within each stream.
	for {
		buf, err := getNextBuffer()
		if err != nil {
			return err
		}
		if buf == nil {
			break // End of stream
		}
		if err := streamWriter.Write(buf); err != nil {
			return err
		}
	}
	return streamWriter.Flush()
}