首页 > TitanDB 中使用Compaction Filter ,产生了预期之外几十倍的读I/O

TitanDB 中使用Compaction Filter ,产生了预期之外几十倍的读I/O

Compaction过程中 产生大量读I/O 的背景

项目中因大value 需求,引入了PingCap 参考Wisckey 思想实现的key-value分离存储 titan, 使用过程中因为有用到Rocksdb本身的 CompactionFilter功能,所以就直接用TitanDB的option 传入了compaction filter。

使用过程中,单纯的通过db->Put接口写入 就会发现磁盘上大量的读I/O。

Ps : 相关的现象产生时的基本配置就不贴上来了,这个现象用过titan的compaction filter的同学应该都会比较清楚。

如果没有用过,但也发现了一些异常,也可以直接向后看。

我们数据写入量是 key:10B, value: 8K , 磁盘上的读本身是由于compaction引起的,compaction过程中需要将选择的sst文件中的key-value通过迭代器一个一个读取上来做堆排序。这个过程会产生读I/O,也就是只有Compaction 本身会有读I/O。

问题现象是单次compaction的量也就几十M,但磁盘上却产生了数百M的读I/O

更加直观的体现就是通过命令sudo iotop,可以看到此时大量的compaction 线程产生了读IO

在这里插入图片描述

问题分析

这里显然不合理,rocksdb的日志打印出来的LOG 中总共的compaction的带宽也就几十M,因为在titandb的key-value分离存储之后LSM-tree中仅仅存储了key和key-index,所以单次compaction的过程中理论上并不会携带着value参与,这样的大量I/O不太合理。

继续向下看,从iotop的输出中取出来一个compaction的线程ID,sudo strace -ttt -T -p 209278 抓取它的系统调用,可以看到大量的pread64系统调用

1617853714.972743 pread64(14224, "203250206p20/fuid:11288154201^365.
362]22"..., 8190, 13057621) = 12057 <0.000445>
1617853714.973241 pread64(13772, "357212255y20/fuid:11288198201^365.
362]22"..., 8190, 3267429) = 12057 <0.000013>
1617853714.973284 pread64(15591, "343360237320/fuid:11288239201^365.
362]22"..., 8190, 3279482) = 12057 <0.000230>
...

可以看到pread64读到的数据大小是8190B,显然是我们写入的value大小,这货肯定读了存放value的blobfile

随机抽样几个fd ,也就是pread64(14224, "20...)的第一参数,从进程的fd列表中看看它链接得是哪个文件ls -l /proc/xxx/fd | grep 209278

lr-x------ 1 kiwi2 kiwi2 64 Apr  8 11:49 /proc/209235/fd/10029 -> /mnt/db/14/titandb/000681.blob

果然是从blobfile中读取的数据,到这里我们就知道为什么compaction线程会有这么多的读,因为compaction过程中竟然读了blob file中的value。。。陷入沉思,梳理一下titan的写入逻辑。

在这里插入图片描述

  1. Key-value 都和以前rocksdb一样,先写入memtable
  2. 在Flush过程中形成sst文件的时候,通过titan自己的table-builder add的过程中来做区分,大于一个阈值时 分离value写入到blobfile中,key+key-index 存放到LSM-tree 的sst文件中
  3. 后续LSM-tree继续自己的compaction, blobfiles 则在达到触发gc条件的时候由一个线程池的一个线程调度blobfile的过期清理

也就是titan compaction过程中理论上仅仅是sst文件中的key + key-index参与,并不会涉及blobfiles 中的value,要不然key-value分离的意义何在?带宽还是没有降下来。

接下来的分析就更加明了了,看看这个时候大量读的compaction线程调用栈,直接上命令sudo pstack 209278(pstack底层也是调用gdb 执行的,不过是quiet指令执行,并不会阻塞线程),最后能看到如下调用栈

#0  0x00007faa05d93f73 in pread64 () from /lib64/libpthread.so.0
#1  0x000000000095f85e in pread (__offset=16132142, __nbytes=12057, __buf=0x3c074a000, __fd=)
#2  rocksdb::PosixRandomAccessFile::Read(unsigned long, unsigned long, rocksdb::Slice*, char*) const ()
#3  0x0000000000a0c0b1 in rocksdb::RandomAccessFileReader::Read(unsigned long, unsigned long, rocksdb::Slice*, char*, bool) const ()
#4  0x000000000081b197 in rocksdb::titandb::BlobFileReader::ReadRecord(rocksdb::titandb::BlobHandle const&, rocksdb::titandb::BlobRecord*, rocksdb::titandb::OwnedSlice*) ()
#5  0x000000000081ba21 in rocksdb::titandb::BlobFileReader::Get(rocksdb::ReadOptions const&, rocksdb::titandb::BlobHandle const&, rocksdb::titandb::BlobRecord*, rocksdb::PinnableSlice*) ()
#6  0x00000000008428e3 in rocksdb::titandb::BlobFileCache::Get(rocksdb::ReadOptions const&, unsigned long, unsigned long, rocksdb::titandb::BlobHandle const&, rocksdb::titandb::BlobRecord*, rocksdb::PinnableSlice*) ()
#7  0x00000000008396b8 in rocksdb::titandb::BlobStorage::Get(rocksdb::ReadOptions const&, rocksdb::titandb::BlobIndex const&, rocksdb::titandb::BlobRecord*, rocksdb::PinnableSlice*) ()
#8  0x00000000007f4a3b in rocksdb::titandb::TitanCompactionFilter::FilterV2 (this=0x3ceb05b00, level=0, key=..., value_type=, value=..., new_value=0x1be783cf8, skip_until=0x1be783d18)
#9  0x0000000000a2fa1a in InvokeFilterIfNeeded (skip_until=0x7fa9f786e730, need_skip=0x7fa9f786e72f, this=0x1be783b00)
#10 rocksdb::CompactionIterator::InvokeFilterIfNeeded (this=0x1be783b00, need_skip=0x7fa9f786e72f, skip_until=0x7fa9f786e730)
#11 0x0000000000a3039a in rocksdb::CompactionIterator::NextFromInput() ()
#12 0x0000000000a31c5a in rocksdb::CompactionIterator::Next (this=0x1be783b00)
#13 0x0000000000a39658 in rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*) ()
#14 0x0000000000a3aa1c in rocksdb::CompactionJob::Run() ()
#15 0x0000000000887a5b in rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority) ()
#16 0x000000000088ab44 in rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority) ()
#17 0x000000000088b028 in rocksdb::DBImpl::BGWorkCompaction (arg=)
#18 0x0000000000a1437c in operator() (this=0x7fa9f7870370)
#19 rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long) ()
#20 0x0000000000a144d3 in rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper (arg=)

这个调用栈中可以看到

#11 0x0000000000a3039a in rocksdb::CompactionIterator::NextFromInput() ()
#12 0x0000000000a31c5a in rocksdb::CompactionIterator::Next (this=0x1be783b00)
#13 0x0000000000a39658 in rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*) ()
#14 0x0000000000a3aa1c in rocksdb::CompactionJob::Run() ()

这一些都是正常的compaction逻辑,但是再往上走就进入了compaction filter之中,使用了Titandb的filter函数,并且调用了rocksdb::titandb::BlobStorage::Get,确实,我们用户态用了compaction filter,但不应该调用到blob的Get,好吧。。。

直接看Titan的源代码。

Titan的Compaction Filter实现

在打开TitanDB的时候会将用户传入的compaction_filter作为一个子filter传进来,并且交给titan自己的TitanCompactionFilterFactory来处理

Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,std::vector<ColumnFamilyHandle*>* handles) { ......std::vector<ColumnFamilyDescriptor> base_descs;std::vector<std::shared_ptr<TitanTableFactory>> titan_table_factories;for (auto& desc : descs) { ......if (cf_opts.compaction_filter != nullptr ||cf_opts.compaction_filter_factory != nullptr) { std::shared_ptr<TitanCompactionFilterFactory> titan_cf_factory =std::make_shared<TitanCompactionFilterFactory>(cf_opts.compaction_filter, cf_opts.compaction_filter_factory,this, desc.options.skip_value_in_compaction_filter, desc.name);cf_opts.compaction_filter = nullptr;cf_opts.compaction_filter_factory = titan_cf_factory;}}// Open base DB.s = DB::Open(db_options_, dbname_, base_descs, handles, &db_);......
}

进入TitanCompactionFilterFactoryCreateCompactionFilter函数

之前介绍Rocksdb的ComapctionFilter实现的时候知道,引擎对外暴漏了这一些接口,能够由用户来指定自己想要过滤什么样的key。

Rocskdb CompactionFilter实现

std::unique_ptr<CompactionFilter> CreateCompactionFilter(const CompactionFilter::Context &context) override { assert(original_filter_ != nullptr || original_filter_factory_ != nullptr);std::shared_ptr<BlobStorage> blob_storage;{ MutexLock l(&titan_db_impl_->mutex_);blob_storage = titan_db_impl_->blob_file_set_->GetBlobStorage(context.column_family_id).lock();}if (blob_storage == nullptr) { assert(false);// Shouldn't be here, but ignore compaction filter when we hit error.return nullptr;}const CompactionFilter *original_filter = original_filter_;std::unique_ptr<CompactionFilter> original_filter_from_factory;if (original_filter == nullptr) { original_filter_from_factory =original_filter_factory_->CreateCompactionFilter(context);original_filter = original_filter_from_factory.get();}return std::unique_ptr<CompactionFilter>(new TitanCompactionFilter(titan_db_impl_, cf_name_, original_filter,std::move(original_filter_from_factory), blob_storage, skip_value_));
}

Factory会将TitanCompactionFilter返回,且这个filter也携带着用户自定义的Filteroriginal_filter。也就是comapction 过程中会先执行TitanCompactionFilterFilterV2函数,接着看一下titandb 的FilterV2函数:

Decision FilterV2(int level, const Slice &key, ValueType value_type,const Slice &value, std::string *new_value,std::string *skip_until) const override { ......BlobRecord record;PinnableSlice buffer;ReadOptions read_options;// 问题源头s = blob_storage_->Get(read_options, blob_index, &record, &buffer);if (s.IsCorruption()) { // Could be cause by blob file beinged GC-ed, or real corruption.// TODO(yiwu): Tell the two cases apart.return Decision::kKeep;} else if (s.ok()) { // 用户自定义的Filter逻辑auto decision = original_filter_->FilterV2(level, key, kValue, record.value, new_value, skip_until);...}
}

可以看到这里会有一个blob_storage_->Get,到此我们就知道为什么会有一个blobfile 的Get了。

因为用户在回掉使用original_filter_->FilterV2逻辑的时候需要知道具体的value,所以Titan这里需要将blobfile中的value传回去。

OK。。。这样啊,那确实没有办法,毕竟想要拥有灵活性,代价是必不可少的。

解决

如果业务中针对compaction filter的需求是不需要value的数据,这里可以避免掉blobfile的Get,设置titan options skip_value_in_compaction_filter = true 即可。

更多相关:

  • 文章目录概览1. UDB 架构2. UDB 表格式3. Rocksdb:针对flash存储优化过的第三方库3.1 Rocksdb架构3.2 为什么选择Rocksdb4. MyRocks / Rocksdb 开发历程4.1 设计目标4.2 性能挑战4.2.1 降低CPU的消耗4.2.2 降低range-scan 的延时消耗4.2.3 磁...

  • 简单记录一些 在linux下 统计进程内部函数运行耗时的统计工具,主要是用作性能瓶颈分析。当然以下工具除了pstack功能单一之外,其他的工具都非常强大,这里仅仅整理特定场景的特定用法,用作协同分析。 以下工具需要追踪具体的进程,如果想要打印信息更全,建议编译的时候将符号信息都编译到二进制文件之中,-g选项 strace str...

  • 想要自己随时随地写一写rocksdb的代码,并且快速测试,但是公司的物理机登陆过于麻烦,想要验证功能的话其实在自己的电脑就完全可以了。 安装 brew install rocksdb,默认二进制文件安装在/usr/local/bin在~/.bashrc或者自己正在使用的shell的rc文件中 加入rocksdb的bin文件所在路径...

  • 过滤器(filter)正如其名,作用就是接收一个输入,通过某个规则进行处理,然后返回处理后的结果。主要用在数据的格式化上,例如获取一个数组中的子集,对数组中的元素进行排序等。ng内置了一些过滤器,它们是:currency(货币)、date(日期)、filter(子串匹配)、json(格式化json对象)、limitTo(限制个数)、l...

  • 练习:用户输入姓名、年龄、工作、爱好 ,然后打印成以下格式------------ info of Egon -----------Name  : EgonAge   : 22Sex   : maleJob   : Teacher ------------- end -----------------完成情况:in_name=inpu...

  •   语法 它通过{}和:来代替%。 “映射”示例 通过位置 In [1]: '{0},{1}'.format('kzc',18) Out[1]: 'kzc,18' In [2]: '{},{}'.format('kzc',18) Out[2]: 'kzc,18' In [3]: '{1},{0},{1}'.forma...

  • --------------------------------------------------------------------------------------- 本系列文章为《机器学习实战》学习笔记,内容整理自书本,网络以及自己的理解,如有错误欢迎指正。 源码在Python3.5上测试均通过,代码及数据 --> http...

  • 首先运行easy_install pymongo命令安装pymongo驱动。然后执行操作: 创建连接 1 In [1]: import pymongo 2 3 In [2]: connection = pymongo.Connection('localhost', 27017) 切换到数据库malware In [3]: db...

  • 代码: public class Person{public int ID { get; set; }public string Name { get; set; }public int Age { get; set; } }public class Dog{public int ID { get; set; }...