Skip to main content

stream.go

stream.go - Overview

This file defines the stream command, which allows streaming the contents of a BadgerDB database to another database or backing it up to a file, potentially with different configuration options like compression or number of versions.

Detailed Documentation

var streamCmd

var streamCmd = &cobra.Command{
Use: "stream",
Short: "Stream DB into another DB with different options",
Long: `
This command streams the contents of this DB into another DB with the given options.
`,
RunE: stream,
}
  • Purpose: Defines the stream command using the cobra library. This command facilitates streaming data between BadgerDB instances or backing up a database to a file.
  • Fields:
    • Use: The command's name: "stream".
    • Short: A brief description of the command.
    • Long: A more detailed description of the command.
    • RunE: The function to execute when the command is invoked: stream.

var so

var so = struct {
outDir string
outFile string
compressionType uint32
numVersions int
readOnly bool
keyPath string
}{}
  • Purpose: Defines a struct so to hold command-line options for the stream command.
  • Fields:
    • outDir: The path to the output database directory (string).
    • outFile: The path to the output backup file (string).
    • compressionType: The compression type for the output database (uint32).
    • numVersions: The number of versions to keep per key in the output database (int).
    • readOnly: A boolean indicating whether the input database should be opened in read-only mode (bool).
    • keyPath: The path to the encryption key file (string).

func init()

func init() {
// TODO: Add more options.
RootCmd.AddCommand(streamCmd)
streamCmd.Flags().StringVarP(&so.outDir, "out", "o", "",
"Path to output DB. The directory should be empty.")
streamCmd.Flags().StringVarP(&so.outFile, "", "f", "",
"Run a backup to this file.")
streamCmd.Flags().BoolVarP(&so.readOnly, "read_only", "", true,
"Option to open input DB in read-only mode")
streamCmd.Flags().IntVarP(&so.numVersions, "num_versions", "", 0,
"Option to configure the maximum number of versions per key. "+
"Values <= 0 will be considered to have the max number of versions.")
streamCmd.Flags().Uint32VarP(&so.compressionType, "compression", "", 1,
"Option to configure the compression type in output DB. "+
"0 to disable, 1 for Snappy, and 2 for ZSTD.")
streamCmd.Flags().StringVarP(&so.keyPath, "encryption-key-file", "e", "",
"Path of the encryption key file.")
}
  • Purpose: Initializes the streamCmd by adding it to the root command and defining its flags.
  • Functionality:
    • Adds the streamCmd to the RootCmd.
    • Defines command-line flags using StringVarP, BoolVarP, and IntVarP to populate the fields of the so struct. These flags control the output directory, output file, read-only mode, number of versions, compression type, and encryption key path.

func stream(cmd *cobra.Command, args []string) error

func stream(cmd *cobra.Command, args []string) error {
// Options for input DB.
if so.numVersions <= 0 {
so.numVersions = math.MaxInt32
}
encKey, err := getKey(so.keyPath)
if err != nil {
return err
}
inOpt := badger.DefaultOptions(sstDir).
WithReadOnly(so.readOnly).
WithValueThreshold(1 &lt;&lt; 10 /* 1KB */).
WithNumVersionsToKeep(so.numVersions).
WithBlockCacheSize(100 &lt;&lt; 20).
WithIndexCacheSize(200 &lt;&lt; 20).
WithEncryptionKey(encKey)

// Options for output DB.
if so.compressionType &gt; 2 {
return errors.New(
"compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)")
}
inDB, err := badger.OpenManaged(inOpt)
if err != nil {
return y.Wrapf(err, "cannot open DB at %s", sstDir)
}
defer inDB.Close()

stream := inDB.NewStreamAt(math.MaxUint64)

if len(so.outDir) &gt; 0 {
if _, err := os.Stat(so.outDir); err == nil {
f, err := os.Open(so.outDir)
if err != nil {
return err
}
defer f.Close()

_, err = f.Readdirnames(1)
if err != io.EOF {
return fmt.Errorf(
"cannot run stream tool on non-empty output directory %s", so.outDir)
}
}

stream.LogPrefix = "DB.Stream"
outOpt := inOpt.
WithDir(so.outDir).
WithValueDir(so.outDir).
WithNumVersionsToKeep(so.numVersions).
WithCompression(options.CompressionType(so.compressionType)).
WithEncryptionKey(encKey).
WithReadOnly(false)
err = inDB.StreamDB(outOpt)

} else if len(so.outFile) &gt; 0 {
stream.LogPrefix = "DB.Backup"
f, err := os.OpenFile(so.outFile, os.O_RDWR|os.O_CREATE, 0666)
y.Check(err)
_, err = stream.Backup(f, 0)
y.Check(err)
}
fmt.Println("Done.")
return err
}
  • Purpose: Implements the logic for the stream command, handling database streaming or backup based on the provided command-line arguments.
  • Parameters:
    • cmd: A pointer to the cobra.Command object.
    • args: A slice of strings representing the command-line arguments.
  • Returns: An error object, or nil if the command executes successfully.
  • Functionality:
    • Sets so.numVersions to math.MaxInt32 if it's less than or equal to 0.
    • Gets the encryption key using the getKey function.
    • Configures the input database options (inOpt) using badger.DefaultOptions and the values from the so struct.
    • Validates the compression type.
    • Opens the input database using badger.OpenManaged.
    • Creates a new stream using inDB.NewStreamAt.
    • If so.outDir is specified:
      • Checks if the output directory is empty.
      • Configures the output database options (outOpt).
      • Streams the database to the output directory using inDB.StreamDB.
    • Else if so.outFile is specified:
      • Backs up the database to the output file using stream.Backup.
    • Prints "Done." to the console.

Code Examples

No specific examples are needed, as the code itself demonstrates the usage.

Getting Started Relevance