txn.go
txn.go - Overview
This file defines the transaction management logic for BadgerDB, including the Txn struct, the oracle struct for timestamp management, and related functions for committing, discarding, and handling conflicts.
Detailed Documentation
oracle struct
type oracle struct {
isManaged bool
detectConflicts bool
sync.Mutex
writeChLock sync.Mutex
nextTxnTs uint64
txnMark *y.WaterMark
discardTs uint64
readMark *y.WaterMark
committedTxns []committedTxn
lastCleanupTs uint64
closer *z.Closer
}
- Purpose: Manages transaction timestamps, conflict detection, and discard timestamps. It ensures that transactions are processed in a consistent order and that conflicts are detected.
isManaged
: Indicates whether the database is in managed transaction mode.
detectConflicts
: Indicates whether transactions should be checked for conflicts.
sync.Mutex
: Mutex for protecting nextTxnTs and commits.
writeChLock
: Mutex for serializing transactions going to the write channel.
nextTxnTs
: The next available transaction timestamp.
txnMark
: Watermark to track transactions.
discardTs
: Used by ManagedDB to track the oldest timestamp that can be discarded.
readMark
: Watermark to track read timestamps.
committedTxns
: Keeps track of the entries written and their commit timestamp.
lastCleanupTs
: Timestamp of the last cleanup operation.
closer
: Used to stop watermarks.
committedTxn struct
type committedTxn struct {
ts uint64
conflictKeys map[uint64]struct{}
}
- Purpose: Represents a committed transaction, storing its timestamp and the keys it wrote (for conflict detection).
ts
: Timestamp of the committed transaction.
conflictKeys
: Keeps track of the entries written at timestamp ts (fingerprints).
newOracle func
func newOracle(opt Options) *oracle {
- Purpose: Creates a new oracle instance.
- Parameters:
opt
:Options
- Database options.
- Returns:
*oracle
- A pointer to the newly created oracle.
oracle.Stop func
func (o *oracle) Stop() {
- Purpose: Stops the oracle by signaling and waiting on its closer.
oracle.readTs func
func (o *oracle) readTs() uint64 {
- Purpose: Returns a read timestamp. Blocks until all previous transactions have been committed.
- Returns:
uint64
- The read timestamp.
oracle.nextTs func
func (o *oracle) nextTs() uint64 {
- Purpose: Returns the next transaction timestamp without incrementing it.
- Returns:
uint64
- The next transaction timestamp.
oracle.incrementNextTs func
func (o *oracle) incrementNextTs() {
- Purpose: Increments the next transaction timestamp.
oracle.setDiscardTs func
func (o *oracle) setDiscardTs(ts uint64) {
- Purpose: Sets the discard timestamp.
- Parameters:
ts
:uint64
- The discard timestamp to set.
oracle.discardAtOrBelow func
func (o *oracle) discardAtOrBelow() uint64 {
- Purpose: Returns the discard timestamp.
- Returns:
uint64
- The discard timestamp.
oracle.hasConflict func
func (o *oracle) hasConflict(txn *Txn) bool {
- Purpose: Checks if the given transaction has a conflict with any committed transaction.
- Parameters:
txn
:*Txn
- The transaction to check.
- Returns:
bool
- True if there is a conflict, false otherwise.
oracle.newCommitTs func
func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
- Purpose: Generates a new commit timestamp for the given transaction. It also checks for conflicts and updates internal state.
- Parameters:
txn
:*Txn
- The transaction to commit.
- Returns:
uint64
- The commit timestamp.
bool
- True if there was a conflict, false otherwise.
oracle.doneRead func
func (o *oracle) doneRead(txn *Txn) {
- Purpose: Marks the read timestamp as done.
- Parameters:
txn
:*Txn
- The transaction.
oracle.cleanupCommittedTransactions func
func (o *oracle) cleanupCommittedTransactions() {
- Purpose: Cleans up the committed transactions slice by removing transactions whose timestamps are below the maximum read timestamp.
oracle.doneCommit func
func (o *oracle) doneCommit(cts uint64) {
- Purpose: Marks a commit timestamp as done.
- Parameters:
cts
:uint64
- The commit timestamp.
Txn struct
type Txn struct {
readTs uint64
commitTs uint64
size int64
count int64
db *DB
reads []uint64
conflictKeys map[uint64]struct{}
readsLock sync.Mutex
pendingWrites map[string]*Entry
duplicateWrites []*Entry
numIterators atomic.Int32
discarded bool
doneRead bool
update bool
}
- Purpose: Represents a Badger transaction.
readTs
: The read timestamp of the transaction.
commitTs
: The commit timestamp of the transaction.
size
: The size of the transaction.
count
: The number of entries in the transaction.
db
: A pointer to the DB instance.
reads
: Contains fingerprints of keys read.
conflictKeys
: Contains fingerprints of keys written, used for conflict detection.
readsLock
: Guards the reads slice.
pendingWrites
: Cache that stores any writes done by the txn.
duplicateWrites
: Used in managed mode to store duplicate entries.
numIterators
: Atomic counter for the number of iterators.
discarded
: Indicates if the transaction has been discarded.
doneRead
: Indicates if the read timestamp has been marked as done.
update
: Indicates whether the transaction is for update (read-write).
pendingWritesIterator struct
type pendingWritesIterator struct {
entries []*Entry
nextIdx int
readTs uint64
reversed bool
}
- Purpose: Iterator for pending writes in a transaction.
entries
: Slice of entries.
nextIdx
: Index of the next entry.
readTs
: Read timestamp.
reversed
: Indicates if the iterator is reversed.
pendingWritesIterator.Next func
func (pi *pendingWritesIterator) Next() {
- Purpose: Advances the iterator to the next entry.
pendingWritesIterator.Rewind func
func (pi *pendingWritesIterator) Rewind() {
- Purpose: Resets the iterator to the beginning.
pendingWritesIterator.Seek func
func (pi *pendingWritesIterator) Seek(key []byte) {
- Purpose: Seeks to the entry with the given key.
- Parameters:
key
:[]byte
- The key to seek to.
pendingWritesIterator.Key func
func (pi *pendingWritesIterator) Key() []byte {
- Purpose: Returns the key of the current entry.
- Returns:
[]byte
- The key of the current entry.
pendingWritesIterator.Value func
func (pi *pendingWritesIterator) Value() y.ValueStruct {
- Purpose: Returns the value struct of the current entry.
- Returns:
y.ValueStruct
- The value struct of the current entry.
pendingWritesIterator.Valid func
func (pi *pendingWritesIterator) Valid() bool {
- Purpose: Returns whether the iterator is valid.
- Returns:
bool
- True if the iterator is valid, false otherwise.
pendingWritesIterator.Close func
func (pi *pendingWritesIterator) Close() error {
- Purpose: Closes the iterator.
- Returns:
error
- Always nil.
Txn.newPendingWritesIterator func
func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
- Purpose: Creates a new iterator for the pending writes.
- Parameters:
reversed
:bool
- Whether the iterator should be reversed.
- Returns:
*pendingWritesIterator
- A pointer to the new iterator.
Txn.checkSize func
func (txn *Txn) checkSize(e *Entry) error {
- Purpose: Checks if adding the entry would exceed the transaction size limits.
- Parameters:
e
:*Entry
- The entry to check.
- Returns:
error
- ErrTxnTooBig if the transaction would be too big, nil otherwise.
exceedsSize func
func exceedsSize(prefix string, max int64, key []byte) error {
- Purpose: Returns an error indicating that a size limit has been exceeded.
- Parameters:
prefix
:string
- A prefix for the error message.
max
:int64
- The maximum allowed size.
key
:[]byte
- The key that exceeded the size.
- Returns:
error
- An error indicating that the size limit has been exceeded.
Txn.modify func
func (txn *Txn) modify(e *Entry) error {
- Purpose: Modifies the transaction by adding, updating, or deleting an entry.
- Parameters:
e
:*Entry
- The entry to modify.
- Returns:
error
- An error if the transaction cannot be modified.
Txn.Set func
func (txn *Txn) Set(key, val []byte) error {
- Purpose: Adds a key-value pair to the database.
- Parameters:
key
:[]byte
- The key.
val
:[]byte
- The value.
- Returns:
error
- An error if the key-value pair cannot be set.
Txn.SetEntry func
func (txn *Txn) SetEntry(e *Entry) error {
- Purpose: Adds an entry to the database.
- Parameters:
e
:*Entry
- The entry to add.
- Returns:
error
- An error if the entry cannot be set.
Txn.Delete func
func (txn *Txn) Delete(key []byte) error {
- Purpose: Deletes a key from the database.
- Parameters:
key
:[]byte
- The key to delete.
- Returns:
error
- An error if the key cannot be deleted.
Txn.Get func
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
- Purpose: Retrieves an item from the database.
- Parameters:
key
:[]byte
- The key to retrieve.
- Returns:
*Item
- The retrieved item.
rerr
- An error if the item cannot be retrieved.
Txn.addReadKey func
func (txn *Txn) addReadKey(key []byte) {
- Purpose: Adds a key to the list of keys read by the transaction. Used for conflict detection.
- Parameters:
key
:[]byte
- The key that was read.
Txn.Discard func
func (txn *Txn) Discard() {
- Purpose: Discards the transaction. This must be called on all transactions.
Txn.commitAndSend func
func (txn *Txn) commitAndSend() (func() error, error) {
- Purpose: Commits the transaction and sends the writes to the value log.
- Returns:
func() error
- A function to wait for the commit to complete.
error
- An error if the commit failed.
Txn.commitPrecheck func
func (txn *Txn) commitPrecheck() error {
- Purpose: Performs pre-commit checks.
- Returns:
error
- An error if the pre-commit checks failed.
Txn.Commit func
func (txn *Txn) Commit() error {
- Purpose: Commits the transaction.
- Returns:
error
- An error if the commit failed.
txnCb struct
type txnCb struct {
commit func() error
user func(error)
err error
}
- Purpose: Structure for handling transaction callbacks.
commit
: Function to commit the transaction.
user
: User-provided callback function.
err
: Error encountered during commit.
runTxnCallback func
func runTxnCallback(cb *txnCb) {
- Purpose: Runs a transaction callback.
- Parameters:
cb
:*txnCb
- The callback to run.
Txn.CommitWith func
func (txn *Txn) CommitWith(cb func(error)) {
- Purpose: Commits the transaction with a callback.
- Parameters:
cb
:func(error)
- The callback to run after the commit.
Txn.ReadTs func
func (txn *Txn) ReadTs() uint64 {
- Purpose: Returns the read timestamp of the transaction.
- Returns:
uint64
- The read timestamp.
DB.NewTransaction func
func (db *DB) NewTransaction(update bool) *Txn {
- Purpose: Creates a new transaction.
- Parameters:
update
:bool
- Whether the transaction is for update (read-write).
- Returns:
*Txn
- A pointer to the new transaction.
DB.newTransaction func
func (db *DB) newTransaction(update, isManaged bool) *Txn {
- Purpose: Creates a new transaction (internal use).
- Parameters:
update
:bool
- Whether the transaction is for update (read-write).
isManaged
:bool
- Whether the transaction is managed.
- Returns:
*Txn
- A pointer to the new transaction.
DB.View func
func (db *DB) View(fn func(txn *Txn) error) error {
- Purpose: Executes a function within a read-only transaction.
- Parameters:
fn
:func(txn *Txn) error
- The function to execute.
- Returns:
error
- An error if the function returns an error.
DB.Update func
func (db *DB) Update(fn func(txn *Txn) error) error {
- Purpose: Executes a function within a read-write transaction.
- Parameters:
fn
:func(txn *Txn) error
- The function to execute.
- Returns:
error
- An error if the function returns an error or if the commit fails.