
stream.go

stream.go - Overview

This file defines the Stream struct and associated methods for efficiently iterating over a snapshot of a Badger database, batching key-value pairs, and sending them to a user-defined Send function. It provides a framework for concurrent iteration over key ranges, with options to filter keys and customize the output format.

Detailed Documentation

batchSize

const batchSize = 16 << 20 // 16 MB

Size, in bytes, of the intermediate batches built during streaming.

maxStreamSize

var maxStreamSize = uint64(100 << 20) // 100MB

Maximum allowed size of a stream batch; used as the default value of Stream.MaxSize.
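Both sizes are written as left shifts: `n << 20` multiplies n by 2^20 (1,048,576), so the "MB" in the comments means mebibytes. A quick check that repeats the two expressions:

```go
package main

import "fmt"

// Same expressions as in stream.go; x << 20 == x * 1024 * 1024.
const batchSize = 16 << 20

var maxStreamSize = uint64(100 << 20)

func main() {
	fmt.Println(batchSize)     // 16777216 bytes (16 MiB)
	fmt.Println(maxStreamSize) // 104857600 bytes (100 MiB)
}
```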

Stream

type Stream struct {
	Prefix       []byte
	NumGo        int
	LogPrefix    string
	ChooseKey    func(item *Item) bool
	MaxSize      uint64
	KeyToList    func(key []byte, itr *Iterator) (*pb.KVList, error)
	Send         func(buf *z.Buffer) error
	SinceTs      uint64
	readTs       uint64
	db           *DB
	rangeCh      chan keyRange
	kvChan       chan *z.Buffer
	nextStreamId atomic.Uint32
	doneMarkers  bool
	scanned      atomic.Uint64
	numProducers atomic.Int32
}

The Stream struct provides a framework to concurrently iterate over a snapshot of Badger, pick up key-value pairs, batch them, and pass the batches to Send.

  • Prefix: Restricts iteration to keys with this prefix. If nil (the default), Stream iterates over the entire DB.
  • NumGo: Number of goroutines used to iterate over key ranges. Defaults to the DB's NumGoroutines option (8 by default).
  • LogPrefix: Badger logs the progress of Stream with Infof; LogPrefix labels these entries so they can be told apart from other activity. Default is "Badger.Stream".
  • ChooseKey: Invoked each time a new key is encountered, on the key's highest version only. Can be left nil to select all keys.
  • MaxSize: The maximum allowed size of a stream batch. Defaults to maxStreamSize (100 MB).
  • KeyToList: Converts a key and its versions into a KVList. Like ChooseKey, it is invoked only on the highest version of each key. Can be left nil to use the default ToList.
  • Send: The function to which Stream passes the final output batches.
  • SinceTs: Only versions newer than SinceTs are read; all versions <= SinceTs are ignored.
  • readTs: Timestamp at which the snapshot is read; NewStreamAt sets it.
  • db: A pointer to the Badger DB instance.
  • rangeCh: Channel for distributing key ranges to worker goroutines.
  • kvChan: Channel for sending batched key-value lists.
  • nextStreamId: Atomic counter for generating unique stream IDs.
  • doneMarkers: Boolean to determine if done markers are sent on the stream.
  • scanned: Atomic counter for tracking the number of scanned keys.
  • numProducers: Atomic counter for tracking the number of active producer goroutines.
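How Prefix, ChooseKey, and SinceTs narrow a scan can be modelled in a few lines. This is a simplified sketch, not Badger's implementation: the `version` type and `selectVersions` function are hypothetical, entries are assumed to arrive in iterator order (keys ascending, versions of one key descending), and ChooseKey takes a `version` instead of `*Item`.

```go
package main

import (
	"bytes"
	"fmt"
)

// version is a hypothetical stand-in for one version of a key as seen by
// an iterator: keys ascending, versions of the same key descending.
type version struct {
	Key     []byte
	Version uint64
}

// selectVersions models the filters: versions <= sinceTs are dropped, keys
// without the prefix are skipped, and chooseKey is called once per key with
// the key's highest surviving version.
func selectVersions(in []version, prefix []byte, chooseKey func(v version) bool, sinceTs uint64) []version {
	var out []version
	for i := 0; i < len(in); {
		// Find the end of the run of versions that share in[i].Key.
		j := i
		for j < len(in) && bytes.Equal(in[j].Key, in[i].Key) {
			j++
		}
		var kept []version
		for _, v := range in[i:j] {
			if v.Version > sinceTs { // versions <= SinceTs are ignored
				kept = append(kept, v)
			}
		}
		// A nil prefix matches every key, like a nil Stream.Prefix.
		if len(kept) > 0 && bytes.HasPrefix(kept[0].Key, prefix) &&
			(chooseKey == nil || chooseKey(kept[0])) {
			out = append(out, kept...)
		}
		i = j
	}
	return out
}

func main() {
	data := []version{
		{[]byte("a/1"), 9}, {[]byte("a/1"), 3},
		{[]byte("a/2"), 2},
		{[]byte("b/1"), 7},
	}
	// Prefix "a/", no ChooseKey, SinceTs 2: keeps a/1@9 and a/1@3 only.
	for _, v := range selectVersions(data, []byte("a/"), nil, 2) {
		fmt.Printf("%s@%d\n", v.Key, v.Version)
	}
}
```

Because ChooseKey sees only the highest version, a key it accepts keeps all of its surviving older versions too.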

Stream.SendDoneMarkers(done bool)

func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}

Sets whether to send done markers on the stream.

  • done: A boolean indicating whether to send done markers.
  • Returns: None.
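On the receiving side, done markers let a consumer tell when an individual stream has been fully sent. The sketch below assumes each entry carries a stream ID and a done flag; the `kv` type and `finishedStreams` helper are hypothetical local names, not `pb.KV` or a Badger API.

```go
package main

import "fmt"

// kv is a hypothetical local type: an entry tagged with the stream it
// belongs to, where StreamDone marks the end of that stream.
type kv struct {
	Key        string
	StreamId   uint32
	StreamDone bool
}

// finishedStreams reports which streams sent a done marker and counts the
// data entries (non-markers) received per stream.
func finishedStreams(batch []kv) (done map[uint32]bool, counts map[uint32]int) {
	done = map[uint32]bool{}
	counts = map[uint32]int{}
	for _, e := range batch {
		if e.StreamDone {
			done[e.StreamId] = true
			continue
		}
		counts[e.StreamId]++
	}
	return done, counts
}

func main() {
	batch := []kv{
		{Key: "a", StreamId: 1},
		{Key: "b", StreamId: 2},
		{StreamId: 1, StreamDone: true},
	}
	done, counts := finishedStreams(batch)
	fmt.Println(done[1], done[2], counts[1], counts[2]) // true false 1 1
}
```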

Stream.ToList(key []byte, itr *Iterator) (*pb.KVList, error)

func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
	// function body
}

ToList is a default implementation of KeyToList. It picks up all valid versions of the key, skipping over deleted or expired keys.

  • key: The key to retrieve versions for.
  • itr: The iterator to use for traversing versions.
  • Returns: A *pb.KVList containing the valid key-value versions, or an error if any.
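The version-collecting logic described for ToList can be sketched over an in-memory slice. This assumes, as in a typical MVCC store, that entries arrive newest version first and that a deleted or expired version hides everything older; the `item`, `pair`, and `toList` names are hypothetical and the real method works on `*Iterator` and returns `*pb.KVList`.

```go
package main

import "fmt"

// item is a hypothetical stand-in for one version of a key, in iterator
// order (newest version first).
type item struct {
	Key     string
	Version uint64
	Value   string
	Deleted bool // tombstone or expired entry
}

type pair struct {
	Version uint64
	Value   string
}

// toList collects versions of key starting at items[start], newest first,
// stopping at the first deleted/expired version or at the next key.
func toList(key string, items []item, start int) []pair {
	var list []pair
	for i := start; i < len(items); i++ {
		it := items[i]
		if it.Key != key || it.Deleted {
			break
		}
		list = append(list, pair{it.Version, it.Value})
	}
	return list
}

func main() {
	items := []item{
		{"k", 5, "v5", false},
		{"k", 4, "", true},    // deleted at version 4
		{"k", 2, "v2", false}, // older than the delete: not returned
		{"m", 1, "m1", false},
	}
	fmt.Println(toList("k", items, 0)) // [{5 v5}]
}
```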

Stream.produceRanges(ctx context.Context)

func (st *Stream) produceRanges(ctx context.Context) {
	// function body
}

produceRanges picks up key ranges from the SSTables and sends them to rangeCh.

  • ctx: The context for cancellation.
  • Returns: None.

Stream.produceKVs(ctx context.Context, threadId int) error

func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	// function body
}

produceKVs picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.

  • ctx: The context for cancellation.
  • threadId: The ID of the worker goroutine running this call.
  • Returns: An error, if any.

Stream.streamKVs(ctx context.Context) error

func (st *Stream) streamKVs(ctx context.Context) error {
	// function body
}

streamKVs picks up KV lists from kvChan, batches them further, and passes the batches to Send.

  • ctx: The context for cancellation.
  • Returns: An error, if any.

Stream.Orchestrate(ctx context.Context) error

func (st *Stream) Orchestrate(ctx context.Context) error {
	// function body
}

Orchestrate runs the Stream. It picks up key ranges from the SSTables, then runs NumGo goroutines that iterate over these ranges and batch KVs into lists. Concurrently, a single goroutine picks up these lists, batches them further, and passes them to Send.

  • ctx: The context for cancellation.
  • Returns: An error, if any.

DB.newStream() *Stream

func (db *DB) newStream() *Stream {
	return &Stream{
		db:        db,
		NumGo:     db.opt.NumGoroutines,
		LogPrefix: "Badger.Stream",
		MaxSize:   maxStreamSize,
	}
}

Creates a new Stream instance with default values.

  • Returns: A pointer to a new Stream instance.

DB.NewStream() *Stream

func (db *DB) NewStream() *Stream {
	if db.opt.managedTxns {
		panic("This API can not be called in managed mode.")
	}
	return db.newStream()
}

Creates a new Stream. Panics if the DB is in managed mode; use NewStreamAt there instead.

  • Returns: A pointer to a new Stream instance.

DB.NewStreamAt(readTs uint64) *Stream

func (db *DB) NewStreamAt(readTs uint64) *Stream {
	if !db.opt.managedTxns {
		panic("This API can only be called in managed mode.")
	}
	stream := db.newStream()
	stream.readTs = readTs
	return stream
}

Creates a new Stream that reads the snapshot at readTs. Panics unless the DB is in managed mode.

  • readTs: The timestamp to read data at.
  • Returns: A pointer to a new Stream instance.

BufferToKVList(buf *z.Buffer) (*pb.KVList, error)

func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
	// function body
}

Converts a buffer to a KVList.

  • buf: The buffer to convert.
  • Returns: A *pb.KVList and an error, if any.

KVToBuffer(kv *pb.KV, buf *z.Buffer)

func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
	// function body
}

Appends a KV to a buffer.

  • kv: The KV to append.
  • buf: The buffer to append to.
  • Returns: None.

Getting Started Relevance