watermark.go
watermark.go - Overview
This file defines a WaterMark struct and its associated methods for tracking the progress of asynchronous tasks identified by monotonically increasing indices. It allows marking indices as "begun" and "done", determining the minimum un-finished index, and waiting for a specific index to be marked as done.
Detailed Documentation
uint64Heap
type uint64Heap []uint64
- Purpose: Defines a heap of uint64 values, used for tracking the minimum un-finished index. It implements the heap interface.
func (u uint64Heap) Len() int
- Purpose: Returns the length of the heap.
- Returns: The length of the heap (int).
func (u uint64Heap) Less(i, j int) bool
- Purpose: Compares two elements at indices i and j in the heap.
- Parameters:
  - i: The index of the first element.
  - j: The index of the second element.
- Returns: true if the element at index i is less than the element at index j, false otherwise.
func (u uint64Heap) Swap(i, j int)
- Purpose: Swaps two elements at indices i and j in the heap.
- Parameters:
  - i: The index of the first element.
  - j: The index of the second element.
func (u *uint64Heap) Push(x interface{})
- Purpose: Pushes a new element onto the heap.
- Parameters:
  - x: The element to push (interface{}). It is type-asserted to uint64.
func (u *uint64Heap) Pop() interface{}
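- Purpose: Removes and returns the last element of the underlying slice, as the heap interface contract requires. When called through heap.Pop, which first moves the minimum element to the end, this yields the minimum element.
- Returns: The removed element (interface{}). Callers type-assert it to uint64.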
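The five methods above are what container/heap needs to treat a slice as a min-heap. The following is a minimal sketch of how such a type can be driven through heap.Push and heap.Pop. The method bodies are the conventional ones for a slice-backed heap and are an assumption, not a copy of the file; fromSlice and drain are hypothetical helpers added for illustration.

```go
package main

import (
	"container/heap"
	"fmt"
)

// uint64Heap is a min-heap of uint64 values (bodies assumed, see lead-in).
type uint64Heap []uint64

func (u uint64Heap) Len() int            { return len(u) }
func (u uint64Heap) Less(i, j int) bool  { return u[i] < u[j] }
func (u uint64Heap) Swap(i, j int)       { u[i], u[j] = u[j], u[i] }
func (u *uint64Heap) Push(x interface{}) { *u = append(*u, x.(uint64)) }

// Pop removes and returns the last element; heap.Pop has already
// swapped the minimum into that position before calling it.
func (u *uint64Heap) Pop() interface{} {
	old := *u
	n := len(old)
	x := old[n-1]
	*u = old[:n-1]
	return x
}

// fromSlice (hypothetical helper) builds a heap by pushing each value.
func fromSlice(vals []uint64) *uint64Heap {
	h := &uint64Heap{}
	for _, v := range vals {
		heap.Push(h, v)
	}
	return h
}

// drain (hypothetical helper) pops every element, yielding ascending order.
func drain(h *uint64Heap) []uint64 {
	out := make([]uint64, 0, h.Len())
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(uint64))
	}
	return out
}

func main() {
	h := fromSlice([]uint64{7, 3, 9, 1})
	fmt.Println((*h)[0])  // 1: the smallest value sits at the root
	fmt.Println(drain(h)) // [1 3 7 9]
}
```

Note that Push and Pop are meant to be called by the container/heap package, not directly; calling them directly would bypass the re-ordering that keeps the minimum at index 0.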
mark
type mark struct {
	index   uint64
	waiter  chan struct{}
	indices []uint64
	done    bool
}
- Purpose: Represents a single mark, which can indicate the start or completion of one or more indices. It can also contain a channel for waiting goroutines.
- Fields:
  - index: A single index.
  - waiter: A channel to signal when the index is done.
  - indices: A slice of indices.
  - done: A boolean indicating whether the index/indices are done.
WaterMark
type WaterMark struct {
	doneUntil atomic.Uint64
	lastIndex atomic.Uint64
	Name      string
	markCh    chan mark
}
- Purpose: Keeps track of the minimum un-finished index.
- Fields:
  - doneUntil: The maximum index for which all indices less than or equal to it are done.
  - lastIndex: The last index for which Begin has been called.
  - Name: A name for the watermark (purpose unclear from the code).
  - markCh: A channel for sending marks to the processing goroutine.
func (w *WaterMark) Init(closer *z.Closer)
- Purpose: Initializes a WaterMark struct. Must be called before using it.
- Parameters:
  - closer: A z.Closer for managing the lifecycle of the processing goroutine.
func (w *WaterMark) Begin(index uint64)
- Purpose: Sets the last index to the given value and sends a mark indicating the beginning of the index.
- Parameters:
  - index: The index that is beginning.
func (w *WaterMark) BeginMany(indices []uint64)
- Purpose: Works like Begin but accepts multiple indices.
- Parameters:
  - indices: A slice of indices that are beginning.
func (w *WaterMark) Done(index uint64)
- Purpose: Sets a single index as done.
- Parameters:
  - index: The index that is done.
func (w *WaterMark) DoneMany(indices []uint64)
- Purpose: Works like Done but accepts multiple indices.
- Parameters:
  - indices: A slice of indices that are done.
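Begin, BeginMany, Done, and DoneMany all hand a mark to a single processing goroutine over markCh. The sketch below illustrates that message-passing pattern in isolation. The tracker type, the run and unfinished functions, and the rule that a non-empty indices slice takes precedence over index are assumptions made for illustration; the waiter channel and the lastIndex update are left out.

```go
package main

import "fmt"

// mark mirrors the struct described earlier, minus the waiter channel.
type mark struct {
	index   uint64
	indices []uint64
	done    bool
}

// tracker is a hypothetical stand-in for WaterMark that only owns the channel.
type tracker struct {
	markCh chan mark
}

func (t *tracker) Begin(index uint64)         { t.markCh <- mark{index: index} }
func (t *tracker) BeginMany(indices []uint64) { t.markCh <- mark{indices: indices} }
func (t *tracker) Done(index uint64)          { t.markCh <- mark{index: index, done: true} }
func (t *tracker) DoneMany(indices []uint64)  { t.markCh <- mark{indices: indices, done: true} }

// run consumes marks until markCh is closed and returns, per index,
// the number of Begin calls not yet matched by a Done call.
func run(markCh <-chan mark) map[uint64]int {
	pending := make(map[uint64]int)
	apply := func(index uint64, done bool) {
		if done {
			pending[index]--
		} else {
			pending[index]++
		}
		if pending[index] == 0 {
			delete(pending, index)
		}
	}
	for m := range markCh {
		if len(m.indices) > 0 {
			for _, idx := range m.indices {
				apply(idx, m.done)
			}
		} else {
			apply(m.index, m.done)
		}
	}
	return pending
}

// unfinished replays a fixed call sequence and reports what is still open.
func unfinished() map[uint64]int {
	t := &tracker{markCh: make(chan mark, 16)}
	result := make(chan map[uint64]int)
	go func() { result <- run(t.markCh) }()

	t.Begin(1)
	t.BeginMany([]uint64{2, 3})
	t.Done(1)
	t.DoneMany([]uint64{3})
	close(t.markCh)
	return <-result
}

func main() {
	fmt.Println(unfinished()) // map[2:1]: only index 2 is still in flight
}
```

Funnelling every update through one channel means the bookkeeping state is touched by a single goroutine, so it needs no mutex.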
func (w *WaterMark) DoneUntil() uint64
- Purpose: Returns the maximum index that has the property that all indices less than or equal to it are done.
- Returns: The maximum done index.
func (w *WaterMark) SetDoneUntil(val uint64)
- Purpose: Sets the maximum index that has the property that all indices less than or equal to it are done.
- Parameters:
  - val: The new value for doneUntil.
func (w *WaterMark) LastIndex() uint64
- Purpose: Returns the last index for which Begin has been called.
- Returns: The last index.
func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error
- Purpose: Waits until the given index is marked as done.
- Parameters:
  - ctx: A context for cancellation.
  - index: The index to wait for.
- Returns: An error if the context is cancelled, or nil if the index is done.
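The contract described for WaitForMark (nil once the index is done, an error if the context is cancelled first) matches the usual Go idiom of selecting on a signal channel and ctx.Done(). The sketch below shows only that idiom. waitFor, waitClosed, and waitTimedOut are hypothetical names, and the sketch assumes the waiter channel is closed to signal completion; how the real method registers its waiter with the processing goroutine is not shown.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor blocks until waiter is closed or ctx is cancelled,
// whichever happens first.
func waitFor(ctx context.Context, waiter <-chan struct{}) error {
	select {
	case <-waiter:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitClosed simulates an index that is marked done shortly after the wait starts.
func waitClosed() error {
	waiter := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(waiter)
	}()
	return waitFor(context.Background(), waiter)
}

// waitTimedOut simulates an index that is never marked done.
func waitTimedOut() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return waitFor(ctx, make(chan struct{}))
}

func main() {
	fmt.Println(waitClosed())   // <nil>
	fmt.Println(waitTimedOut()) // context deadline exceeded
}
```

Closing the channel, rather than sending on it, lets any number of goroutines waiting on the same index wake up together.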
func (w *WaterMark) process(closer *z.Closer)
- Purpose: Processes the markCh channel, updating the doneUntil value and notifying any waiting goroutines.
- Parameters:
  - closer: A z.Closer for managing the lifecycle of the processing goroutine.
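One way to maintain doneUntil from a stream of begin and done events is to combine the min-heap with a per-index pending counter: the watermark can advance only while the smallest begun index has no outstanding work. The following is a simplified, synchronous sketch of that idea, not the file's actual loop. The tracker type, processOne, and replay are hypothetical, and channel handling, the closer, and waiter notification are all omitted.

```go
package main

import (
	"container/heap"
	"fmt"
)

type uint64Heap []uint64

func (u uint64Heap) Len() int            { return len(u) }
func (u uint64Heap) Less(i, j int) bool  { return u[i] < u[j] }
func (u uint64Heap) Swap(i, j int)       { u[i], u[j] = u[j], u[i] }
func (u *uint64Heap) Push(x interface{}) { *u = append(*u, x.(uint64)) }
func (u *uint64Heap) Pop() interface{} {
	old := *u
	n := len(old)
	x := old[n-1]
	*u = old[:n-1]
	return x
}

// tracker is a hypothetical single-goroutine simplification.
type tracker struct {
	indices   uint64Heap     // min-heap of indices that have been begun
	pending   map[uint64]int // begins minus dones, per index
	doneUntil uint64
}

func newTracker() *tracker {
	return &tracker{pending: make(map[uint64]int)}
}

// processOne applies one begin or done event, then advances doneUntil
// past every leading index that has no outstanding work.
func (t *tracker) processOne(index uint64, done bool) {
	prev, present := t.pending[index]
	if !present {
		heap.Push(&t.indices, index)
	}
	delta := 1
	if done {
		delta = -1
	}
	t.pending[index] = prev + delta

	for t.indices.Len() > 0 {
		lowest := t.indices[0]
		if t.pending[lowest] > 0 {
			break // the smallest begun index is still in flight
		}
		heap.Pop(&t.indices)
		delete(t.pending, lowest)
		if lowest > t.doneUntil {
			t.doneUntil = lowest
		}
	}
}

// replay feeds a fixed event sequence and records doneUntil after each step.
func replay() []uint64 {
	t := newTracker()
	steps := []struct {
		index uint64
		done  bool
	}{
		{1, false}, {2, false}, {3, false},
		{3, true}, // 3 finishes first, but 1 and 2 still hold the watermark back
		{1, true}, // watermark advances to 1
		{2, true}, // 2 and the already finished 3 are released together
	}
	var seen []uint64
	for _, s := range steps {
		t.processOne(s.index, s.done)
		seen = append(seen, t.doneUntil)
	}
	return seen
}

func main() {
	fmt.Println(replay()) // [0 0 0 0 1 3]
}
```

The heap keeps the check for the smallest begun index cheap, which matters because indices may finish out of order, as index 3 does here.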