Skip to main content

txn.go

txn.go - Overview

This file defines the transaction management logic for BadgerDB, including the Txn struct, the oracle struct for timestamp management, and related functions for committing, discarding, and handling conflicts.

Detailed Documentation

oracle struct

type oracle struct {
isManaged bool
detectConflicts bool

sync.Mutex
writeChLock sync.Mutex
nextTxnTs uint64

txnMark *y.WaterMark

discardTs uint64
readMark *y.WaterMark

committedTxns []committedTxn
lastCleanupTs uint64

closer *z.Closer
}
  • Purpose: Manages transaction timestamps, conflict detection, and discard timestamps. It ensures that transactions are processed in a consistent order and that conflicts are detected.
    • isManaged: Indicates whether the database is in managed transaction mode.
    • detectConflicts: Indicates whether transactions should be checked for conflicts.
    • sync.Mutex: Mutex for protecting nextTxnTs and commits.
    • writeChLock: Mutex for serializing transactions going to the write channel.
    • nextTxnTs: The next available transaction timestamp.
    • txnMark: Watermark to track transactions.
    • discardTs: Used by ManagedDB to track the oldest timestamp that can be discarded.
    • readMark: Watermark to track read timestamps.
    • committedTxns: Keeps track of the entries written and their commit timestamp.
    • lastCleanupTs: Timestamp of the last cleanup operation.
    • closer: Used to stop watermarks.

committedTxn struct

type committedTxn struct {
ts uint64
conflictKeys map[uint64]struct{}
}
  • Purpose: Represents a committed transaction, storing its timestamp and the keys it wrote (for conflict detection).
    • ts: Timestamp of the committed transaction.
    • conflictKeys: Keeps track of the entries written at timestamp ts (fingerprints).

newOracle func

func newOracle(opt Options) *oracle {
  • Purpose: Creates a new oracle instance.
  • Parameters:
    • opt: Options - Database options.
  • Returns: *oracle - A pointer to the newly created oracle.

oracle.Stop func

func (o *oracle) Stop() {
  • Purpose: Stops the oracle by signaling and waiting on its closer.

oracle.readTs func

func (o *oracle) readTs() uint64 {
  • Purpose: Returns a read timestamp. Blocks until all previous transactions have been committed.
  • Returns: uint64 - The read timestamp.

oracle.nextTs func

func (o *oracle) nextTs() uint64 {
  • Purpose: Returns the next transaction timestamp without incrementing it.
  • Returns: uint64 - The next transaction timestamp.

oracle.incrementNextTs func

func (o *oracle) incrementNextTs() {
  • Purpose: Increments the next transaction timestamp.

oracle.setDiscardTs func

func (o *oracle) setDiscardTs(ts uint64) {
  • Purpose: Sets the discard timestamp.
  • Parameters:
    • ts: uint64 - The discard timestamp to set.

oracle.discardAtOrBelow func

func (o *oracle) discardAtOrBelow() uint64 {
  • Purpose: Returns the discard timestamp.
  • Returns: uint64 - The discard timestamp.

oracle.hasConflict func

func (o *oracle) hasConflict(txn *Txn) bool {
  • Purpose: Checks if the given transaction has a conflict with any committed transaction.
  • Parameters:
    • txn: *Txn - The transaction to check.
  • Returns: bool - True if there is a conflict, false otherwise.

oracle.newCommitTs func

func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
  • Purpose: Generates a new commit timestamp for the given transaction. It also checks for conflicts and updates internal state.
  • Parameters:
    • txn: *Txn - The transaction to commit.
  • Returns:
    • uint64 - The commit timestamp.
    • bool - True if there was a conflict, false otherwise.

oracle.doneRead func

func (o *oracle) doneRead(txn *Txn) {
  • Purpose: Marks the read timestamp as done.
  • Parameters:
    • txn: *Txn - The transaction.

oracle.cleanupCommittedTransactions func

func (o *oracle) cleanupCommittedTransactions() {
  • Purpose: Cleans up the committed transactions slice by removing transactions whose timestamps are below the maximum read timestamp.

oracle.doneCommit func

func (o *oracle) doneCommit(cts uint64) {
  • Purpose: Marks a commit timestamp as done.
  • Parameters:
    • cts: uint64 - The commit timestamp.

Txn struct

type Txn struct {
readTs uint64
commitTs uint64
size int64
count int64
db *DB

reads []uint64
conflictKeys map[uint64]struct{}
readsLock sync.Mutex

pendingWrites map[string]*Entry
duplicateWrites []*Entry

numIterators atomic.Int32
discarded bool
doneRead bool
update bool
}
  • Purpose: Represents a Badger transaction.
    • readTs: The read timestamp of the transaction.
    • commitTs: The commit timestamp of the transaction.
    • size: The size of the transaction.
    • count: The number of entries in the transaction.
    • db: A pointer to the DB instance.
    • reads: Contains fingerprints of keys read.
    • conflictKeys: Contains fingerprints of keys written, used for conflict detection.
    • readsLock: Guards the reads slice.
    • pendingWrites: Cache stores any writes done by txn.
    • duplicateWrites: Used in managed mode to store duplicate entries.
    • numIterators: Atomic counter for the number of iterators.
    • discarded: Indicates if the transaction has been discarded.
    • doneRead: Indicates if the read timestamp has been marked as done.
    • update: Indicates whether the transaction is for update (read-write).

pendingWritesIterator struct

type pendingWritesIterator struct {
entries []*Entry
nextIdx int
readTs uint64
reversed bool
}
  • Purpose: Iterator for pending writes in a transaction.
    • entries: Slice of entries.
    • nextIdx: Index of the next entry.
    • readTs: Read timestamp.
    • reversed: Indicates if the iterator is reversed.

pendingWritesIterator.Next func

func (pi *pendingWritesIterator) Next() {
  • Purpose: Advances the iterator to the next entry.

pendingWritesIterator.Rewind func

func (pi *pendingWritesIterator) Rewind() {
  • Purpose: Resets the iterator to the beginning.

pendingWritesIterator.Seek func

func (pi *pendingWritesIterator) Seek(key []byte) {
  • Purpose: Seeks to the entry with the given key.
  • Parameters:
    • key: []byte - The key to seek to.

pendingWritesIterator.Key func

func (pi *pendingWritesIterator) Key() []byte {
  • Purpose: Returns the key of the current entry.
  • Returns: []byte - The key of the current entry.

pendingWritesIterator.Value func

func (pi *pendingWritesIterator) Value() y.ValueStruct {
  • Purpose: Returns the value struct of the current entry.
  • Returns: y.ValueStruct - The value struct of the current entry.

pendingWritesIterator.Valid func

func (pi *pendingWritesIterator) Valid() bool {
  • Purpose: Returns whether the iterator is valid.
  • Returns: bool - True if the iterator is valid, false otherwise.

pendingWritesIterator.Close func

func (pi *pendingWritesIterator) Close() error {
  • Purpose: Closes the iterator.
  • Returns: error - Always nil.

Txn.newPendingWritesIterator func

func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
  • Purpose: Creates a new iterator for the pending writes.
  • Parameters:
    • reversed: bool - Whether the iterator should be reversed.
  • Returns: *pendingWritesIterator - A pointer to the new iterator.

Txn.checkSize func

func (txn *Txn) checkSize(e *Entry) error {
  • Purpose: Checks if adding the entry would exceed the transaction size limits.
  • Parameters:
    • e: *Entry - The entry to check.
  • Returns: error - ErrTxnTooBig if the transaction would be too big, nil otherwise.

exceedsSize func

func exceedsSize(prefix string, max int64, key []byte) error {
  • Purpose: Returns an error indicating that a size limit has been exceeded.
  • Parameters:
    • prefix: string - A prefix for the error message.
    • max: int64 - The maximum allowed size.
    • key: []byte - The key that exceeded the size.
  • Returns: error - An error indicating that the size limit has been exceeded.

Txn.modify func

func (txn *Txn) modify(e *Entry) error {
  • Purpose: Modifies the transaction by adding, updating, or deleting an entry.
  • Parameters:
    • e: *Entry - The entry to modify.
  • Returns: error - An error if the transaction cannot be modified.

Txn.Set func

func (txn *Txn) Set(key, val []byte) error {
  • Purpose: Adds a key-value pair to the database.
  • Parameters:
    • key: []byte - The key.
    • val: []byte - The value.
  • Returns: error - An error if the key-value pair cannot be set.

Txn.SetEntry func

func (txn *Txn) SetEntry(e *Entry) error {
  • Purpose: Adds an entry to the database.
  • Parameters:
    • e: *Entry - The entry to add.
  • Returns: error - An error if the entry cannot be set.

Txn.Delete func

func (txn *Txn) Delete(key []byte) error {
  • Purpose: Deletes a key from the database.
  • Parameters:
    • key: []byte - The key to delete.
  • Returns: error - An error if the key cannot be deleted.

Txn.Get func

func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
  • Purpose: Retrieves an item from the database.
  • Parameters:
    • key: []byte - The key to retrieve.
  • Returns:
    • *Item - The retrieved item.
    • rerr - An error if the item cannot be retrieved.

Txn.addReadKey func

func (txn *Txn) addReadKey(key []byte) {
  • Purpose: Adds a key to the list of keys read by the transaction. Used for conflict detection.
  • Parameters:
    • key: []byte - The key that was read.

Txn.Discard func

func (txn *Txn) Discard() {
  • Purpose: Discards the transaction. This must be called on all transactions.

Txn.commitAndSend func

func (txn *Txn) commitAndSend() (func() error, error) {
  • Purpose: Commits the transaction and sends the writes to the value log.
  • Returns:
    • func() error - A function to wait for the commit to complete.
    • error - An error if the commit failed.

Txn.commitPrecheck func

func (txn *Txn) commitPrecheck() error {
  • Purpose: Performs pre-commit checks.
  • Returns: error - An error if the pre-commit checks failed.

Txn.Commit func

func (txn *Txn) Commit() error {
  • Purpose: Commits the transaction.
  • Returns: error - An error if the commit failed.

txnCb struct

type txnCb struct {
commit func() error
user func(error)
err error
}
  • Purpose: Structure for handling transaction callbacks.
    • commit: Function to commit the transaction.
    • user: User-provided callback function.
    • err: Error encountered during commit.

runTxnCallback func

func runTxnCallback(cb *txnCb) {
  • Purpose: Runs a transaction callback.
  • Parameters:
    • cb: *txnCb - The callback to run.

Txn.CommitWith func

func (txn *Txn) CommitWith(cb func(error)) {
  • Purpose: Commits the transaction with a callback.
  • Parameters:
    • cb: func(error) - The callback to run after the commit.

Txn.ReadTs func

func (txn *Txn) ReadTs() uint64 {
  • Purpose: Returns the read timestamp of the transaction.
  • Returns: uint64 - The read timestamp.

DB.NewTransaction func

func (db *DB) NewTransaction(update bool) *Txn {
  • Purpose: Creates a new transaction.
  • Parameters:
    • update: bool - Whether the transaction is for update (read-write).
  • Returns: *Txn - A pointer to the new transaction.

DB.newTransaction func

func (db *DB) newTransaction(update, isManaged bool) *Txn {
  • Purpose: Creates a new transaction (internal use).
  • Parameters:
    • update: bool - Whether the transaction is for update (read-write).
    • isManaged: bool - Whether the transaction is managed.
  • Returns: *Txn - A pointer to the new transaction.

DB.View func

func (db *DB) View(fn func(txn *Txn) error) error {
  • Purpose: Executes a function within a read-only transaction.
  • Parameters:
    • fn: func(txn *Txn) error - The function to execute.
  • Returns: error - An error if the function returns an error.

DB.Update func

func (db *DB) Update(fn func(txn *Txn) error) error {
  • Purpose: Executes a function within a read-write transaction.
  • Parameters:
    • fn: func(txn *Txn) error - The function to execute.
  • Returns: error - An error if the function returns an error or if the commit fails.