Skip to main content

publisher.go

publisher.go - Overview

This file defines the publisher struct and its associated methods for managing subscribers and publishing updates to them based on key matches using a Trie index.

Detailed Documentation

subscriber

type subscriber struct {
id uint64
matches []pb.Match
sendCh chan *pb.KVList
subCloser *z.Closer
active *atomic.Uint64
}
  • Purpose: Represents a subscriber to database updates.
    • id: Unique identifier for the subscriber.
    • matches: List of pb.Match criteria the subscriber is interested in.
    • sendCh: Channel to send pb.KVList updates to the subscriber.
    • subCloser: Used to close the subscriber's resources.
    • active: Atomic boolean to track subscriber's active status.

publisher

type publisher struct {
sync.Mutex
pubCh chan requests
subscribers map[uint64]subscriber
nextID uint64
indexer *trie.Trie
}
  • Purpose: Manages subscribers and publishes updates based on key matches.
    • sync.Mutex: Mutex to protect concurrent access to the publisher's state.
    • pubCh: Channel to receive update requests.
    • subscribers: Map of subscriber ID to subscriber struct.
    • nextID: Next available subscriber ID.
    • indexer: trie.Trie index for matching keys to subscriber IDs.

newPublisher

func newPublisher() *publisher {
return &publisher{
pubCh: make(chan requests, 1000),
subscribers: make(map[uint64]subscriber),
nextID: 0,
indexer: trie.NewTrie(),
}
}
  • Purpose: Creates a new publisher instance.
  • Returns: A pointer to a new publisher struct with initialized fields.

listenForUpdates

func (p *publisher) listenForUpdates(c *z.Closer) {
defer func() {
p.cleanSubscribers()
c.Done()
}()
slurp := func(batch requests) {
for {
select {
case reqs := <-p.pubCh:
batch = append(batch, reqs...)
default:
p.publishUpdates(batch)
return
}
}
}
for {
select {
case <-c.HasBeenClosed():
return
case reqs := <-p.pubCh:
slurp(reqs)
}
}
}
  • Purpose: Listens for updates on the pubCh channel and publishes them to subscribers.
    • Parameters:
      • c: *z.Closer - Closer to signal when the listener should stop.
  • Returns: None.

publishUpdates

func (p *publisher) publishUpdates(reqs requests) {
p.Lock()
defer func() {
p.Unlock()
// Release all the request.
reqs.DecrRef()
}()
batchedUpdates := make(map[uint64]*pb.KVList)
for _, req := range reqs {
for _, e := range req.Entries {
ids := p.indexer.Get(e.Key)
if len(ids) == 0 {
continue
}
k := y.SafeCopy(nil, e.Key)
kv := &pb.KV{
Key: y.ParseKey(k),
Value: y.SafeCopy(nil, e.Value),
Meta: []byte{e.UserMeta},
ExpiresAt: e.ExpiresAt,
Version: y.ParseTs(k),
}
for id := range ids {
if _, ok := batchedUpdates[id]; !ok {
batchedUpdates[id] = &pb.KVList{}
}
batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv)
}
}
}

for id, kvs := range batchedUpdates {
if p.subscribers[id].active.Load() == 1 {
p.subscribers[id].sendCh <- kvs
}
}
}
  • Purpose: Publishes updates to subscribers based on the provided requests.
    • Parameters:
      • reqs: requests - A slice of update requests.
  • Returns: None.

newSubscriber

func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
p.Lock()
defer p.Unlock()
ch := make(chan *pb.KVList, 1000)
id := p.nextID
// Increment next ID.
p.nextID++
s := subscriber{
id: id,
matches: matches,
sendCh: ch,
subCloser: c,
active: new(atomic.Uint64),
}
s.active.Store(1)

p.subscribers[id] = s
for _, m := range matches {
if err := p.indexer.AddMatch(m, id); err != nil {
return subscriber{}, err
}
}
return s, nil
}
  • Purpose: Creates a new subscriber and adds it to the publisher.
    • Parameters:
      • c: *z.Closer - Closer to manage the subscriber's lifecycle.
      • matches: []pb.Match - List of match criteria for the subscriber.
    • Returns:
      • subscriber: The newly created subscriber.
      • error: An error, if any occurred.

cleanSubscribers

func (p *publisher) cleanSubscribers() {
p.Lock()
defer p.Unlock()
for id, s := range p.subscribers {
for _, m := range s.matches {
_ = p.indexer.DeleteMatch(m, id)
}
delete(p.subscribers, id)
s.subCloser.SignalAndWait()
}
}
  • Purpose: Stops and removes all subscribers.
  • Returns: None.

deleteSubscriber

func (p *publisher) deleteSubscriber(id uint64) {
p.Lock()
defer p.Unlock()
if s, ok := p.subscribers[id]; ok {
for _, m := range s.matches {
_ = p.indexer.DeleteMatch(m, id)
}
}
delete(p.subscribers, id)
}
  • Purpose: Deletes a subscriber by its ID.
    • Parameters:
      • id: uint64 - The ID of the subscriber to delete.
  • Returns: None.

sendUpdates

func (p *publisher) sendUpdates(reqs requests) {
if p.noOfSubscribers() != 0 {
reqs.IncrRef()
p.pubCh <- reqs
}
}
  • Purpose: Sends updates to the publisher channel.
    • Parameters:
      • reqs: requests - The update requests to send.
  • Returns: None.

noOfSubscribers

func (p *publisher) noOfSubscribers() int {
p.Lock()
defer p.Unlock()
return len(p.subscribers)
}
  • Purpose: Returns the number of active subscribers.
  • Returns: The number of subscribers as an integer.

Code Examples

None.

Getting Started Relevance