diff --git a/.gitignore b/.gitignore index c4713f7..1af18ef 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ libs/bzip2/libbz2.a build/* .vscode/c_cpp_properties.json dist +.vscode/settings.json diff --git a/build.sh b/build.sh index 3459d68..30dd9bd 100644 --- a/build.sh +++ b/build.sh @@ -11,13 +11,13 @@ CMAKE_REQUIRED_PARAMS="-DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREF snappy_version="1.1.10" cd $BUILD_PATH && wget https://github.com/google/snappy/archive/${snappy_version}.tar.gz && tar xzf ${snappy_version}.tar.gz && cd snappy-${snappy_version} && \ mkdir -p build_place && cd build_place && \ - CXXFLAGS='-fPIC -O3 -pipe -Wno-uninitialized -Werror,-Wno-sign-compare' cmake $CMAKE_REQUIRED_PARAMS -DSNAPPY_BUILD_TESTS=OFF -DSNAPPY_BUILD_BENCHMARKS=OFF .. && make install/strip -j16 && \ + CXXFLAGS='-fPIC -O3 -pipe -Wno-uninitialized -Wno-sign-compare' cmake $CMAKE_REQUIRED_PARAMS -DSNAPPY_BUILD_TESTS=OFF -DSNAPPY_BUILD_BENCHMARKS=OFF .. && make install/strip -j16 && \ cd $BUILD_PATH && rm -rf * export CFLAGS='-fPIC -O3 -pipe' export CXXFLAGS='-fPIC -O3 -pipe -Wno-uninitialized' -zlib_version="1.2.11" +zlib_version="1.2.13" cd $BUILD_PATH && wget https://github.com/madler/zlib/archive/v${zlib_version}.tar.gz && tar xzf v${zlib_version}.tar.gz && cd zlib-${zlib_version} && \ ./configure --prefix=$INSTALL_PREFIX --static && make -j16 install && \ cd $BUILD_PATH && rm -rf * @@ -27,7 +27,7 @@ cd $BUILD_PATH && wget https://github.com/lz4/lz4/archive/v${lz4_version}.tar.gz cmake $CMAKE_REQUIRED_PARAMS -DLZ4_BUILD_LEGACY_LZ4C=OFF -DBUILD_SHARED_LIBS=OFF -DLZ4_POSITION_INDEPENDENT_LIB=ON && make -j16 install && \ cd $BUILD_PATH && rm -rf * -zstd_version="1.5.4" +zstd_version="1.5.5" cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}.tar.gz && tar xzf v${zstd_version}.tar.gz && \ cd zstd-${zstd_version}/build/cmake && mkdir -p build_place && cd build_place && \ cmake $CMAKE_REQUIRED_PARAMS 
-DZSTD_BUILD_PROGRAMS=OFF -DZSTD_BUILD_CONTRIB=OFF -DZSTD_BUILD_STATIC=ON -DZSTD_BUILD_SHARED=OFF -DZSTD_BUILD_TESTS=OFF \ @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version} # Note: if you don't have a good reason, please do not set -DPORTABLE=ON # # This one is set here on purpose of compatibility with github action runtime processor -rocksdb_version="8.0.0" +rocksdb_version="8.1.1" cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \ mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \ -DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \ diff --git a/c.h b/c.h index 701f145..1ba7fab 100644 --- a/c.h +++ b/c.h @@ -76,6 +76,8 @@ typedef struct rocksdb_backup_engine_options_t rocksdb_backup_engine_options_t; typedef struct rocksdb_restore_options_t rocksdb_restore_options_t; typedef struct rocksdb_memory_allocator_t rocksdb_memory_allocator_t; typedef struct rocksdb_lru_cache_options_t rocksdb_lru_cache_options_t; +typedef struct rocksdb_hyper_clock_cache_options_t + rocksdb_hyper_clock_cache_options_t; typedef struct rocksdb_cache_t rocksdb_cache_t; typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t; typedef struct rocksdb_compactionfiltercontext_t @@ -597,13 +599,14 @@ extern ROCKSDB_LIBRARY_API void rocksdb_release_snapshot( extern ROCKSDB_LIBRARY_API char* rocksdb_property_value(rocksdb_t* db, const char* propname); /* returns 0 on success, -1 otherwise */ -int rocksdb_property_int(rocksdb_t* db, const char* propname, - uint64_t* out_val); +extern ROCKSDB_LIBRARY_API int rocksdb_property_int(rocksdb_t* db, + const char* 
propname, + uint64_t* out_val); /* returns 0 on success, -1 otherwise */ -int rocksdb_property_int_cf(rocksdb_t* db, - rocksdb_column_family_handle_t* column_family, - const char* propname, uint64_t* out_val); +extern ROCKSDB_LIBRARY_API int rocksdb_property_int_cf( + rocksdb_t* db, rocksdb_column_family_handle_t* column_family, + const char* propname, uint64_t* out_val); extern ROCKSDB_LIBRARY_API char* rocksdb_property_value_cf( rocksdb_t* db, rocksdb_column_family_handle_t* column_family, @@ -662,6 +665,11 @@ extern ROCKSDB_LIBRARY_API void rocksdb_flush_cf( rocksdb_t* db, const rocksdb_flushoptions_t* options, rocksdb_column_family_handle_t* column_family, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_flush_cfs( + rocksdb_t* db, const rocksdb_flushoptions_t* options, + rocksdb_column_family_handle_t** column_family, int num_column_families, + char** errptr); + extern ROCKSDB_LIBRARY_API void rocksdb_flush_wal(rocksdb_t* db, unsigned char sync, char** errptr); @@ -2012,6 +2020,29 @@ rocksdb_cache_get_usage(rocksdb_cache_t* cache); extern ROCKSDB_LIBRARY_API size_t rocksdb_cache_get_pinned_usage(rocksdb_cache_t* cache); +/* HyperClockCache */ +extern ROCKSDB_LIBRARY_API rocksdb_hyper_clock_cache_options_t* +rocksdb_hyper_clock_cache_options_create(size_t capacity, + size_t estimated_entry_charge); +extern ROCKSDB_LIBRARY_API void rocksdb_hyper_clock_cache_options_destroy( + rocksdb_hyper_clock_cache_options_t*); +extern ROCKSDB_LIBRARY_API void rocksdb_hyper_clock_cache_options_set_capacity( + rocksdb_hyper_clock_cache_options_t*, size_t); +extern ROCKSDB_LIBRARY_API void +rocksdb_hyper_clock_cache_options_set_estimated_entry_charge( + rocksdb_hyper_clock_cache_options_t*, size_t); +extern ROCKSDB_LIBRARY_API void +rocksdb_hyper_clock_cache_options_set_num_shard_bits( + rocksdb_hyper_clock_cache_options_t*, int); +extern ROCKSDB_LIBRARY_API void +rocksdb_hyper_clock_cache_options_set_memory_allocator( + rocksdb_hyper_clock_cache_options_t*, 
rocksdb_memory_allocator_t*); + +extern ROCKSDB_LIBRARY_API rocksdb_cache_t* rocksdb_cache_create_hyper_clock( + size_t capacity, size_t estimated_entry_charge); +extern ROCKSDB_LIBRARY_API rocksdb_cache_t* +rocksdb_cache_create_hyper_clock_opts(rocksdb_hyper_clock_cache_options_t*); + /* DBPath */ extern ROCKSDB_LIBRARY_API rocksdb_dbpath_t* rocksdb_dbpath_create( @@ -2116,6 +2147,11 @@ rocksdb_ingestexternalfileoptions_set_allow_blocking_flush( extern ROCKSDB_LIBRARY_API void rocksdb_ingestexternalfileoptions_set_ingest_behind( rocksdb_ingestexternalfileoptions_t* opt, unsigned char ingest_behind); +extern ROCKSDB_LIBRARY_API void +rocksdb_ingestexternalfileoptions_set_fail_if_not_bottommost_level( + rocksdb_ingestexternalfileoptions_t* opt, + unsigned char fail_if_not_bottommost_level); + extern ROCKSDB_LIBRARY_API void rocksdb_ingestexternalfileoptions_destroy( rocksdb_ingestexternalfileoptions_t* opt); @@ -2198,6 +2234,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_universal_compaction_options_destroy( extern ROCKSDB_LIBRARY_API rocksdb_fifo_compaction_options_t* rocksdb_fifo_compaction_options_create(void); extern ROCKSDB_LIBRARY_API void +rocksdb_fifo_compaction_options_set_allow_compaction( + rocksdb_fifo_compaction_options_t* fifo_opts, unsigned char allow_compaction); +extern ROCKSDB_LIBRARY_API unsigned char +rocksdb_fifo_compaction_options_get_allow_compaction( + rocksdb_fifo_compaction_options_t* fifo_opts); +extern ROCKSDB_LIBRARY_API void rocksdb_fifo_compaction_options_set_max_table_files_size( rocksdb_fifo_compaction_options_t* fifo_opts, uint64_t size); extern ROCKSDB_LIBRARY_API uint64_t @@ -2622,6 +2664,11 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf( rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options, rocksdb_column_family_handle_t* column_family, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cfs( + rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options, + 
rocksdb_column_family_handle_t** column_families, int num_column_families, + char** errptr); + extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_wal( rocksdb_transactiondb_t* txn_db, unsigned char sync, char** errptr); diff --git a/cache.go b/cache.go index d7eddde..7323f4f 100644 --- a/cache.go +++ b/cache.go @@ -20,6 +20,18 @@ func NewLRUCacheWithOptions(opt *LRUCacheOptions) *Cache { return newNativeCache(cCache) } +// NewHyperClockCache creates a new hyper clock cache. +func NewHyperClockCache(capacity, estimatedEntryCharge int) *Cache { + cCache := C.rocksdb_cache_create_hyper_clock(C.size_t(capacity), C.size_t(estimatedEntryCharge)) + return newNativeCache(cCache) +} + +// NewHyperClockCacheWithOpts creates a hyper clock cache with predefined options. +func NewHyperClockCacheWithOpts(opt *HyperClockCacheOptions) *Cache { + cCache := C.rocksdb_cache_create_hyper_clock_opts(opt.c) + return newNativeCache(cCache) +} + // NewNativeCache creates a Cache object. func newNativeCache(c *C.rocksdb_cache_t) *Cache { return &Cache{c: c} @@ -90,3 +102,89 @@ func (l *LRUCacheOptions) SetNumShardBits(n int) { func (l *LRUCacheOptions) SetMemoryAllocator(m *MemoryAllocator) { C.rocksdb_lru_cache_options_set_memory_allocator(l.c, m.c) } + +// HyperClockCacheOptions are options for HyperClockCache. +// +// HyperClockCache is a lock-free Cache alternative for RocksDB block cache +// that offers much improved CPU efficiency vs. LRUCache under high parallel +// load or high contention, with some caveats: +// * Not a general Cache implementation: can only be used for +// BlockBasedTableOptions::block_cache, which RocksDB uses in a way that is +// compatible with HyperClockCache. +// * Requires an extra tuning parameter: see estimated_entry_charge below. +// Similarly, substantially changing the capacity with SetCapacity could +// harm efficiency. +// * SecondaryCache is not yet supported. 
+// * Cache priorities are less aggressively enforced, which could cause +// cache dilution from long range scans (unless they use fill_cache=false). +// * Can be worse for small caches, because if almost all of a cache shard is +// pinned (more likely with non-partitioned filters), then CLOCK eviction +// becomes very CPU intensive. +// +// See internal cache/clock_cache.h for full description. +type HyperClockCacheOptions struct { + c *C.rocksdb_hyper_clock_cache_options_t +} + +// NewHyperClockCacheOptions creates new options for hyper clock cache. +func NewHyperClockCacheOptions(capacity, estimatedEntryCharge int) *HyperClockCacheOptions { + return &HyperClockCacheOptions{ + c: C.rocksdb_hyper_clock_cache_options_create(C.size_t(capacity), C.size_t(estimatedEntryCharge)), + } +} + +// SetCapacity sets the capacity of the cache. +func (h *HyperClockCacheOptions) SetCapacity(capacity int) { + C.rocksdb_hyper_clock_cache_options_set_capacity(h.c, C.size_t(capacity)) +} + +// SetEstimatedEntryCharge sets the estimated average `charge` associated with cache entries. +// +// This is a critical configuration parameter for good performance from the hyper +// cache, because having a table size that is fixed at creation time greatly +// reduces the required synchronization between threads. +// * If the estimate is substantially too low (e.g. less than half the true +// average) then metadata space overhead will be substantially higher (e.g. +// 200 bytes per entry rather than 100). With kFullChargeCacheMetadata, this +// can slightly reduce cache hit rates, and slightly reduce access times due +// to the larger working memory size. +// * If the estimate is substantially too high (e.g. 25% higher than the true +// average) then there might not be sufficient slots in the hash table for +// both efficient operation and capacity utilization (hit rate).
The hyper +// cache will evict entries to prevent load factors that could dramatically +// affect lookup times, instead letting the hit rate suffer by not utilizing +// the full capacity. +// +// A reasonable choice is the larger of block_size and metadata_block_size. +// When WriteBufferManager (and similar) charge memory usage to the block +// cache, this can lead to the same effect as estimate being too low, which +// is better than the opposite. Therefore, the general recommendation is to +// assume that other memory charged to block cache could be negligible, and +// ignore it in making the estimate. +// +// The best parameter choice based on a cache in use is given by +// GetUsage() / GetOccupancyCount(), ignoring metadata overheads such as +// with kDontChargeCacheMetadata. More precisely with +// kFullChargeCacheMetadata is (GetUsage() - 64 * GetTableAddressCount()) / +// GetOccupancyCount(). However, when the average value size might vary +// (e.g. balance between metadata and data blocks in cache), it is better +// to estimate toward the lower side than the higher side. +func (h *HyperClockCacheOptions) SetEstimatedEntryCharge(v int) { + C.rocksdb_hyper_clock_cache_options_set_estimated_entry_charge(h.c, C.size_t(v)) +} + +// SetNumShardBits sets number of shards used for this cache. +func (h *HyperClockCacheOptions) SetNumShardBits(n int) { + C.rocksdb_hyper_clock_cache_options_set_num_shard_bits(h.c, C.int(n)) +} + +// SetMemoryAllocator for this cache. +func (h *HyperClockCacheOptions) SetMemoryAllocator(m *MemoryAllocator) { + C.rocksdb_hyper_clock_cache_options_set_memory_allocator(h.c, m.c) +} + +// Destroy the options.
+func (h *HyperClockCacheOptions) Destroy() { + C.rocksdb_hyper_clock_cache_options_destroy(h.c) + h.c = nil +} diff --git a/cache_test.go b/cache_test.go index 1ee87c2..f9c8d8c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestCache(t *testing.T) { +func TestLRUCache(t *testing.T) { cache := NewLRUCache(19) defer cache.Destroy() @@ -17,7 +17,18 @@ func TestCache(t *testing.T) { cache.DisownData() } -func TestCacheWithOpts(t *testing.T) { +func TestHyperClockCache(t *testing.T) { + cache := NewHyperClockCache(100, 10) + defer cache.Destroy() + + require.EqualValues(t, 100, cache.GetCapacity()) + cache.SetCapacity(128) + require.EqualValues(t, 128, cache.GetCapacity()) + + cache.DisownData() +} + +func TestLRUCacheWithOpts(t *testing.T) { opts := NewLRUCacheOptions() opts.SetCapacity(19) opts.SetNumShardBits(2) @@ -32,3 +43,20 @@ func TestCacheWithOpts(t *testing.T) { cache.DisownData() } + +func TestHyperClockCacheWithOpts(t *testing.T) { + opts := NewHyperClockCacheOptions(100, 10) + opts.SetCapacity(19) + opts.SetEstimatedEntryCharge(10) + opts.SetNumShardBits(2) + defer opts.Destroy() + + cache := NewHyperClockCacheWithOpts(opts) + defer cache.Destroy() + + require.EqualValues(t, 19, cache.GetCapacity()) + cache.SetCapacity(128) + require.EqualValues(t, 128, cache.GetCapacity()) + + cache.DisownData() +} diff --git a/cf_test.go b/cf_test.go index 8f2e9c2..1a8f9a5 100644 --- a/cf_test.go +++ b/cf_test.go @@ -109,6 +109,7 @@ func TestColumnFamilyBatchPutGet(t *testing.T) { // trigger flush require.Nil(t, db.FlushCF(cfh[0], NewDefaultFlushOptions())) + require.Nil(t, db.FlushCFs(cfh, NewDefaultFlushOptions())) meta := db.GetColumnFamilyMetadataCF(cfh[0]) require.NotNil(t, meta) diff --git a/db.go b/db.go index 55d1998..c4933b1 100644 --- a/db.go +++ b/db.go @@ -1223,6 +1223,24 @@ func (db *DB) GetProperty(propName string) (value string) { return } +// GetIntProperty similar to `GetProperty`, 
but only works for a subset of properties whose +// return value is an integer. Return the value by integer. +func (db *DB) GetIntProperty(propName string) (value uint64, success bool) { + cProp := C.CString(propName) + success = C.rocksdb_property_int(db.c, cProp, (*C.uint64_t)(&value)) == 0 + C.free(unsafe.Pointer(cProp)) + return +} + +// GetIntPropertyCF similar to `GetProperty`, but only works for a subset of properties whose +// return value is an integer. Return the value by integer. +func (db *DB) GetIntPropertyCF(propName string, cf *ColumnFamilyHandle) (value uint64, success bool) { + cProp := C.CString(propName) + success = C.rocksdb_property_int_cf(db.c, cf.c, cProp, (*C.uint64_t)(&value)) == 0 + C.free(unsafe.Pointer(cProp)) + return +} + // GetPropertyCF returns the value of a database property. func (db *DB) GetPropertyCF(propName string, cf *ColumnFamilyHandle) (value string) { cProp := C.CString(propName) @@ -1562,6 +1580,21 @@ func (db *DB) FlushCF(cf *ColumnFamilyHandle, opts *FlushOptions) (err error) { return } +// FlushCFs triggers a manual flush for the database on specific column families. +func (db *DB) FlushCFs(cfs []*ColumnFamilyHandle, opts *FlushOptions) (err error) { + if n := len(cfs); n > 0 { + _cfs := make([]*C.rocksdb_column_family_handle_t, n) + for i := range _cfs { + _cfs[i] = cfs[i].c + } + + var cErr *C.char + C.rocksdb_flush_cfs(db.c, opts.c, &_cfs[0], C.int(n), &cErr) + err = fromCError(cErr) + } + return +} + // FlushWAL flushes the WAL memory buffer to the file. If sync is true, it calls SyncWAL // afterwards. 
func (db *DB) FlushWAL(sync bool) (err error) { diff --git a/db_test.go b/db_test.go index fb1d730..7cb211f 100644 --- a/db_test.go +++ b/db_test.go @@ -12,7 +12,11 @@ import ( func TestOpenDb(t *testing.T) { db := newTestDB(t, nil) defer db.Close() + require.EqualValues(t, "0", db.GetProperty("rocksdb.num-immutable-mem-table")) + v, success := db.GetIntProperty("rocksdb.num-immutable-mem-table") + require.EqualValues(t, uint64(0), v) + require.True(t, success) } func TestDBCRUD(t *testing.T) { diff --git a/db_unix.go b/db_unix.go deleted file mode 100644 index 2f92b0a..0000000 --- a/db_unix.go +++ /dev/null @@ -1,28 +0,0 @@ -//go:build !windows -// +build !windows - -package grocksdb - -// #include -// #include "rocksdb/c.h" -import "C" - -import "unsafe" - -// GetIntProperty similar to `GetProperty`, but only works for a subset of properties whose -// return value is an integer. Return the value by integer. -func (db *DB) GetIntProperty(propName string) (value uint64, success bool) { - cProp := C.CString(propName) - success = C.rocksdb_property_int(db.c, cProp, (*C.uint64_t)(&value)) == 0 - C.free(unsafe.Pointer(cProp)) - return -} - -// GetIntPropertyCF similar to `GetProperty`, but only works for a subset of properties whose -// return value is an integer. Return the value by integer. 
-func (db *DB) GetIntPropertyCF(propName string, cf *ColumnFamilyHandle) (value uint64, success bool) { - cProp := C.CString(propName) - success = C.rocksdb_property_int_cf(db.c, cf.c, cProp, (*C.uint64_t)(&value)) == 0 - C.free(unsafe.Pointer(cProp)) - return -} diff --git a/db_unix_test.go b/db_unix_test.go deleted file mode 100644 index 569a3a8..0000000 --- a/db_unix_test.go +++ /dev/null @@ -1,19 +0,0 @@ -//go:build !windows -// +build !windows - -package grocksdb - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestOpenDbUnix(t *testing.T) { - db := newTestDB(t, nil) - defer db.Close() - require.EqualValues(t, "0", db.GetProperty("rocksdb.num-immutable-mem-table")) - v, success := db.GetIntProperty("rocksdb.num-immutable-mem-table") - require.EqualValues(t, uint64(0), v) - require.True(t, success) -} diff --git a/go.mod b/go.mod index 6c8b597..b491b23 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/linxGnu/grocksdb go 1.17 -require github.com/stretchr/testify v1.8.2 +require github.com/stretchr/testify v1.8.3 require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 6a56e69..c3467ce 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/memory_usage_test.go b/memory_usage_test.go index 677f62d..bfedefa 100644 --- a/memory_usage_test.go +++ b/memory_usage_test.go @@ -1,7 +1,7 @@ package grocksdb import ( - "math/rand" + "crypto/rand" "testing" "github.com/stretchr/testify/assert" diff --git a/options_compaction.go b/options_compaction.go index 43cbb61..2e79286 100644 --- a/options_compaction.go +++ b/options_compaction.go @@ -141,6 +141,16 @@ func (opts *FIFOCompactionOptions) GetMaxTableFilesSize() uint64 { return uint64(C.rocksdb_fifo_compaction_options_get_max_table_files_size(opts.c)) } +// SetAllowCompaction allows compaction or not. +func (opts *FIFOCompactionOptions) SetAllowCompaction(allow bool) { + C.rocksdb_fifo_compaction_options_set_allow_compaction(opts.c, boolToChar(allow)) +} + +// AllowCompaction checks if compaction is allowed. +func (opts *FIFOCompactionOptions) AllowCompaction() bool { + return charToBool(C.rocksdb_fifo_compaction_options_get_allow_compaction(opts.c)) +} + // Destroy deallocates the FIFOCompactionOptions object. 
func (opts *FIFOCompactionOptions) Destroy() { C.rocksdb_fifo_compaction_options_destroy(opts.c) diff --git a/options_compaction_test.go b/options_compaction_test.go index 4659a8b..6b4ea24 100644 --- a/options_compaction_test.go +++ b/options_compaction_test.go @@ -35,6 +35,10 @@ func TestFifoCompactOption(t *testing.T) { fo.SetMaxTableFilesSize(2 << 10) require.EqualValues(t, 2<<10, fo.GetMaxTableFilesSize()) + + require.False(t, fo.AllowCompaction()) + fo.SetAllowCompaction(true) + require.True(t, fo.AllowCompaction()) } func TestUniversalCompactOption(t *testing.T) { diff --git a/options_ingest.go b/options_ingest.go index 3ae6a18..35c8762 100644 --- a/options_ingest.go +++ b/options_ingest.go @@ -58,6 +58,16 @@ func (opts *IngestExternalFileOptions) SetIngestionBehind(flag bool) { C.rocksdb_ingestexternalfileoptions_set_ingest_behind(opts.c, boolToChar(flag)) } +// SetFailIfNotBottommostLevel sets to TRUE if user wants file to be ingested to the bottommost level. An +// error of Status::TryAgain() will be returned if a file cannot fit in the bottommost level when calling +// DB::IngestExternalFile()/DB::IngestExternalFiles(). +// +// The user should clear the bottommost level in the overlapping range before re-attempt. +// Ingest_behind takes precedence over fail_if_not_bottommost_level. +func (opts *IngestExternalFileOptions) SetFailIfNotBottommostLevel(flag bool) { + C.rocksdb_ingestexternalfileoptions_set_fail_if_not_bottommost_level(opts.c, boolToChar(flag)) +} + // Destroy deallocates the IngestExternalFileOptions object. func (opts *IngestExternalFileOptions) Destroy() { C.rocksdb_ingestexternalfileoptions_destroy(opts.c) diff --git a/transactiondb.go b/transactiondb.go index f2faf67..529c6b0 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -434,6 +434,21 @@ func (db *TransactionDB) FlushCF(cf *ColumnFamilyHandle, opts *FlushOptions) (er return } +// FlushCFs triggers a manual flush for the database on specific column families. 
+func (db *TransactionDB) FlushCFs(cfs []*ColumnFamilyHandle, opts *FlushOptions) (err error) { + if n := len(cfs); n > 0 { + _cfs := make([]*C.rocksdb_column_family_handle_t, n) + for i := range _cfs { + _cfs[i] = cfs[i].c + } + + var cErr *C.char + C.rocksdb_transactiondb_flush_cfs(db.c, opts.c, &_cfs[0], C.int(n), &cErr) + err = fromCError(cErr) + } + return +} + // FlushWAL flushes the WAL memory buffer to the file. If sync is true, it calls SyncWAL // afterwards. func (db *TransactionDB) FlushWAL(sync bool) (err error) { diff --git a/transactiondb_test.go b/transactiondb_test.go index c7a1daf..ecdd334 100644 --- a/transactiondb_test.go +++ b/transactiondb_test.go @@ -183,3 +183,60 @@ func newTestTransactionDB(t *testing.T, applyOpts func(opts *Options, transactio return db } + +func TestTransactionDBColumnFamilyBatchPutGet(t *testing.T) { + dir := t.TempDir() + + givenNames := []string{"default", "guide"} + + opts := NewDefaultOptions() + opts.SetCreateIfMissingColumnFamilies(true) + opts.SetCreateIfMissing(true) + + db, cfh, err := OpenTransactionDbColumnFamilies(opts, NewDefaultTransactionDBOptions(), dir, givenNames, []*Options{opts, opts}) + require.Nil(t, err) + defer db.Close() + + require.EqualValues(t, len(cfh), 2) + defer cfh[0].Destroy() + defer cfh[1].Destroy() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + ro := NewDefaultReadOptions() + defer ro.Destroy() + + givenKey0 := []byte("hello0") + givenVal0 := []byte("world0") + givenKey1 := []byte("hello1") + givenVal1 := []byte("world1") + + b0 := NewWriteBatch() + defer b0.Destroy() + b0.PutCF(cfh[0], givenKey0, givenVal0) + require.Nil(t, db.Write(wo, b0)) + actualVal0, err := db.GetCF(ro, cfh[0], givenKey0) + defer actualVal0.Free() + require.Nil(t, err) + require.EqualValues(t, actualVal0.Data(), givenVal0) + + b1 := NewWriteBatch() + defer b1.Destroy() + b1.PutCF(cfh[1], givenKey1, givenVal1) + require.Nil(t, db.Write(wo, b1)) + actualVal1, err := db.GetCF(ro, cfh[1], givenKey1) + defer 
actualVal1.Free() + require.Nil(t, err) + require.EqualValues(t, actualVal1.Data(), givenVal1) + + actualVal, err := db.GetCF(ro, cfh[0], givenKey1) + require.Nil(t, err) + require.EqualValues(t, actualVal.Size(), 0) + actualVal, err = db.GetCF(ro, cfh[1], givenKey0) + require.Nil(t, err) + require.EqualValues(t, actualVal.Size(), 0) + + // trigger flush + require.Nil(t, db.FlushCF(cfh[0], NewDefaultFlushOptions())) + require.Nil(t, db.FlushCFs(cfh, NewDefaultFlushOptions())) +}