本篇仅仅是一个记录 MergeOperator 的使用方式。
Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。
而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划算,所以Merge 操作横空出世,用户只需要实现自己的 Merge 操作,通过option 传入,接下来有update 的场景时只需要调用一个Merge
就可以完成了,后续针对当前key的 real update 都会在后台Compaction 以及 用户调研 Get
或者 迭代器操作 时会进行合并。当然,Merge本身也存在问题,就是如果kMergeType得不到及时得compaction 调度,那可能读得负载就重了,因为读需要将之前的未Merge 都进行Merge 才能返回。
因为MergeOperator
虚基类 的函数太多了,会区分 full merge 和 partial merge,但是对于很多用户来说就是一个计数累加或者 string-append 操作,并没有过于复杂的操作,所以rocksdb 提供了 更为通用的虚基类AssociativeMergeOperator
来屏蔽复杂的Full merge 和 partial merge,继承这个类则只需要主体实现一个Merge
和 Name
函数即可。
如下代码使用了Rocksdb 已经封装好的两个 Merge操作,一个是StringAppendOperator
,另一个是UInt64AddOperator
,Merge
本身就是写一个key/value,只不过key的type是kMergeType
,value 也是实际存在的。
StringAppend
的简单测试代码如下,我们使用同一个key进行两次Merge
操作,相当于写入了两条kMergeType
的key到db中,然后调用一次Flush,会生成一个sst文件(进行Merge),再Get会发现这个key的value 按照StringAppend中的行为完成了Merge。
#include
#include #include
#include
#include
#include
#include
#include
#include
#include
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;void OpenDB() { option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));// 默认会用 逗号分隔 不同的mergeoption.merge_operator = MergeOperators::CreateStringAppendOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) { cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() { int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;EncodeFixed64(buf, 2);s = db->Merge(rocksdb::WriteOptions(),key, "2");s = db->Merge(rocksdb::WriteOptions(),key, "3");db->Flush(rocksdb::FlushOptions());if (!s.ok()) { cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) { cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << value << endl;
}int main() { OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value len: 3 data: 2,3
可以看到Get到的value 已经进行合并了。
这个是一个自增计数的Merge 案例。
需要主要的是如果自己实现 MergeOperator
底层有编解码,那上层用户侧请求的写入也需要 编码方式写入 以及 按照底层的解码方式读取。
Rocksdb实现的案例代码在拿到用户传入的value的时候会进行编解码:
// A 'model' merge operator with uint64 addition semantics
// Implemented as an AssociativeMergeOperator for simplicity and example.
class UInt64AddOperator : public AssociativeMergeOperator { public:bool Merge(const Slice& /*key*/, const Slice* existing_value,const Slice& value, std::string* new_value,Logger* logger) const override { uint64_t orig_value = 0;if (existing_value){ // 解码以存在的value,则我们上层调用Merge 写入的时候需要按照Fixed64进行编码orig_value = DecodeInteger(*existing_value, logger);}uint64_t operand = DecodeInteger(value, logger);assert(new_value);new_value->clear();PutFixed64(new_value, orig_value + operand);return true; // Return true always since corruption will be treated as 0}const char* Name() const override { return "UInt64AddOperator"; }private:// Takes the string and decodes it into a uint64_t// On error, prints a message and returns 0uint64_t DecodeInteger(const Slice& value, Logger* logger) const { uint64_t result = 0;if (value.size() == sizeof(uint64_t)) { result = DecodeFixed64(value.data());} else if (logger != nullptr) { // If value is corrupted, treat it as 0ROCKS_LOG_ERROR(logger, "uint64 value corruption, size: %" ROCKSDB_PRIszt" > %" ROCKSDB_PRIszt,value.size(), sizeof(uint64_t));}return result;}};
案例代码:
#include
#include #include
#include
#include
#include
#include
#include
#include
#include
#include "utilities/merge_operators.h"using namespace rocksdb;using namespace std;
rocksdb::DB* db;
rocksdb::Options option;static bool LittleEndian() { int i = 1;return *((char*)(&i));
}inline uint32_t DecodeFixed32(const char* ptr) { if (LittleEndian()) { // Load the raw bytesuint32_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else { return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));}
}inline uint64_t DecodeFixed64(const char* ptr) { if (LittleEndian()) { // Load the raw bytesuint64_t result;memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain loadreturn result;} else { uint64_t lo = DecodeFixed32(ptr);uint64_t hi = DecodeFixed32(ptr + 4);return (hi << 32) | lo;}
}inline void EncodeFixed64(char* buf, uint64_t value) { if (LittleEndian()) { memcpy(buf, &value, sizeof(value));} else { buf[0] = value & 0xff;buf[1] = (value >> 8) & 0xff;buf[2] = (value >> 16) & 0xff;buf[3] = (value >> 24) & 0xff;buf[4] = (value >> 32) & 0xff;buf[5] = (value >> 40) & 0xff;buf[6] = (value >> 48) & 0xff;buf[7] = (value >> 56) & 0xff;}
}void OpenDB() { option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::BlockBasedTableOptions table_options;table_options.no_block_cache = true;table_options.cache_index_and_filter_blocks = false;option.table_factory.reset(NewBlockBasedTableFactory(table_options));option.merge_operator = MergeOperators::CreateUInt64AddOperator();auto s = rocksdb::DB::Open(option, "./db", &db);if (!s.ok()) { cout << "open faled : " << s.ToString() << endl;exit(-1);}cout << "Finish open !"<< endl;
}void DoWrite() { int j = 0;string key = std::to_string(j);std::string value;char buf[8];rocksdb::Status s;// 因为底层实现的Uint64AddOperator 会进行编码 以及 解码EncodeFixed64(buf, 2);// 对同一个key ,merge 两个2,则最后Get的时候会变成4s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));s = db->Merge(rocksdb::WriteOptions(),key, std::string(buf,8));db->Flush(rocksdb::FlushOptions());if (!s.ok()) { cout << "Merge value failed: " << s.ToString() << endl;exit(-1);}cout << "Finish merge !" << endl;s = db->Get(rocksdb::ReadOptions(), key, &value);if (!s.ok()) { cout << "Get after only merge is failed " << s.ToString() << endl;exit(-1);}cout << "Get merge value " << value.size() << " " << DecodeFixed64(value.data()) << endl;
}int main() { OpenDB();DoWrite();return 0;
}
输出如下:
Finish open !
Finish merge !
Get merge value 8 4
翻页器
在src/main/resources/springmvc-servlet.xml中加入
看了很多人写的好几个去重方法,我在这里精简组合下,适用于已排序与未排序的数组。 废话不多说,上代码。