Skip to main content

write_bench.go

write_bench.go - Overview

This file implements a command-line tool for benchmarking BadgerDB write performance. It includes functionalities for writing random or sorted data, reporting statistics, running garbage collection, and dropping all or prefixed keys.

Detailed Documentation

var writeBenchCmd

// writeBenchCmd is the "write" sub-command registered under benchCmd (see
// init). It benchmarks Badger write throughput; the actual work is done by
// the writeBench handler.
var writeBenchCmd = &cobra.Command{
Use: "write",
Short: "Writes random data to Badger to benchmark write speed.",
Long: `
This command writes random data to Badger to benchmark write speed. Useful for testing and
performance analysis.
`,
RunE: writeBench,
}
  • Purpose: Defines a cobra command named "write" that benchmarks BadgerDB write speed by writing random data.
  • Properties:
    • Use: The command's name: "write".
    • Short: A brief description of the command.
    • Long: A more detailed description of the command.
    • RunE: The function to execute when the command is called: writeBench.

var wo

// wo collects every command-line option for the write benchmark. The flags
// registered in init() write directly into these fields.
var (
	wo = struct {
		keySz      int
		valSz      int
		numKeys    float64
		syncWrites bool
		force      bool
		sorted     bool
		showLogs   bool

		valueThreshold   int64
		numVersions      int
		vlogMaxEntries   uint32
		loadBloomsOnOpen bool
		detectConflicts  bool
		zstdComp         bool
		showDir          bool
		ttlDuration      string
		encryptionKey    string
		showKeysCount    bool
		blockCacheSize   int64
		indexCacheSize   int64

		dropAllPeriod    string
		dropPrefixPeriod string
		gcPeriod         string
		gcDiscardRatio   float64
	}{}
)
  • Purpose: Defines a package-level variable wo of an anonymous struct type that holds the write benchmark's options, such as key size, value size, number of keys, and other configuration parameters. (Note: the snippet above is missing the closing parenthesis of the `var (` group.)

var sizeWritten

var sizeWritten    atomic.Uint64
  • Purpose: An atomic counter to track the total size of data written during the benchmark.

var gcSuccess

var gcSuccess      atomic.Uint64
  • Purpose: An atomic counter to track the number of successful garbage collection cycles.

var sstCount

var sstCount       uint32
  • Purpose: A counter to keep track of the number of SST files.

var vlogCount

var vlogCount      uint32
  • Purpose: A counter to keep track of the number of Value Log files.

var files

var files          []string
  • Purpose: A slice to store the names of files in the database directory, used for reporting.

var entriesWritten

var entriesWritten atomic.Uint64
  • Purpose: An atomic counter to track the total number of entries written during the benchmark.

func init()

// init registers writeBenchCmd under benchCmd and declares all the
// command-line flags that configure the write benchmark (key/value sizes,
// cache sizes, TTL, periodic GC/drop routines, reporting options, ...).
func init() {
	benchCmd.AddCommand(writeBenchCmd)
	writeBenchCmd.Flags().IntVarP(&wo.keySz, "key-size", "k", 32, "Size of key")
	writeBenchCmd.Flags().IntVar(&wo.valSz, "val-size", 128, "Size of value")
	writeBenchCmd.Flags().Float64VarP(&wo.numKeys, "keys-mil", "m", 10.0,
		"Number of keys to add in millions")
	writeBenchCmd.Flags().BoolVar(&wo.syncWrites, "sync", false,
		"If true, sync writes to disk.")
	writeBenchCmd.Flags().BoolVarP(&wo.force, "force-compact", "f", true,
		"Force compact level 0 on close.")
	writeBenchCmd.Flags().BoolVarP(&wo.sorted, "sorted", "s", false, "Write keys in sorted order.")
	writeBenchCmd.Flags().BoolVarP(&wo.showLogs, "verbose", "v", false, "Show Badger logs.")
	writeBenchCmd.Flags().Int64VarP(&wo.valueThreshold, "value-th", "t", 1<<10, "Value threshold")
	writeBenchCmd.Flags().IntVarP(&wo.numVersions, "num-version", "n", 1, "Number of versions to keep")
	writeBenchCmd.Flags().Int64Var(&wo.blockCacheSize, "block-cache-mb", 256,
		"Size of block cache in MB")
	writeBenchCmd.Flags().Int64Var(&wo.indexCacheSize, "index-cache-mb", 0,
		"Size of index cache in MB.")
	writeBenchCmd.Flags().Uint32Var(&wo.vlogMaxEntries, "vlog-maxe", 1000000, "Value log Max Entries")
	// Help text fixed: the flag carries the encryption key itself, it is not
	// a boolean.
	writeBenchCmd.Flags().StringVarP(&wo.encryptionKey, "encryption-key", "e", "",
		"If non-empty, badger will encrypt all the data stored on the disk with this key.")
	writeBenchCmd.Flags().BoolVar(&wo.loadBloomsOnOpen, "load-blooms", true,
		"Load Bloom filter on DB open.")
	// NOTE(review): "conficts" is a typo for "conflicts"; the flag name is
	// kept as-is so existing scripts that pass --conficts keep working.
	writeBenchCmd.Flags().BoolVar(&wo.detectConflicts, "conficts", false,
		"If true, badger will detect the conflicts")
	writeBenchCmd.Flags().BoolVar(&wo.zstdComp, "zstd", false,
		"If true, badger will use ZSTD mode. Otherwise, use default.")
	writeBenchCmd.Flags().BoolVar(&wo.showDir, "show-dir", false,
		"If true, the report will include the directory contents")
	writeBenchCmd.Flags().StringVar(&wo.dropAllPeriod, "dropall", "0s",
		"If set, run dropAll periodically over given duration.")
	writeBenchCmd.Flags().StringVar(&wo.dropPrefixPeriod, "drop-prefix", "0s",
		"If set, drop random prefixes periodically over given duration.")
	writeBenchCmd.Flags().StringVar(&wo.ttlDuration, "entry-ttl", "0s",
		"TTL duration in seconds for the entries, 0 means without TTL")
	writeBenchCmd.Flags().StringVarP(&wo.gcPeriod, "gc-every", "g", "0s", "GC Period.")
	writeBenchCmd.Flags().Float64VarP(&wo.gcDiscardRatio, "gc-ratio", "r", 0.5, "GC discard ratio.")
	writeBenchCmd.Flags().BoolVar(&wo.showKeysCount, "show-keys", false,
		"If true, the report will include the keys statistics")
}
  • Purpose: Initializes the writeBenchCmd by adding it to the benchCmd and defining its flags. It sets up command-line flags to configure the write benchmark.

func writeRandom(db *badger.DB, num uint64) error

// writeRandom writes num entries with random keys (wo.keySz bytes each) and
// random-length slices (1..wo.valSz bytes) of a shared random value into db
// via a managed write batch at version 1. It updates the entriesWritten and
// sizeWritten counters and returns the error from the final batch flush, or
// the first unrecoverable write error.
func writeRandom(db *badger.DB, num uint64) error {
	value := make([]byte, wo.valSz)
	y.Check2(rand.Read(value))

	es := uint64(wo.keySz + wo.valSz) // entry size is keySz + valSz
	batch := db.NewManagedWriteBatch()

	ttlPeriod, errParse := time.ParseDuration(wo.ttlDuration)
	y.Check(errParse)

	for i := uint64(1); i <= num; i++ {
		key := make([]byte, wo.keySz)
		y.Check2(rand.Read(key))

		vsz := rand.Intn(wo.valSz) + 1 // 1..wo.valSz bytes
		e := badger.NewEntry(key, value[:vsz])

		if ttlPeriod != 0 {
			e.WithTTL(ttlPeriod)
		}
		err := batch.SetEntryAt(e, 1)
		// While writes are blocked (e.g. by a concurrent DropAll), back off
		// and retry with a fresh batch. Entries pending in the abandoned
		// batch are lost, which is acceptable for a benchmark.
		for err == badger.ErrBlockedWrites {
			time.Sleep(time.Second)
			batch = db.NewManagedWriteBatch()
			err = batch.SetEntryAt(e, 1)
		}
		if err != nil {
			// Fix: propagate the error instead of panicking; this function
			// already returns error and writeBench reports it.
			return err
		}

		entriesWritten.Add(1)
		sizeWritten.Add(es)
	}
	return batch.Flush()
}
  • Purpose: Writes random data to BadgerDB.
  • Parameters:
    • db: A pointer to the BadgerDB instance.
    • num: The number of keys to write.
  • Returns: An error, if any.

func readTest(db *badger.DB, dur time.Duration)

  • Purpose: This function performs read tests on the BadgerDB database.
  • Parameters:
    • db: A pointer to the BadgerDB instance.
    • dur: The duration for which the read test should run.
  • Returns: None

func writeSorted(db *badger.DB, num uint64) error

func writeSorted(db *badger.DB, num uint64) error {
value := make([]byte, wo.valSz)
y.Check2(rand.Read(value))
es := 8 + wo.valSz // key size is 8 bytes and value size is valSz

writer := db.NewStreamWriter()
if err := writer.Prepare(); err != nil {
return err
}

wg := &sync.WaitGroup{}
writeCh := make(chan *z.Buffer, 3)
writeRange := func(start, end uint64, streamId uint32) {
// end is not included.
defer wg.Done()
kvBuf := z.NewBuffer(5<<20, "Benchmark.WriteSorted")
var sz int
for i := start; i < end; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, i)
kv := &pb.KV{
Key: key,
Value: value,
Version: 1,
StreamId: streamId,
}
badger.KVToBuffer(kv, kvBuf)

sz += es
entriesWritten.Add(1)
sizeWritten.Add(uint64(es))

if sz >= 4<<20 { // 4 MB
writeCh <- kvBuf
kvBuf = z.NewBuffer(1&lt;&lt;20, "Benchmark.WriteSorted")
sz = 0
}
}
writeCh <- kvBuf
}

// Let's create some streams.
width := num / 4
streamID := uint32(0)
for start := uint64(0); start &lt; num; start += width {
end := start + width
if end &gt; num {
end = num
}
streamID++
wg.Add(1)
go writeRange(start, end, streamID)
}
go func() {
wg.Wait()
close(writeCh)
}()
log.Printf("Max StreamId used: %d. Width: %d\n", streamID, width)
for kvs := range writeCh {
if err := writer.Write(kvs); err != nil {
panic(err)
}
y.Check(kvs.Release())
}
log.Println("DONE streaming. Flushing...")
return writer.Flush()
}
  • Purpose: Writes sorted data to BadgerDB using the Stream Writer.
  • Parameters:
    • db: A pointer to the BadgerDB instance.
    • num: The number of keys to write.
  • Returns: An error, if any.

func writeBench(cmd *cobra.Command, args []string) error

func writeBench(cmd *cobra.Command, args []string) error {
opt := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithSyncWrites(wo.syncWrites).
WithCompactL0OnClose(wo.force).
WithValueThreshold(wo.valueThreshold).
WithNumVersionsToKeep(wo.numVersions).
WithBlockCacheSize(wo.blockCacheSize &lt;&lt; 20).
WithIndexCacheSize(wo.indexCacheSize &lt;&lt; 20).
WithValueLogMaxEntries(wo.vlogMaxEntries).
WithEncryptionKey([]byte(wo.encryptionKey)).
WithDetectConflicts(wo.detectConflicts).
WithLoggingLevel(badger.INFO)
if wo.zstdComp {
opt = opt.WithCompression(options.ZSTD)
}

if !wo.showLogs {
opt = opt.WithLogger(nil)
}

fmt.Printf("Opening badger with options = %+v\n", opt)
db, err := badger.OpenManaged(opt)
if err != nil {
return err
}
defer func() {
start := time.Now()
err := db.Close()
log.Printf("DB.Close. Error: %v. Time taken to close: %s", err, time.Since(start))
}()

fmt.Println("*********************************************************")
fmt.Println("Starting to benchmark Writes")
fmt.Println("*********************************************************")

startTime = time.Now()
num := uint64(wo.numKeys * mil)
c := z.NewCloser(4)
go reportStats(c, db)
go dropAll(c, db)
go dropPrefix(c, db)
go runGC(c, db)

if wo.sorted {
err = writeSorted(db, num)
} else {
err = writeRandom(db, num)
}

c.SignalAndWait()
fmt.Println(db.LevelsToString())
return err
}
  • Purpose: Executes the write benchmark. It opens a BadgerDB instance, configures it based on command-line flags, performs write operations (either random or sorted), and reports statistics.
  • Parameters:
    • cmd: A pointer to the cobra command.
    • args: Command-line arguments.
  • Returns: An error, if any.

func showKeysStats(db *badger.DB)

// showKeysStats iterates over every version of every key — including Badger's
// internal "!badger!" keys — at the maximum read timestamp, and prints counts
// of valid, invalid (deleted or expired) and internal keys.
func showKeysStats(db *badger.DB) {
	var nValid, nInvalid, nInternal uint32

	txn := db.NewTransactionAt(math.MaxUint64, false)
	defer txn.Discard()

	opts := badger.DefaultIteratorOptions
	opts.AllVersions = true
	opts.InternalAccess = true
	itr := txn.NewIterator(opts)
	defer itr.Close()

	for itr.Rewind(); itr.Valid(); itr.Next() {
		item := itr.Item()
		if bytes.HasPrefix(item.Key(), []byte("!badger!")) {
			nInternal++
		}
		switch {
		case item.IsDeletedOrExpired():
			nInvalid++
		default:
			nValid++
		}
	}
	fmt.Printf("Valid Keys: %d Invalid Keys: %d Internal Keys: %d\n",
		nValid, nInvalid, nInternal)
}
  • Purpose: Shows statistics about the keys in the database, including the number of valid, invalid, and internal keys.
  • Parameters:
    • db: A pointer to the BadgerDB instance.
  • Returns: None.

func reportStats(c *z.Closer, db *badger.DB)

// reportStats prints write-throughput statistics every second until c is
// closed. When --show-keys is set it also prints key statistics, and when
// --show-dir is set it walks the SST directory and reports its contents.
// Every tenth tick it additionally dumps the LSM level layout.
func reportStats(c *z.Closer, db *badger.DB) {
	defer c.Done()

	t := time.NewTicker(time.Second)
	defer t.Stop()

	var count int
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-t.C:
			count++
			if wo.showKeysCount {
				showKeysStats(db)
			}
			// fetch directory contents
			if wo.showDir {
				err := filepath.Walk(sstDir, func(path string, info os.FileInfo, err error) error {
					// Fix: the walk callback previously ignored its err
					// argument and would dereference a nil info on a failed
					// stat; propagate the error instead.
					if err != nil {
						return err
					}
					fileSize := humanize.IBytes(uint64(info.Size()))
					files = append(files, "[Content] "+path+" "+fileSize)
					switch filepath.Ext(path) {
					case ".vlog":
						vlogCount++
					case ".sst":
						sstCount++
					}
					return nil
				})
				if err != nil {
					log.Printf("Error while fetching directory. %v.", err)
				} else {
					// NOTE(review): files, sstCount and vlogCount are never
					// reset between ticks, so they accumulate across reports —
					// confirm whether that is intended.
					fmt.Printf("[Content] Number of files:%d\n", len(files))
					for _, file := range files {
						fmt.Println(file)
					}
					fmt.Printf("SST Count: %d vlog Count: %d\n", sstCount, vlogCount)
				}
			}

			dur := time.Since(startTime)
			sz := sizeWritten.Load()
			entries := entriesWritten.Load()
			secs := uint64(dur.Seconds())
			if secs == 0 {
				secs = 1 // guard against division by zero right after start
			}
			bytesRate := sz / secs
			entriesRate := entries / secs
			fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
				"entries written: %d, speed: %d/sec, jemalloc: %s\n",
				y.FixedDuration(time.Since(startTime)),
				humanize.IBytes(sz), humanize.IBytes(bytesRate), entries, entriesRate,
				humanize.IBytes(uint64(z.NumAllocBytes())))

			if count%10 == 0 {
				fmt.Println(db.LevelsToString())
			}
		}
	}
}
  • Purpose: Reports write benchmark statistics periodically.
  • Parameters:
    • c: A pointer to the z.Closer for managing goroutine lifecycle.
    • db: A pointer to the BadgerDB instance.
  • Returns: None.

func runGC(c *z.Closer, db *badger.DB)

// runGC triggers value-log garbage collection every wo.gcPeriod until c is
// closed, counting successful cycles in gcSuccess. A zero period disables the
// routine entirely.
func runGC(c *z.Closer, db *badger.DB) {
	defer c.Done()
	period, err := time.ParseDuration(wo.gcPeriod)
	y.Check(err)
	if period == 0 {
		return
	}

	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-ticker.C:
			gcErr := db.RunValueLogGC(wo.gcDiscardRatio)
			if gcErr != nil {
				log.Printf("[GC] Failed due to following err %v", gcErr)
				continue
			}
			gcSuccess.Add(1)
		}
	}
}
  • Purpose: Runs garbage collection periodically.
  • Parameters:
    • c: A pointer to the z.Closer for managing goroutine lifecycle.
    • db: A pointer to the BadgerDB instance.
  • Returns: None.

func dropAll(c *z.Closer, db *badger.DB)

// dropAll deletes every key in the database once per wo.dropAllPeriod until c
// is closed, retrying while writes are blocked. A zero period disables the
// routine entirely.
func dropAll(c *z.Closer, db *badger.DB) {
	defer c.Done()
	dropPeriod, err := time.ParseDuration(wo.dropAllPeriod)
	y.Check(err)
	if dropPeriod == 0 {
		return
	}

	t := time.NewTicker(dropPeriod)
	defer t.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-t.C:
			fmt.Println("[DropAll] Started")
			err := db.DropAll()
			// Fix: sleep *before* each retry (the old code retried first and
			// slept afterwards, hammering the DB while writes were blocked).
			// This matches the backoff pattern used in writeRandom.
			for err == badger.ErrBlockedWrites {
				time.Sleep(time.Millisecond * 300)
				err = db.DropAll()
			}

			if err != nil {
				fmt.Println("[DropAll] Failed")
			} else {
				fmt.Println("[DropAll] Successful")
			}
		}
	}
}
  • Purpose: Drops all keys in the database periodically.
  • Parameters:
    • c: A pointer to the z.Closer for managing goroutine lifecycle.
    • db: A pointer to the BadgerDB instance.
  • Returns: None.

func dropPrefix(c *z.Closer, db *badger.DB)

// dropPrefix drops all keys sharing a freshly generated random prefix once
// per wo.dropPrefixPeriod until c is closed. A zero period disables the
// routine entirely. A failed drop panics.
func dropPrefix(c *z.Closer, db *badger.DB) {
	defer c.Done()
	dropPeriod, err := time.ParseDuration(wo.dropPrefixPeriod)
	y.Check(err)
	if dropPeriod == 0 {
		return
	}

	ticker := time.NewTicker(dropPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-ticker.C:
			fmt.Println("[DropPrefix] Started")
			// Prefix length is roughly 10% of the key size, minimum one byte.
			prefixLen := 1 + int(float64(wo.keySz)*0.1)
			prefix := make([]byte, prefixLen)
			y.Check2(rand.Read(prefix))
			if err = db.DropPrefix(prefix); err != nil {
				panic(err)
			}
			fmt.Println("[DropPrefix] Successful")
		}
	}
}
  • Purpose: Drops keys with a random prefix periodically.
  • Parameters:
    • c: A pointer to the z.Closer for managing goroutine lifecycle.
    • db: A pointer to the BadgerDB instance.
  • Returns: None.

func printReadStats(c *z.Closer, startTime time.Time)

// printReadStats prints read-throughput statistics (bytes and entries per
// second, from the sizeRead/entriesRead counters) once per second until c is
// closed.
func printReadStats(c *z.Closer, startTime time.Time) {
	defer c.Done()

	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			return
		case <-t.C:
			dur := time.Since(startTime)
			sz := sizeRead.Load()
			entries := entriesRead.Load()
			secs := uint64(dur.Seconds())
			if secs == 0 {
				secs = 1 // guard against division by zero right after start
			}
			bytesRate := sz / secs
			entriesRate := entries / secs
			fmt.Printf("[READ] Time elapsed: %s, bytes read: %s, speed: %s/sec, "+
				"entries read: %d, speed: %d/sec\n", y.FixedDuration(time.Since(startTime)),
				humanize.IBytes(sz), humanize.IBytes(bytesRate), entries, entriesRate)
		}
	}
}
  • Purpose: Prints read benchmark statistics periodically.
  • Parameters:
    • c: A pointer to the z.Closer for managing goroutine lifecycle.
    • startTime: The time at which the read benchmark started.
  • Returns: None.

Getting Started Relevance