watermark.go

watermark.go - Overview

This file defines a WaterMark struct and its associated methods for tracking the progress of asynchronous tasks identified by monotonically increasing indices. It allows marking indices as "begun" and "done", determining the minimum un-finished index, and waiting for a specific index to be marked as done.

Detailed Documentation

uint64Heap

type uint64Heap []uint64
  • Purpose: Defines a min-heap of uint64 values, used for tracking the minimum unfinished index. It implements heap.Interface from Go's container/heap package.

func (u uint64Heap) Len() int

  • Purpose: Returns the length of the heap.
  • Returns: The length of the heap (int).

func (u uint64Heap) Less(i, j int) bool

  • Purpose: Compares two elements at indices i and j in the heap.
  • Parameters:
    • i: The index of the first element.
    • j: The index of the second element.
  • Returns: true if the element at index i is less than the element at index j, false otherwise.

func (u uint64Heap) Swap(i, j int)

  • Purpose: Swaps two elements at indices i and j in the heap.
  • Parameters:
    • i: The index of the first element.
    • j: The index of the second element.

func (u *uint64Heap) Push(x interface{})

  • Purpose: Pushes a new element onto the heap.
  • Parameters:
    • x: The element to push (interface{}). It's type-asserted to uint64.

func (u *uint64Heap) Pop() interface{}

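  • Purpose: Removes and returns the last element of the underlying slice. When called through container/heap's heap.Pop, the minimum element has already been moved to that position, so heap.Pop yields the minimum.
  • Returns: The removed element as an interface{} holding a uint64; the caller type-asserts it back to uint64.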

mark

type mark struct {
	index   uint64
	waiter  chan struct{}
	indices []uint64
	done    bool
}
  • Purpose: Represents a single mark, which can indicate the start or completion of one or more indices. It can also contain a channel for waiting goroutines.
  • Fields:
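    • index: The index the mark refers to when it covers a single index.
    • waiter: A channel used to signal a waiting goroutine once the index is done.
    • indices: The indices the mark refers to when it covers several at once.
    • done: true if the mark records completion of the index or indices, false if it records their beginning.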

WaterMark

type WaterMark struct {
	doneUntil atomic.Uint64
	lastIndex atomic.Uint64
	Name      string
	markCh    chan mark
}
  • Purpose: Keeps track of the minimum un-finished index.
  • Fields:
    • doneUntil: The maximum index for which all indices less than or equal to it are done.
    • lastIndex: The last index for which Begin has been called.
    • Name: A label identifying the watermark; this file does not show how it is used.
    • markCh: A channel for sending marks to the processing goroutine.

func (w *WaterMark) Init(closer *z.Closer)

  • Purpose: Initializes a WaterMark struct. Must be called before using it.
  • Parameters:
    • closer: A z.Closer for managing the lifecycle of the processing goroutine.

func (w *WaterMark) Begin(index uint64)

  • Purpose: Sets the last index to the given value and sends a mark indicating the beginning of the index.
  • Parameters:
    • index: The index that is beginning.

func (w *WaterMark) BeginMany(indices []uint64)

  • Purpose: Works like Begin but accepts multiple indices.
  • Parameters:
    • indices: A slice of indices that are beginning.

func (w *WaterMark) Done(index uint64)

  • Purpose: Sets a single index as done.
  • Parameters:
    • index: The index that is done.

func (w *WaterMark) DoneMany(indices []uint64)

  • Purpose: Works like Done but accepts multiple indices.
  • Parameters:
    • indices: A slice of indices that are done.

func (w *WaterMark) DoneUntil() uint64

  • Purpose: Returns the maximum index that has the property that all indices less than or equal to it are done.
  • Returns: The maximum done index.

func (w *WaterMark) SetDoneUntil(val uint64)

  • Purpose: Sets the maximum index that has the property that all indices less than or equal to it are done.
  • Parameters:
    • val: The new value for doneUntil.

func (w *WaterMark) LastIndex() uint64

  • Purpose: Returns the last index for which Begin has been called.
  • Returns: The last index.
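
Because doneUntil and lastIndex are declared as atomic.Uint64, the three accessors above can plausibly be thin wrappers around atomic loads and stores. The sketch below illustrates that idea only; the progress type is a hypothetical stand-in for WaterMark, and the method bodies are an assumption rather than the file's actual code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// progress is a hypothetical stand-in for the two counters held by WaterMark.
type progress struct {
	doneUntil atomic.Uint64
	lastIndex atomic.Uint64
}

// DoneUntil reads the counter atomically, so it is safe to call from any goroutine.
func (p *progress) DoneUntil() uint64 { return p.doneUntil.Load() }

// SetDoneUntil overwrites the counter atomically.
func (p *progress) SetDoneUntil(val uint64) { p.doneUntil.Store(val) }

// LastIndex reads the most recently stored begin index.
func (p *progress) LastIndex() uint64 { return p.lastIndex.Load() }

func main() {
	var p progress
	p.lastIndex.Store(10) // what a Begin(10) call would be expected to record
	p.SetDoneUntil(7)
	fmt.Println(p.DoneUntil(), p.LastIndex()) // 7 10
}
```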

func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error

  • Purpose: Waits until the given index is marked as done.
  • Parameters:
    • ctx: A context for cancellation.
    • index: The index to wait for.
  • Returns: An error if the context is cancelled, or nil if the index is done.

func (w *WaterMark) process(closer *z.Closer)

  • Purpose: Processes the markCh channel, updating the doneUntil value and notifying any waiting goroutines.
  • Parameters:
    • closer: A z.Closer for managing the lifecycle of the processing goroutine.

Code Examples

(No specific examples are beneficial for this file.)

Getting Started Relevance