stream.go
stream.go - Overview
This file defines the Stream struct and its associated methods for efficiently iterating over a snapshot of a Badger database, batching key-value pairs, and sending them to a user-defined Send function. It provides a framework for concurrent iteration over key ranges, with options to filter keys and customize the output format.
Detailed Documentation
batchSize
const batchSize = 16 << 20 // 16 MB
Defines the size of the intermediate batches in bytes during streaming.
maxStreamSize
var maxStreamSize = uint64(100 << 20) // 100MB
Default maximum allowed size of a stream batch. newStream assigns this value to MaxSize.
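The two size limits are written as bit shifts, so a quick check of the arithmetic may help. The sketch below copies the two declarations and adds a hypothetical helper, describe, that is not part of stream.go. Note that a shift by 20 yields binary megabytes (MiB), which the source comments abbreviate as MB.

```go
package main

import "fmt"

// Values copied from the declarations above.
const batchSize = 16 << 20

var maxStreamSize = uint64(100 << 20)

// describe is a hypothetical helper that reports a byte count both in
// bytes and in MiB (1 MiB = 1 << 20 bytes).
func describe(name string, n uint64) string {
	return fmt.Sprintf("%s = %d bytes (%d MiB)", name, n, n>>20)
}
```

For example, describe("batchSize", batchSize) reports 16777216 bytes, and maxStreamSize works out to 104857600 bytes.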
Stream
type Stream struct {
	Prefix       []byte
	NumGo        int
	LogPrefix    string
	ChooseKey    func(item *Item) bool
	MaxSize      uint64
	KeyToList    func(key []byte, itr *Iterator) (*pb.KVList, error)
	Send         func(buf *z.Buffer) error
	SinceTs      uint64
	readTs       uint64
	db           *DB
	rangeCh      chan keyRange
	kvChan       chan *z.Buffer
	nextStreamId atomic.Uint32
	doneMarkers  bool
	scanned      atomic.Uint64
	numProducers atomic.Int32
}
The Stream struct provides a framework to concurrently iterate over a snapshot of Badger, pick up key-values, batch them up, and call Send.
- Prefix: Restricts iteration to keys with this prefix. If set to nil (the default), Stream iterates over the entire DB.
- NumGo: Number of goroutines to use for iterating over key ranges. Defaults to 8.
- LogPrefix: Badger produces log entries via Infof to indicate the progress of Stream. LogPrefix helps differentiate them from other activity. Default is "Badger.Stream".
- ChooseKey: Invoked each time a new key is encountered. Can be left nil to select all keys.
- MaxSize: The maximum allowed size of a stream batch.
- KeyToList: Like ChooseKey, invoked only on the highest version of the value. Can be left nil to use the ToList method by default.
- Send: The function through which Stream delivers its final output.
- SinceTs: Read only data above SinceTs. All keys with version <= SinceTs are ignored.
- readTs: Timestamp for reading data.
- db: A pointer to the Badger DB instance.
- rangeCh: Channel for distributing key ranges to worker goroutines.
- kvChan: Channel for sending batched key-value lists.
- nextStreamId: Atomic counter for generating unique stream IDs.
- doneMarkers: Boolean that determines whether done markers are sent on the stream.
- scanned: Atomic counter for tracking the number of scanned keys.
- numProducers: Atomic counter for tracking the number of active producer goroutines.
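To make the three selection fields concrete, here is a minimal stdlib-only model of how Prefix, SinceTs, and ChooseKey could combine. The entry and filter types and the accepts method are hypothetical stand-ins invented for this sketch; they are not Badger types, and the real Stream applies these checks inside its iterators rather than in one function.

```go
package main

import "bytes"

// entry is a hypothetical stand-in for a Badger item: a key plus the
// version (timestamp) at which it was written.
type entry struct {
	Key     []byte
	Version uint64
}

// filter is a hypothetical model of the selection fields on Stream.
type filter struct {
	Prefix    []byte
	SinceTs   uint64
	ChooseKey func(e entry) bool
}

// accepts reports whether e passes every configured filter.
func (f filter) accepts(e entry) bool {
	if !bytes.HasPrefix(e.Key, f.Prefix) { // a nil Prefix matches every key
		return false
	}
	if e.Version <= f.SinceTs { // versions at or below SinceTs are ignored
		return false
	}
	if f.ChooseKey != nil && !f.ChooseKey(e) { // nil ChooseKey selects all keys
		return false
	}
	return true
}
```

With Prefix set to "user/" and SinceTs set to 5, the key "user/1" at version 6 is accepted, the same key at version 5 is rejected, and "order/1" is rejected at any version.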
Stream.SendDoneMarkers(done bool)
func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}
Sets whether to send done markers on the stream.
- done: A boolean indicating whether to send done markers.
- Returns: None.
Stream.ToList(key []byte, itr *Iterator) (*pb.KVList, error)
func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
	// function body
}
ToList is a default implementation of KeyToList. It picks up all valid versions of the key, skipping over deleted or expired keys.
- key: The key to retrieve versions for.
- itr: The iterator to use for traversing versions.
- Returns: A *pb.KVList containing the valid key-value versions, or an error if any.
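The version-collection idea behind ToList can be modeled without Badger. The sketch below is not the real implementation: the version type and toList function are hypothetical, the input is assumed to be sorted by key and then by descending timestamp, and it assumes that a deleted or expired version ends the scan because it shadows everything older. That last point is an assumption about the behavior rather than something the description above states.

```go
package main

import "bytes"

// version is a hypothetical stand-in for one stored version of a key.
type version struct {
	Key     []byte
	Value   []byte
	Ts      uint64
	Deleted bool // true for a tombstone or an expired entry
}

// toList collects the live versions of key from vs, scanning from index
// start. vs is assumed to be sorted by key, then by descending Ts.
func toList(key []byte, vs []version, start int) []version {
	var out []version
	for i := start; i < len(vs); i++ {
		v := vs[i]
		if !bytes.Equal(v.Key, key) {
			break // reached the next key
		}
		if v.Deleted {
			break // assumption: a tombstone hides all older versions
		}
		out = append(out, v)
	}
	return out
}
```

Given versions 3 (live), 2 (deleted), and 1 (live) of key "a", this model returns only version 3.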
Stream.produceRanges(ctx context.Context)
func (st *Stream) produceRanges(ctx context.Context) {
	// function body
}
produceRanges picks up key ranges from the SSTables and sends them to rangeCh.
- ctx: The context for cancellation.
- Returns: None.
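One way to picture range production is as cutting the key space at a sorted list of split keys. The sketch below assumes that shape. The left and right field names on keyRange and the rangesFromSplits helper are hypothetical, and the real method sends each range on rangeCh instead of returning a slice.

```go
package main

// keyRange mirrors the element type of rangeCh. The field names are
// assumptions made for this sketch.
type keyRange struct {
	left  []byte // inclusive start of the range
	right []byte // exclusive end; empty means "until the end"
}

// rangesFromSplits turns sorted split keys into contiguous ranges that
// begin at prefix and leave the final range open-ended.
func rangesFromSplits(prefix []byte, splits [][]byte) []keyRange {
	ranges := make([]keyRange, 0, len(splits)+1)
	start := append([]byte(nil), prefix...) // copy so callers can reuse prefix
	for _, s := range splits {
		end := append([]byte(nil), s...)
		ranges = append(ranges, keyRange{left: start, right: end})
		start = end
	}
	return append(ranges, keyRange{left: start})
}
```

Two split keys therefore yield three ranges, and together the ranges cover every key at or after the prefix exactly once.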
Stream.produceKVs(ctx context.Context, threadId int) error
func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	// function body
}
produceKVs picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
- ctx: The context for cancellation.
- threadId: Identifier of the worker goroutine running this call.
- Returns: An error, if any.
Stream.streamKVs(ctx context.Context) error
func (st *Stream) streamKVs(ctx context.Context) error {
	// function body
}
streamKVs picks up KV lists from kvChan, batches them up further, and passes them to Send.
- ctx: The context for cancellation.
- Returns: An error, if any.
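The batching step can be sketched with plain byte slices. This is a simplified model, not the real streamKVs: batchAndSend is a hypothetical name, the chunks stand in for the buffers arriving on kvChan, and the flush threshold plays the role of a size limit such as MaxSize.

```go
package main

// batchAndSend drains in, concatenating chunks until the pending batch
// reaches maxSize bytes, then hands the batch to send. Whatever remains
// is flushed once in is closed.
func batchAndSend(in <-chan []byte, maxSize int, send func([]byte) error) error {
	var batch []byte
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		err := send(batch)
		batch = nil // start a fresh batch; send may retain the old one
		return err
	}
	for chunk := range in {
		batch = append(batch, chunk...)
		if len(batch) >= maxSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush()
}
```

Feeding the chunks "aa", "bb", "cc", and "d" with a threshold of 4 results in two calls to send, first with "aabb" and then with "ccd".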
Stream.Orchestrate(ctx context.Context) error
func (st *Stream) Orchestrate(ctx context.Context) error {
	// function body
}
Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo goroutines to iterate over these ranges and batch up KVs in lists. Concurrently, a single goroutine picks up these lists, batches them up further, and passes them to Send.
- ctx: The context for cancellation.
- Returns: An error, if any.
DB.newStream() *Stream
func (db *DB) newStream() *Stream {
	return &Stream{
		db:        db,
		NumGo:     db.opt.NumGoroutines,
		LogPrefix: "Badger.Stream",
		MaxSize:   maxStreamSize,
	}
}
Creates a new Stream instance with default values: NumGo from db.opt.NumGoroutines, LogPrefix "Badger.Stream", and MaxSize maxStreamSize.
- Returns: A pointer to a new Stream instance.
DB.NewStream() *Stream
func (db *DB) NewStream() *Stream {
	if db.opt.managedTxns {
		panic("This API can not be called in managed mode.")
	}
	return db.newStream()
}
Creates a new Stream. Panics if the DB is in managed mode.
- Returns: A pointer to a new Stream instance.
DB.NewStreamAt(readTs uint64) *Stream
func (db *DB) NewStreamAt(readTs uint64) *Stream {
	if !db.opt.managedTxns {
		panic("This API can only be called in managed mode.")
	}
	stream := db.newStream()
	stream.readTs = readTs
	return stream
}
Creates a new Stream at a particular timestamp. Should only be used with a managed DB; it panics otherwise.
- readTs: The timestamp to read data at.
- Returns: A pointer to a new Stream instance.
BufferToKVList(buf *z.Buffer) (*pb.KVList, error)
func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
	// function body
}
Converts a buffer to a KVList.
- buf: The buffer to convert.
- Returns: A *pb.KVList and an error, if any.
KVToBuffer(kv *pb.KV, buf *z.Buffer)
func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
	// function body
}
Appends a KV to a buffer.
- kv: The KV to append.
- buf: The buffer to append to.
- Returns: None.