Skip to main content

batch.go

batch.go - Overview

This file defines the WriteBatch struct and its associated methods, which provide a way to efficiently perform batched writes to a Badger database. It allows batching multiple writes into a single transaction, using callbacks for asynchronous commit handling, and managing transaction size limits.

Detailed Documentation

WriteBatch

// WriteBatch holds the state needed to perform batched writes against a DB.
type WriteBatch struct {
// Embedded mutex; Cancel, Write, WriteList, SetEntry, Delete and Flush
// acquire it before touching txn or finished.
sync.Mutex
// txn is the transaction currently receiving writes. commit replaces it
// with a fresh transaction after handing the old one to CommitWith.
txn *Txn
// db is the database this batch writes to.
db *DB
// throttle gates commits: commit calls Do before each CommitWith, callback
// calls Done, and Cancel/Flush call Finish.
throttle *y.Throttle
// err holds an error recorded by callback, commit, handleEntry or Delete;
// read it through Error.
err atomic.Value

// isManaged is forwarded to db.newTransaction for every transaction the
// batch creates.
isManaged bool
// commitTs is copied onto each replacement transaction created by commit.
commitTs uint64
// finished is set by Cancel and Flush; commit refuses to run once it is set.
finished bool
}
  • Purpose: Holds the necessary information to perform batched writes.
  • Fields:
    • sync.Mutex: Protects concurrent access to the WriteBatch instance.
    • txn: A pointer to the transaction used for the batch.
    • db: A pointer to the Badger database.
    • throttle: A pointer to a throttle used to limit the number of pending transactions.
    • err: An atomic value to store any errors encountered during the batch.
    • isManaged: A boolean indicating if the transaction is managed.
    • commitTs: A uint64 representing the commit timestamp.
    • finished: A boolean indicating if the batch has been finished (either flushed or canceled).

NewWriteBatch

// NewWriteBatch returns a new WriteBatch bound to db.
//
// It panics when db was opened with managed transactions; NewWriteBatchAt
// must be used in that mode instead.
func (db *DB) NewWriteBatch() *WriteBatch {
	if !db.opt.managedTxns {
		return db.newWriteBatch(false)
	}
	panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
  • Purpose: Creates a new WriteBatch instance. Panics if used in managed mode.
  • Parameters: None
  • Returns: A pointer to the newly created WriteBatch.

newWriteBatch

// newWriteBatch builds a WriteBatch for db with an initial transaction and a
// throttle created with a limit of 16.
//
// isManaged is stored on the batch and forwarded to db.newTransaction.
// NOTE(review): the leading `true` passed to newTransaction presumably requests
// a read-write transaction — confirm against newTransaction's signature.
func (db *DB) newWriteBatch(isManaged bool) *WriteBatch {
	wb := &WriteBatch{db: db, isManaged: isManaged}
	wb.txn = db.newTransaction(true, isManaged)
	wb.throttle = y.NewThrottle(16)
	return wb
}
  • Purpose: Internal helper to create a new WriteBatch instance.
  • Parameters:
    • isManaged: A boolean indicating if the transaction is managed.
  • Returns: A pointer to the newly created WriteBatch.

SetMaxPendingTxns

// SetMaxPendingTxns replaces the batch's throttle with a new one created from
// max, which sets the limit on pending transactions while writing batches.
//
// The throttle is swapped without taking the batch mutex.
// NOTE(review): presumably this must be called before any writes are issued,
// since in-flight commits would still reference the old throttle — confirm.
func (wb *WriteBatch) SetMaxPendingTxns(max int) {
	limiter := y.NewThrottle(max)
	wb.throttle = limiter
}
  • Purpose: Sets a limit on the maximum number of pending transactions while writing batches.
  • Parameters:
    • max: An integer representing the maximum number of pending transactions.
  • Returns: None

Cancel

// Cancel marks the batch as finished, finishes the throttle and discards the
// current transaction. It must be called if Flush is not called, to avoid
// leaking the underlying transaction.
//
// An error returned by throttle.Finish is logged through the DB's logger and
// otherwise ignored; Cancel itself returns nothing.
func (wb *WriteBatch) Cancel() {
	wb.Lock()
	defer wb.Unlock()
	// Once finished is set, commit returns an error instead of committing.
	wb.finished = true
	if err := wb.throttle.Finish(); err != nil {
		// The message previously read "WatchBatch.Cancel", which does not match
		// the type name and made the log line hard to search for.
		wb.db.opt.Errorf("WriteBatch.Cancel error while finishing: %v", err)
	}
	wb.txn.Discard()
}
  • Purpose: Cancels the WriteBatch: marks it as finished, finishes the throttle (logging any error it returns), and discards the underlying transaction. It must be called if Flush is not called, to avoid memory leaks.
  • Parameters: None
  • Returns: None

callback

// callback is handed to Txn.CommitWith by commit. It always reports err to the
// throttle via Done, and records err on the batch only when it is non-nil and
// no error has been recorded yet.
func (wb *WriteBatch) callback(err error) {
	defer wb.throttle.Done(err)
	// Nothing to record on success, or when an earlier error is already stored.
	if err == nil || wb.Error() != nil {
		return
	}
	wb.err.Store(err)
}
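  • Purpose: A callback function passed to CommitWith and called once a batched transaction's commit completes. It always signals the throttle that the transaction is done, and records the error on the WriteBatch only if it is non-nil and no error has been recorded yet.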
  • Parameters:
    • err: An error that occurred during the transaction commit.
  • Returns: None

writeKV

// writeKV converts kv into an Entry and adds it to the batch via handleEntry.
//
// Key, Value and Version are copied from kv; only the first byte of
// kv.UserMeta is used, when present. kv.Version must be non-zero, which is
// enforced with y.AssertTrue. The caller is expected to hold the batch mutex,
// as Write and WriteList do.
func (wb *WriteBatch) writeKV(kv *pb.KV) error {
	y.AssertTrue(kv.Version != 0)
	entry := &Entry{Key: kv.Key, Value: kv.Value, version: kv.Version}
	if len(kv.UserMeta) > 0 {
		entry.UserMeta = kv.UserMeta[0]
	}
	return wb.handleEntry(entry)
}
  • Purpose: Writes a key-value pair to the batch.
  • Parameters:
    • kv: A pointer to a pb.KV struct containing the key-value pair.
  • Returns: An error, if any.

Write

// Write adds every serialized pb.KV contained in buf to the batch.
//
// Each slice yielded by buf.SliceIterate is unmarshalled into a pb.KV and
// passed to writeKV. Iteration stops at the first unmarshal or write error,
// which is returned. The batch mutex is held for the whole call.
func (wb *WriteBatch) Write(buf *z.Buffer) error {
	wb.Lock()
	defer wb.Unlock()

	return buf.SliceIterate(func(raw []byte) error {
		kv := new(pb.KV)
		if err := proto.Unmarshal(raw, kv); err != nil {
			return err
		}
		return wb.writeKV(kv)
	})
}
  • Purpose: Writes a buffer of serialized KVs to the batch.
  • Parameters:
    • buf: A pointer to a z.Buffer containing the serialized key-value pairs.
  • Returns: An error, if any.

WriteList

// WriteList adds each entry of kvList.Kv to the batch, in order, stopping at
// and returning the first error from writeKV. The batch mutex is held for the
// whole call.
func (wb *WriteBatch) WriteList(kvList *pb.KVList) error {
	wb.Lock()
	defer wb.Unlock()

	items := kvList.Kv
	for i := range items {
		if err := wb.writeKV(items[i]); err != nil {
			return err
		}
	}
	return nil
}
  • Purpose: Writes a list of key-value pairs to the batch.
  • Parameters:
    • kvList: A pointer to a pb.KVList struct containing the list of key-value pairs.
  • Returns: An error, if any.

SetEntryAt

// SetEntryAt sets e.version to ts and adds e to the batch through SetEntry.
//
// It is only valid when the DB runs in managed mode; otherwise an error is
// returned and e is left untouched.
func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error {
	if wb.db.opt.managedTxns {
		e.version = ts
		return wb.SetEntry(e)
	}
	return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
}
  • Purpose: Sets an entry with a specific version (timestamp). Can only be used in managed mode.
  • Parameters:
    • e: A pointer to the Entry struct to be set.
    • ts: A uint64 representing the version (timestamp) of the entry.
  • Returns: An error, if any.

handleEntry

// handleEntry adds e to the current transaction. If the transaction reports
// ErrTxnTooBig, the batch is committed (which installs a fresh transaction)
// and the entry is retried once.
//
// An error from the retry is stored on the batch before being returned.
// The caller is expected to hold the batch mutex.
func (wb *WriteBatch) handleEntry(e *Entry) error {
	err := wb.txn.SetEntry(e)
	if err != ErrTxnTooBig {
		// Either success (nil) or a failure that committing would not fix.
		return err
	}
	// Txn has reached its zenith. Commit now.
	if err = wb.commit(); err != nil {
		return err
	}
	// This time the error must not be ErrTxnTooBig; any error here is made
	// permanent by recording it on the batch.
	err = wb.txn.SetEntry(e)
	if err != nil {
		wb.err.Store(err)
	}
	return err
}
  • Purpose: Handles adding an entry to the batch. If the transaction becomes too large, it commits the current transaction and starts a new one.
  • Parameters:
    • e: A pointer to the Entry struct to be added.
  • Returns: An error, if any.

SetEntry

// SetEntry adds e to the batch while holding the batch mutex. See handleEntry
// for how an over-sized transaction is committed and the entry retried.
func (wb *WriteBatch) SetEntry(e *Entry) error {
	wb.Lock()
	defer wb.Unlock()

	err := wb.handleEntry(e)
	return err
}
  • Purpose: Sets an entry in the batch.
  • Parameters:
    • e: A pointer to the Entry struct to be set.
  • Returns: An error, if any.

Set

// Set adds the key-value pair (k, v) to the batch by wrapping it in an Entry
// and delegating to SetEntry.
func (wb *WriteBatch) Set(k, v []byte) error {
	return wb.SetEntry(&Entry{Key: k, Value: v})
}
  • Purpose: Sets a key-value pair in the batch.
  • Parameters:
    • k: The key as a byte slice.
    • v: The value as a byte slice.
  • Returns: An error, if any.

DeleteAt

// DeleteAt adds a delete marker for k at version ts to the batch. The marker
// is an Entry with meta set to bitDelete, submitted through SetEntry.
//
// Unlike SetEntryAt, this method does not check whether the DB is in managed
// mode before using ts.
func (wb *WriteBatch) DeleteAt(k []byte, ts uint64) error {
	tombstone := &Entry{Key: k, meta: bitDelete, version: ts}
	return wb.SetEntry(tombstone)
}
  • Purpose: Deletes a key at a specific version (timestamp).
  • Parameters:
    • k: The key to delete as a byte slice.
    • ts: A uint64 representing the version (timestamp) of the delete.
  • Returns: An error, if any.

Delete

// Delete removes k through the current transaction while holding the batch
// mutex. If the transaction reports ErrTxnTooBig, the batch is committed
// (which installs a fresh transaction) and the delete is retried once.
//
// An error from the retry is stored on the batch before being returned.
func (wb *WriteBatch) Delete(k []byte) error {
	wb.Lock()
	defer wb.Unlock()

	err := wb.txn.Delete(k)
	if err != ErrTxnTooBig {
		// Either success (nil) or a failure that committing would not fix.
		return err
	}
	// The current transaction is full: commit it and retry on the new one.
	if err = wb.commit(); err != nil {
		return err
	}
	err = wb.txn.Delete(k)
	if err != nil {
		wb.err.Store(err)
	}
	return err
}
  • Purpose: Deletes a key from the batch. If the transaction becomes too large, it commits the current transaction and starts a new one.
  • Parameters:
    • k: The key to delete as a byte slice.
  • Returns: An error, if any.

commit

// commit hands the current transaction to CommitWith (with wb.callback as the
// completion callback) and replaces it with a fresh transaction carrying the
// batch's commitTs.
//
// It returns early, without committing, when an error is already recorded on
// the batch, when the batch is finished (y.ErrCommitAfterFinish), or when
// throttle.Do fails (that error is also recorded). Otherwise it returns
// whatever error is recorded on the batch after the swap, if any.
// The caller is expected to hold the batch mutex.
func (wb *WriteBatch) commit() error {
	switch err := wb.Error(); {
	case err != nil:
		return err
	case wb.finished:
		return y.ErrCommitAfterFinish
	}

	if err := wb.throttle.Do(); err != nil {
		wb.err.Store(err)
		return err
	}

	wb.txn.CommitWith(wb.callback)

	// Install the replacement transaction for subsequent writes.
	next := wb.db.newTransaction(true, wb.isManaged)
	next.commitTs = wb.commitTs
	wb.txn = next

	return wb.Error()
}
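  • Purpose: Commits the current transaction via CommitWith (using callback for completion) and replaces it with a new transaction for subsequent writes. It does nothing and returns an error if the batch already has a recorded error, has been finished, or the throttle refuses the commit.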
  • Parameters: None
  • Returns: An error, if any.

Flush

// Flush commits the current transaction, marks the batch as finished, discards
// the replacement transaction and then finishes the throttle.
//
// If commit fails, its error is returned and the batch is not marked finished.
// If throttle.Finish fails, that error is returned, wrapped together with the
// batch's recorded error when one exists. Otherwise the batch's recorded error
// (possibly nil) is returned.
func (wb *WriteBatch) Flush() error {
	wb.Lock()
	if err := wb.commit(); err != nil {
		wb.Unlock()
		return err
	}
	wb.finished = true
	wb.txn.Discard()
	wb.Unlock()

	// The mutex is released before finishing the throttle.
	finishErr := wb.throttle.Finish()
	if finishErr == nil {
		return wb.Error()
	}
	if wb.Error() != nil {
		return fmt.Errorf("wb.err: %w err: %w", wb.Error(), finishErr)
	}
	return finishErr
}
  • Purpose: Flushes any pending writes and commits the transaction, ensuring all writes are persisted.
  • Parameters: None
  • Returns: An error, if any.

Error

// Error returns the error currently recorded on the batch, or nil when nothing
// has been stored yet.
func (wb *WriteBatch) Error() error {
	if stored, ok := wb.err.Load().(error); ok {
		return stored
	}
	return nil
}
  • Purpose: Returns the error currently recorded on the batch, or nil if no error has been recorded.
  • Parameters: None
  • Returns: An error, if any.

Code Examples

None.

Getting Started Relevance