首页 > Rocksdb iterator和snapshot 接口

Rocksdb iterator和snapshot 接口

Rocksdb提供迭代器来来访问整个db中的数据,就像STL中的迭代器功能一样,用来访问容器中的具体的数据。

访问形式以及访问接口有如下几种:

  • 遍历所有的key-value
    //打开db,并初始化一个迭代器指针
    rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl;
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    delete it;
    
  • 输出一个范围内的key-value,[small, big)
    for (it->Seek(small);it->Valid() && it->key().ToString() < big;it->Next()) { 
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    
  • 反向遍历db中的元素
    for (it->SeekToLast(); it->Valid(); it->Prev()) { 
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    
  • 反向遍历一个指定范围的key,如(small, big]
    for (it->SeekForPrev(start);it->Valid() && it->key().ToString() > limit;it->Prev()) { 
    ...
    }
    assert(it->status().ok()); // Check for any errors found during the scan
    

迭代器的接口可以算是 rocksdb针对客户端的核心接口,主要是提供排序以及高效查找的功能。

测试代码如下:

#include 
#include 
#include 
#include 
#include 
#include 
#include using namespace std;static string rand_key(unsigned long long key_range) { char buff[30];unsigned long long n = 1;for (int i =1; i <= 4; ++i) { n *= (unsigned long long ) rand();}sprintf(buff, "%llu", n % key_range);string k(buff);return k;
}int main() { rocksdb::DB *db;rocksdb::Options option;option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);if (!s.ok()) { cout << "Open failed with " << s.ToString() << endl;exit(1);}rocksdb::DestroyDB("./iterator_db", option);cout << "seek all keys : " << endl;for(int i = 0; i < 5; i ++) { rocksdb::Status s = db->Put(rocksdb::WriteOptions(), rand_key(9), string(10, 'a' + (i % 26)) );if (!s.ok()) { cout << "Put failed with " << s.ToString() << endl;exit(1);}}   /* traverse rocksdb key-value */rocksdb::Iterator *it = db->NewIterator(rocksdb::ReadOptions());for (it->SeekToFirst(); it->Valid(); it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl;}string limit="4";string start="2";cout << "seek from '2' to '4' : " << endl;for(it->Seek(start); it->Valid()&&it->key().ToString() < limit;it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl;} assert(it->status().ok());cout << "seek from last to start :" << endl;for (it->SeekToLast(); it->Valid(); it->Prev()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl;}assert(it->status().ok());cout << "seek from '4' to '2' :" << endl;for(it->SeekForPrev(limit); it->Valid()&&it->key().ToString() > start;it->Prev()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl;} assert(it->status().ok());delete it;db->Close();delete db;return 0;
}

输出如下:

seek all keys : 
3: cccccccccc
4: dddddddddd
7: bbbbbbbbbb
8: eeeeeeeeee
seek from '2' to '4' : 
3: cccccccccc
seek from last to start :
8: eeeeeeeeee
7: bbbbbbbbbb
4: dddddddddd
3: cccccccccc
seek from '4' to '2' :
4: dddddddddd
3: cccccccccc

且上层使用rocksdb迭代器接口时一般会和snapshot接口一同使用,用来实现MVCC的版本控制功能。

关于snapshot的实现,我们在Rocksdb事务:隔离性的实现中有提到,感兴趣的可以看看。

关于snapshot的客户端接口主要有:

  • sp1 = db->GetSnapshot(); 在当前db状态下创建一个snapshot,添加到内部维护的一个全局的snapshotImpl的双向链表中,并返回该snapshot的对象
  • read_option.snapshot = sp1; 将获取到的snapshot 传给read_option,进行Get操作
  • db->ReleaseSnapshot(sp1); 释放snapshot相关的资源(从双向链表中删除该节点)

隔离性的测试代码如下:

#include 
#include 
#include 
#include 
#include 
#include 
#include using namespace std;int main() { rocksdb::DB *db;rocksdb::Options option;option.create_if_missing = true;option.compression = rocksdb::CompressionType::kNoCompression;rocksdb::Status s = rocksdb::DB::Open(option, "./iterator_db", &db);if (!s.ok()) { cout << "Open failed with " << s.ToString() << endl;exit(1);}// set a snapshot before putconst rocksdb::Snapshot *sp1 = db->GetSnapshot(); s = db->Put(rocksdb::WriteOptions(), "sp2", "value_sp2");assert(s.ok());// set a snapshot after putconst rocksdb::Snapshot *sp2 = db->GetSnapshot();rocksdb::ReadOptions read_option;read_option.snapshot = sp1;string value = "";//预期获取不到sp2的value,因为这里用的是sp1的快照s = db->Get(read_option, "sp2", &value); if(value == "") { cout << "Can't get sp2 at sp1!" << endl;}read_option.snapshot = sp2;// 能够获取到,使用的是sp2的快照,其是在put之后设置的s = db->Get(read_option, "sp2", &value); assert(s.ok());if(value != "") { cout << "Got sp2's value: " << value << endl;}db->ReleaseSnapshot(sp1);db->ReleaseSnapshot(sp2);

输出如下:

Can't get sp2 at sp1!
Got sp2's value: value_sp2

当然rocksdb也提供了更为复杂的mvcc特性,来以事务的方式支持不同的隔离级别。

更多相关:

  • 文章目录概览1. UDB 架构2. UDB 表格式3. Rocksdb:针对flash存储优化过的第三方库3.1 Rocksdb架构3.2 为什么选择Rocksdb4. MyRocks / Rocksdb 开发历程4.1 设计目标4.2 性能挑战4.2.1 降低CPU的消耗4.2.2 降低range-scan 的延时消耗4.2.3 磁...

  • Compaction过程中 产生大量读I/O 的背景 项目中因大value 需求,引入了PingCap 参考Wisckey 思想实现的key-value分离存储 titan, 使用过程中因为有用到Rocksdb本身的 CompactionFilter功能,所以就直接用TitanDB的option 传入了compaction filt...

  • 简单记录一些 在linux下 统计进程内部函数运行耗时的统计工具,主要是用作性能瓶颈分析。当然以下工具除了pstack功能单一之外,其他的工具都非常强大,这里仅仅整理特定场景的特定用法,用作协同分析。 以下工具需要追踪具体的进程,如果想要打印信息更全,建议编译的时候将符号信息都编译到二进制文件之中,-g选项 strace str...

  • 想要自己随时随地写一写rocksdb的代码,并且快速测试,但是公司的物理机登陆过于麻烦,想要验证功能的话其实在自己的电脑就完全可以了。 安装 brew install rocksdb,默认二进制文件安装在/usr/local/bin在~/.bashrc或者自己正在使用的shell的rc文件中 加入rocksdb的bin文件所在路径...

  •  翻页器

  •     在src/main/resources/springmvc-servlet.xml中加入

  • 本篇仅仅是一个记录 MergeOperator 的使用方式。 Rocksdb 使用MergeOperator 来代替Update 场景中的读改写操作,即用户的一个Update 操作需要调用rocksdb的 Get + Put 接口才能完成。 而这种情况下会引入一些额外的读写放大,对于支持SQL这种update 频繁的场景来说实在是不划...

  • 看了很多人写的好几个去重方法,我在这里精简组合下,适用于已排序与未排序的数组。 废话不多说,上代码。 数组去重