batch.go
batch.go - Overview
This file defines the WriteBatch
struct and its associated methods, which provide a way to efficiently perform batched writes to a Badger database. It allows batching multiple writes into a single transaction, using callbacks for asynchronous commit handling, and managing transaction size limits.
Detailed Documentation
WriteBatch
type WriteBatch struct {
sync.Mutex
txn *Txn
db *DB
throttle *y.Throttle
err atomic.Value
isManaged bool
commitTs uint64
finished bool
}
- Purpose: Holds the necessary information to perform batched writes.
- Fields:
sync.Mutex
: Protects concurrent access to theWriteBatch
instance.txn
: A pointer to the transaction used for the batch.db
: A pointer to the Badger database.throttle
: A pointer to a throttle used to limit the number of pending transactions.err
: An atomic value to store any errors encountered during the batch.isManaged
: A boolean indicating if the transaction is managed.commitTs
: Auint64
representing the commit timestamp.finished
: A boolean indicating if the batch has been finished (either flushed or canceled).
NewWriteBatch
func (db *DB) NewWriteBatch() *WriteBatch {
if db.opt.managedTxns {
panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
return db.newWriteBatch(false)
}
- Purpose: Creates a new
WriteBatch
instance. Panics if used in managed mode. - Parameters: None
- Returns: A pointer to the newly created
WriteBatch
.
newWriteBatch
func (db *DB) newWriteBatch(isManaged bool) *WriteBatch {
return &WriteBatch{
db: db,
isManaged: isManaged,
txn: db.newTransaction(true, isManaged),
throttle: y.NewThrottle(16),
}
}
- Purpose: Internal helper to create a new
WriteBatch
instance. - Parameters:
isManaged
: A boolean indicating if the transaction is managed.
- Returns: A pointer to the newly created
WriteBatch
.
SetMaxPendingTxns
func (wb *WriteBatch) SetMaxPendingTxns(max int) {
wb.throttle = y.NewThrottle(max)
}
- Purpose: Sets a limit on the maximum number of pending transactions while writing batches.
- Parameters:
max
: An integer representing the maximum number of pending transactions.
- Returns: None
Cancel
func (wb *WriteBatch) Cancel() {
wb.Lock()
defer wb.Unlock()
wb.finished = true
if err := wb.throttle.Finish(); err != nil {
wb.db.opt.Errorf("WatchBatch.Cancel error while finishing: %v", err)
}
wb.txn.Discard()
}
- Purpose: Cancels the
WriteBatch
, discarding the underlying transaction. It must be called ifFlush
is not called, to avoid memory leaks. - Parameters: None
- Returns: None
callback
func (wb *WriteBatch) callback(err error) {
defer wb.throttle.Done(err)
if err == nil {
return
}
if err := wb.Error(); err != nil {
return
}
wb.err.Store(err)
}
- Purpose: A callback function that is called after a transaction is committed. It handles errors and updates the
WriteBatch
's error status. - Parameters:
err
: An error that occurred during the transaction commit.
- Returns: None
writeKV
func (wb *WriteBatch) writeKV(kv *pb.KV) error {
e := Entry{Key: kv.Key, Value: kv.Value}
if len(kv.UserMeta) > 0 {
e.UserMeta = kv.UserMeta[0]
}
y.AssertTrue(kv.Version != 0)
e.version = kv.Version
return wb.handleEntry(&e)
}
- Purpose: Writes a key-value pair to the batch.
- Parameters:
kv
: A pointer to apb.KV
struct containing the key-value pair.
- Returns: An error, if any.
Write
func (wb *WriteBatch) Write(buf *z.Buffer) error {
wb.Lock()
defer wb.Unlock()
err := buf.SliceIterate(func(s []byte) error {
kv := &pb.KV{}
if err := proto.Unmarshal(s, kv); err != nil {
return err
}
return wb.writeKV(kv)
})
return err
}
- Purpose: Writes a buffer of serialized KVs to the batch.
- Parameters:
buf
: A pointer to az.Buffer
containing the serialized key-value pairs.
- Returns: An error, if any.
WriteList
func (wb *WriteBatch) WriteList(kvList *pb.KVList) error {
wb.Lock()
defer wb.Unlock()
for _, kv := range kvList.Kv {
if err := wb.writeKV(kv); err != nil {
return err
}
}
return nil
}
- Purpose: Writes a list of key-value pairs to the batch.
- Parameters:
kvList
: A pointer to apb.KVList
struct containing the list of key-value pairs.
- Returns: An error, if any.
SetEntryAt
func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error {
if !wb.db.opt.managedTxns {
return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
}
e.version = ts
return wb.SetEntry(e)
}
- Purpose: Sets an entry with a specific version (timestamp). Can only be used in managed mode.
- Parameters:
e
: A pointer to theEntry
struct to be set.ts
: Auint64
representing the version (timestamp) of the entry.
- Returns: An error, if any.
handleEntry
func (wb *WriteBatch) handleEntry(e *Entry) error {
if err := wb.txn.SetEntry(e); err != ErrTxnTooBig {
return err
}
// Txn has reached it's zenith. Commit now.
if cerr := wb.commit(); cerr != nil {
return cerr
}
// This time the error must not be ErrTxnTooBig, otherwise, we make the
// error permanent.
if err := wb.txn.SetEntry(e); err != nil {
wb.err.Store(err)
return err
}
return nil
}
- Purpose: Handles adding an entry to the batch. If the transaction becomes too large, it commits the current transaction and starts a new one.
- Parameters:
e
: A pointer to theEntry
struct to be added.
- Returns: An error, if any.
SetEntry
func (wb *WriteBatch) SetEntry(e *Entry) error {
wb.Lock()
defer wb.Unlock()
return wb.handleEntry(e)
}
- Purpose: Sets an entry in the batch.
- Parameters:
e
: A pointer to theEntry
struct to be set.
- Returns: An error, if any.
Set
func (wb *WriteBatch) Set(k, v []byte) error {
e := &Entry{Key: k, Value: v}
return wb.SetEntry(e)
}
- Purpose: Sets a key-value pair in the batch.
- Parameters:
k
: The key as a byte slice.v
: The value as a byte slice.
- Returns: An error, if any.
DeleteAt
func (wb *WriteBatch) DeleteAt(k []byte, ts uint64) error {
e := Entry{Key: k, meta: bitDelete, version: ts}
return wb.SetEntry(&e)
}
- Purpose: Deletes a key at a specific version (timestamp).
- Parameters:
k
: The key to delete as a byte slice.ts
: Auint64
representing the version (timestamp) of the delete.
- Returns: An error, if any.
Delete
func (wb *WriteBatch) Delete(k []byte) error {
wb.Lock()
defer wb.Unlock()
if err := wb.txn.Delete(k); err != ErrTxnTooBig {
return err
}
if err := wb.commit(); err != nil {
return err
}
if err := wb.txn.Delete(k); err != nil {
wb.err.Store(err)
return err
}
return nil
}
- Purpose: Deletes a key from the batch. If the transaction becomes too large, it commits the current transaction and starts a new one.
- Parameters:
k
: The key to delete as a byte slice.
- Returns: An error, if any.
commit
func (wb *WriteBatch) commit() error {
if err := wb.Error(); err != nil {
return err
}
if wb.finished {
return y.ErrCommitAfterFinish
}
if err := wb.throttle.Do(); err != nil {
wb.err.Store(err)
return err
}
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, wb.isManaged)
wb.txn.commitTs = wb.commitTs
return wb.Error()
}
- Purpose: Commits the current transaction.
- Parameters: None
- Returns: An error, if any.
Flush
func (wb *WriteBatch) Flush() error {
wb.Lock()
err := wb.commit()
if err != nil {
wb.Unlock()
return err
}
wb.finished = true
wb.txn.Discard()
wb.Unlock()
if err := wb.throttle.Finish(); err != nil {
if wb.Error() != nil {
return fmt.Errorf("wb.err: %w err: %w", wb.Error(), err)
}
return err
}
return wb.Error()
}
- Purpose: Flushes any pending writes and commits the transaction, ensuring all writes are persisted.
- Parameters: None
- Returns: An error, if any.
Error
func (wb *WriteBatch) Error() error {
err, _ := wb.err.Load().(error)
return err
}
- Purpose: Returns any errors encountered during the batch.
- Parameters: None
- Returns: An error, if any.
Code Examples
None.