Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt RocksDB 9.1.1 #151

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}

# Note: if you don't have a good reason, please do not set -DPORTABLE=ON
# This one is set here on purpose of compatibility with github action runtime processor
rocksdb_version="8.11.4"
rocksdb_version="9.1.1"
cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
Expand Down
21 changes: 15 additions & 6 deletions c.h
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t* rocksdb_create_snapshot(
extern ROCKSDB_LIBRARY_API void rocksdb_release_snapshot(
rocksdb_t* db, const rocksdb_snapshot_t* snapshot);

extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_snapshot_get_sequence_number(const rocksdb_snapshot_t* snapshot);

/* Returns NULL if property name is unknown.
Else returns a pointer to a malloc()-ed null-terminated value. */
extern ROCKSDB_LIBRARY_API char* rocksdb_property_value(rocksdb_t* db,
Expand Down Expand Up @@ -691,8 +694,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_flush_wal(rocksdb_t* db,
extern ROCKSDB_LIBRARY_API void rocksdb_disable_file_deletions(rocksdb_t* db,
char** errptr);

extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions(
rocksdb_t* db, unsigned char force, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_enable_file_deletions(rocksdb_t* db,
char** errptr);

/* Management operations */

Expand Down Expand Up @@ -1152,10 +1155,16 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_env(rocksdb_options_t*,
rocksdb_env_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log(rocksdb_options_t*,
rocksdb_logger_t*);
extern ROCKSDB_LIBRARY_API rocksdb_logger_t* rocksdb_options_get_info_log(
rocksdb_options_t* opt);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_info_log_level(
rocksdb_options_t*, int);
extern ROCKSDB_LIBRARY_API int rocksdb_options_get_info_log_level(
rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API rocksdb_logger_t*
rocksdb_logger_create_stderr_logger(int log_level, const char* prefix);
extern ROCKSDB_LIBRARY_API void rocksdb_logger_destroy(
rocksdb_logger_t* logger);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_buffer_size(
rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API size_t
Expand Down Expand Up @@ -1499,10 +1508,6 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_advise_random_on_open(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_options_get_advise_random_on_open(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_access_hint_on_compaction_start(rocksdb_options_t*, int);
extern ROCKSDB_LIBRARY_API int
rocksdb_options_get_access_hint_on_compaction_start(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_use_adaptive_mutex(
Expand Down Expand Up @@ -1684,6 +1689,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec,
int64_t refill_period_us,
int32_t fairness);
extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t*
rocksdb_ratelimiter_create_with_mode(int64_t rate_bytes_per_sec,
int64_t refill_period_us, int32_t fairness,
int mode, bool auto_tuned);
extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy(
rocksdb_ratelimiter_t*);

Expand Down
4 changes: 2 additions & 2 deletions cf_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) {
givenVal1 = []byte("world1")
givenVal2 = []byte("world2")
givenVal3 = []byte("world3")
givenTs1 = marshalTimestamp(1)
givenTs1 = marshalTimestamp(0)
givenTs2 = marshalTimestamp(2)
givenTs3 = marshalTimestamp(3)
)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestColumnFamilyMultiGetWithTS(t *testing.T) {
require.EqualValues(t, values[1].Data(), givenVal2)
require.EqualValues(t, values[2].Data(), givenVal3)

require.EqualValues(t, times[0].Data(), givenTs1)
require.EqualValues(t, times[0].Data(), []byte{})
require.EqualValues(t, times[1].Data(), givenTs2)
require.EqualValues(t, times[2].Data(), givenTs3)
}
10 changes: 5 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ func (db *DB) SingleDeleteCFWithTS(opts *WriteOptions, cf *ColumnFamilyHandle, k
}

// DeleteRangeCF deletes keys that are between [startKey, endKey)
func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey []byte, endKey []byte) (err error) {
func (db *DB) DeleteRangeCF(opts *WriteOptions, cf *ColumnFamilyHandle, startKey, endKey []byte) (err error) {
var (
cErr *C.char
cStartKey = refGoBytes(startKey)
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func (db *DB) SingleDeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []b
}

// Merge merges the data associated with the key with the actual data in the database.
func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) {
func (db *DB) Merge(opts *WriteOptions, key, value []byte) (err error) {
var (
cErr *C.char
cKey = refGoBytes(key)
Expand All @@ -1102,7 +1102,7 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) (err error) {

// MergeCF merges the data associated with the key with the actual data in the
// database and column family.
func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, value []byte) (err error) {
func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) (err error) {
var (
cErr *C.char
cKey = refGoBytes(key)
Expand Down Expand Up @@ -1662,10 +1662,10 @@ func (db *DB) DisableFileDeletions() (err error) {
}

// EnableFileDeletions enables file deletions for the database.
func (db *DB) EnableFileDeletions(force bool) (err error) {
func (db *DB) EnableFileDeletions() (err error) {
var cErr *C.char

C.rocksdb_enable_file_deletions(db.c, boolToChar(force), &cErr)
C.rocksdb_enable_file_deletions(db.c, &cErr)
err = fromCError(cErr)

return
Expand Down
2 changes: 2 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func newTestDBMultiCF(t *testing.T, columns []string, applyOpts func(opts *Optio
dir := t.TempDir()

opts := NewDefaultOptions()
rateLimiter := NewGenericRateLimiter(1024, 100*1000, 10, RateLimiterModeAllIo, true)
opts.SetRateLimiter(rateLimiter)
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetCreateIfMissing(true)
opts.SetCompression(ZSTDCompression)
Expand Down
26 changes: 26 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package grocksdb

// #include "rocksdb/c.h"
// #include "grocksdb.h"
import "C"
import "unsafe"

// Logger struct.
type Logger struct {
c *C.rocksdb_logger_t
}

func NewStderrLogger(level InfoLogLevel, prefix string) *Logger {
prefix_ := C.CString(prefix)
defer C.free(unsafe.Pointer(prefix_))

return &Logger{
c: C.rocksdb_logger_create_stderr_logger(C.int(level), prefix_),
}
}

// Destroy Logger.
func (l *Logger) Destroy() {
C.rocksdb_logger_destroy(l.c)
l.c = nil
}
41 changes: 27 additions & 14 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,18 @@ func (opts *Options) SetEnv(env *Env) {
env.c = nil
}

// SetInfoLog sets info logger.
func (opts *Options) SetInfoLog(logger *Logger) {
C.rocksdb_options_set_info_log(opts.c, logger.c)
}

// GetInfoLog gets info logger.
func (opts *Options) GetInfoLog() *Logger {
return &Logger{
c: C.rocksdb_options_get_info_log(opts.c),
}
}

// SetInfoLogLevel sets the info log level.
//
// Default: InfoInfoLogLevel
Expand Down Expand Up @@ -1451,20 +1463,21 @@ func (opts *Options) GetDbWriteBufferSize() uint64 {
return uint64(C.rocksdb_options_get_db_write_buffer_size(opts.c))
}

// SetAccessHintOnCompactionStart specifies the file access pattern
// once a compaction is started.
//
// It will be applied to all input files of a compaction.
// Default: NormalCompactionAccessPattern
func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) {
C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value))
}

// GetAccessHintOnCompactionStart returns the file access pattern
// once a compaction is started.
func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern {
return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c))
}
// Deprecation in rocksdb v9.x
// // SetAccessHintOnCompactionStart specifies the file access pattern
// // once a compaction is started.
// //
// // It will be applied to all input files of a compaction.
// // Default: NormalCompactionAccessPattern
// func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) {
// C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value))
// }
//
// // GetAccessHintOnCompactionStart returns the file access pattern
// // once a compaction is started.
// func (opts *Options) GetAccessHintOnCompactionStart() CompactionAccessPattern {
// return CompactionAccessPattern(C.rocksdb_options_get_access_hint_on_compaction_start(opts.c))
// }

// SetUseAdaptiveMutex enable/disable adaptive mutex, which spins
// in the user space before resorting to kernel.
Expand Down
8 changes: 6 additions & 2 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func TestOptions(t *testing.T) {
opts.SetAdviseRandomOnOpen(true)
require.EqualValues(t, true, opts.AdviseRandomOnOpen())

opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern)
require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart())
// opts.SetAccessHintOnCompactionStart(SequentialCompactionAccessPattern)
// require.EqualValues(t, SequentialCompactionAccessPattern, opts.GetAccessHintOnCompactionStart())

opts.SetDbWriteBufferSize(1 << 30)
require.EqualValues(t, 1<<30, opts.GetDbWriteBufferSize())
Expand Down Expand Up @@ -401,6 +401,10 @@ func TestOptions(t *testing.T) {

opts.SetWriteBufferManager(wbm)

lg := NewStderrLogger(InfoInfoLogLevel, "prefix")
opts.SetInfoLog(lg)
require.NotNil(t, opts.GetInfoLog())

// cloning
cl := opts.Clone()
require.EqualValues(t, 5, cl.GetTableCacheNumshardbits())
Expand Down
49 changes: 49 additions & 0 deletions ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ package grocksdb
// #include "rocksdb/c.h"
import "C"

type RateLimiterMode int

const (
RateLimiterModeReadsOnly RateLimiterMode = iota
RateLimiterModeWritesOnly
RateLimiterModeAllIo
)

// RateLimiter is used to control write rate of flush and
// compaction.
type RateLimiter struct {
Expand Down Expand Up @@ -52,6 +60,47 @@ func NewAutoTunedRateLimiter(rateBytesPerSec, refillPeriodMicros int64, fairness
return newNativeRateLimiter(cR)
}

// NewGenericRateLimiter creates a RateLimiter object, which can be shared among RocksDB instances to
// control write rate of flush and compaction.
//
// @rate_bytes_per_sec: this is the only parameter you want to set most of the
// time. It controls the total write rate of compaction and flush in bytes per
// second. Currently, RocksDB does not enforce rate limit for anything other
// than flush and compaction, e.g. write to WAL.
//
// @refill_period_us: this controls how often tokens are refilled. For example,
// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
// burstier writes while smaller value introduces more CPU overhead.
// The default should work for most cases.
//
// @fairness: RateLimiter accepts high-pri requests and low-pri requests.
// A low-pri request is usually blocked in favor of hi-pri request. Currently,
// RocksDB assigns low-pri to request from compaction and high-pri to request
// from flush. Low-pri requests can get blocked if flush requests come in
// continuously. This fairness parameter grants low-pri requests permission by
// 1/fairness chance even though high-pri requests exist to avoid starvation.
// You should be good by leaving it at default 10.
//
// @mode: Mode indicates which types of operations count against the limit.
//
// @auto_tuned: Enables dynamic adjustment of rate limit within the range
// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
// the recent demand for background I/O.
func NewGenericRateLimiter(
rateBytesPerSec, refillPeriodMicros int64, fairness int32,
mode RateLimiterMode, autoTuned bool,
) *RateLimiter {
cR := C.rocksdb_ratelimiter_create_with_mode(
C.int64_t(rateBytesPerSec),
C.int64_t(refillPeriodMicros),
C.int32_t(fairness),
C.int(mode),
C.bool(autoTuned),
)
return newNativeRateLimiter(cR)
}

// NewNativeRateLimiter creates a native RateLimiter object.
func newNativeRateLimiter(c *C.rocksdb_ratelimiter_t) *RateLimiter {
return &RateLimiter{c: c}
Expand Down
5 changes: 5 additions & 0 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ func newNativeSnapshot(c *C.rocksdb_snapshot_t) *Snapshot {
return &Snapshot{c: c}
}

// GetSequenceNumber gets sequence number of the Snapshot.
func (snapshot *Snapshot) GetSequenceNumber() uint64 {
return uint64(C.rocksdb_snapshot_get_sequence_number(snapshot.c))
}

// Destroy deallocates the Snapshot object.
func (snapshot *Snapshot) Destroy() {
C.rocksdb_free(unsafe.Pointer(snapshot.c))
Expand Down
Loading