文章目录
- key 的分布
- key_range 的分布
- QPS 的分布
最近在用 mixgraph 负载对 RocksDB 进行测试,其是 facebook 提供的一种基于 db_bench 输入的负载,整合在了 RocksDB 的源码中,同 fillrandom、fillseq 等等一样的调用流程。
苦于 mixgraph 的文档内容是在太少了,有些概念也不确定,于是去翻阅源码来学习,收获还是很大的,理解了其对 key 的幂分布、对 keyrange 双指数分布(two-term-exponential distribution)、对 QPS 进行 sine 分布 这三大要素。先总结一下文档的内容,如下:
- key 按照 f(x) = ax^b 来分布,参数为
-key_dist_a=<> -key_dist_b=<>
- keyrange 按照 f(x) = aexp(bx) + cexp(dx) 来分布,参数为
-keyrange_dist_a=<> -keyrange_dist_b=<> -keyrange_dist_c=<> -keyrange_dist_d=<> -keyrange_num=<>
- QPS 按照 f(x) = asine(bx+c) +d 来分布,参数为
-sine_a=<> -sine_b=<> -sine_c=<> -sine_d=<> -sine_mix_rate_interval_milliseconds=<>
。
本篇博客将详细介绍这三个分布到底是什么意思,以及具体是怎么实现的。db_bench 的调用链就不赘述了,整个大流程在上一篇博客(db_bench源码(一):random模式的写入)中介绍的很详细,这里直接从负载入口函数 MixGraph
讲起,依次解释三个概念。
假设我们给的参数为:
-num=10000 \ # 1w条key
-key_dist_a=0.002312 \
-key_dist_b=0.3467 \
-keyrange_dist_a=5.18 \
-keyrange_dist_b=-2.917 \
-keyrange_dist_c=0.0164 \
-keyrange_dist_d=-0.08082 \
-keyrange_num=30 \
-sine_a=12000 \
-sine_b=0.035 \
-sine_c=4.17 \
-sine_d=32000 \
-sine_mix_rate_interval_milliseconds=50 \
-sine_mix_rate=true \
key 的分布
先说结论,f(x) = ax^b 中,x 就是 key 本身(db_bench 生成 key 是通过 uint64_t 转成 char* 的),而 f(x) 就是这个 key 出现的概率,当然,如果用 key 本身去算,f(x) 是 > 1 的,所以这里的 f(x) 并不是归一化的概率,因此只反应概率分布,也就是 key 越大概率越大,符合 ax^b 曲线。
代码是如何这个分布的,这就很有意思了。把 MixGraph 中相关的代码提取出来,如下:
void MixGraph(ThreadState* thread) {// ...ini_rand = GetRandomKey(&thread->rand);rand_v = ini_rand % FLAGS_num;double u = static_cast<double>(rand_v) / FLAGS_num;key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);Random64 rand(key_seed);key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
}
可以看到,它先生成了一个初始随机 key,随机方式和 random 负载一样,是平等随机。接下来,它将这个 key 缩成一个小数 rand_v (归一化),然后调用 PowerCdfInversion 进行一次映射,最后把映射结果当作 seed 重新进行随机,得到 key。即,把平等随机生成的 key 做了一次映射,那么关键就在这次映射,其代码如下。
// inversion of y=ax^b
int64_t PowerCdfInversion(double u, double a, double b) {double ret;ret = std::pow((u / a), (1 / b));return static_cast<int64_t>(ceil(ret));
}
代码很简单,就是进行关于 ax^b 的逆进行映射。为什么进行逆映射就能保证最终的 key 分布是符合 ax^b 的,画一张图就明白了,如下:
在 x 轴上是平等随机分布的,假设 [a1, b1] 与 [c1, d1] 长度一样,那么他们对应的概率也就一样,故 key 的数量也基本一样。对应映射之后的结果,[a2, b2] 与 [c2, d2] 的 key 数量基本一样。然后,[c2, d2] 的长度明显窄于 [a2, d2] 的,故在整个 y 轴上,[c2, d2] 的密度是大于的 [a2,b2] 的,因此 [c2, d2] 之间某个点被选中的概率就更大。
这样一来,将映射后的 key 作为输入,就满足 f(x) = ax^b 的概率分布了。
key_range 的分布
上一节讲了对 key 总体的分布,是不区分 range 的,也就是从 startkey ~ endkey 均是这个分布。而 mixgraph 表示,真实情况下对 key 的访问频率是带有一定的 range 的,且这些 range 的概率符合双指数分布:
From our observations, the prefix hotness (key-range hotness) follows the two-term-exponential distribution: f(x) = aexp(bx) + cexp(dx). However, we cannot directly use the inverse function to decide a key-range from a random distribution. To achieve it, we create a list of KeyrangeUnit, each KeyrangeUnit occupies a range of integers whose size is decided based on the hotness of the key-range. When a random value is generated based on uniform distribution, we map it to the KeyrangeUnit Vec and one KeyrangeUnit is selected. The probability of a KeyrangeUnit being selected is the same as the hotness of this KeyrangeUnit. After that, the key can be randomly allocated to the key-range of this KeyrangeUnit, or we can based on the power distribution (y=ax^b) to generate the offset of the key in the selected key-range. In this way, we generate the keyID based on the hotness of the prefix and also the key hotness distribution.
mixgraph 会把 key 分成同样大小的多个 range,数量由 keyrange_num 决定,比如这个示例中就是 30。而 f(x) = aexp(bx) + cexp(dx) 不再指 key 的概率,而是 range 的概率,换句话说就是 prefix。也就是,30 个 range,每一个都会被算一次 f(x) ,表示 key 会出现在其中的概率。
来看代码:
void MixGraph(ThreadState* thread) {// ...GenerateTwoTermExpKeys gen_exp; // Decide if user wants to use prefix based key generationif (FLAGS_keyrange_dist_a != 0.0 || FLAGS_keyrange_dist_b != 0.0 ||FLAGS_keyrange_dist_c != 0.0 || FLAGS_keyrange_dist_d != 0.0) {use_prefix_modeling = true;gen_exp.InitiateExpDistribution(FLAGS_num, FLAGS_keyrange_dist_a, FLAGS_keyrange_dist_b,FLAGS_keyrange_dist_c, FLAGS_keyrange_dist_d);}
}
这里先构造了个 GenerateTwoTermExpKeys,用于后续生成 key,来看一下函数 InitiateExpDistribution,它将决定每个 range 的概率。
Status InitiateExpDistribution(int64_t total_keys, double prefix_a,double prefix_b, double prefix_c,double prefix_d) {int64_t amplify = 0;int64_t keyrange_start = 0;if (FLAGS_keyrange_num <= 0) {keyrange_num_ = 1;} else {keyrange_num_ = FLAGS_keyrange_num;}keyrange_size_ = total_keys / keyrange_num_;// Calculate the key-range shares size based on the input parametersfor (int64_t pfx = keyrange_num_; pfx >= 1; pfx--) {// Step 1. Calculate the probability that this key range will be// accessed in a query. It is based on the two-term expoential// distributiondouble keyrange_p = prefix_a * std::exp(prefix_b * pfx) +prefix_c * std::exp(prefix_d * pfx);if (keyrange_p < std::pow(10.0, -16.0)) {keyrange_p = 0.0;}// Step 2. Calculate the amplify// In order to allocate a query to a key-range based on the random// number generated for this query, we need to extend the probability// of each key range from [0,1] to [0, amplify]. Amplify is calculated// by 1/(smallest key-range probability). In this way, we ensure that// all key-ranges are assigned with an Integer that >=0if (amplify == 0 && keyrange_p > 0) {amplify = static_cast<int64_t>(std::floor(1 / keyrange_p)) + 1;}// Step 3. For each key-range, we calculate its position in the// [0, amplify] range, including the start, the size (keyrange_access)KeyrangeUnit p_unit;p_unit.keyrange_start = keyrange_start;if (0.0 >= keyrange_p) {p_unit.keyrange_access = 0;} else {p_unit.keyrange_access =static_cast<int64_t>(std::floor(amplify * keyrange_p));}p_unit.keyrange_keys = keyrange_size_;keyrange_set_.push_back(p_unit);keyrange_start += p_unit.keyrange_access;}keyrange_rand_max_ = keyrange_start;// Step 4. Shuffle the key-ranges randomly// Since the access probability is calculated from small to large,// If we do not re-allocate them, hot key-ranges are always at the end// and cold key-ranges are at the begin of the key space. Therefore, the// key-ranges are shuffled and the rand seed is only decide by the// key-range hotness distribution. With the same distribution parameters// the shuffle results are the same.Random64 rand_loca(keyrange_rand_max_);for (int64_t i = 0; i < FLAGS_keyrange_num; i++) {int64_t pos = rand_loca.Next() % FLAGS_keyrange_num;assert(i >= 0 && i < static_cast<int64_t>(keyrange_set_.size()) &&pos >= 0 && pos < static_cast<int64_t>(keyrange_set_.size()));std::swap(keyrange_set_[i], keyrange_set_[pos]);}// Step 5. Recalculate the prefix start postion after shufflingint64_t offset = 0;for (auto& p_unit : keyrange_set_) {p_unit.keyrange_start = offset;offset += p_unit.keyrange_access;}return Status::OK();
}
首先,从尾向头遍历,对所有 range 计算概率(keyrange_p),通过公式 aexp(bx) + cexp(dx), x 就是这个 range 的编号。接下来,对概率进行一定程度的放大,接着根据这个概率计算出 keyrange_access。这里解释一下 keyrange_start 和 keyrange_access,前者就是这个 range 代表开始 key,而后者代表这个 range 的范围(1 + keyrange_p),以此来表示这个 range 出现的概率。比如说,rangeA、rangeB 和 rangeC,A 的概率为 0.5,B 的为 0.3,C 的为 0.2,那么三者的 key 范围依次为 [0, 1.5),[1.5, 2.8),[2.8, 4.1)。简而言之,通过控制 range 的范围来控制概率,但这个范围不是最终插入的 key 范围,而是映射的范围。
由于 f(x) 的递减的,所以从尾遍历的 range 一定是有序排列的。如 Step4 中所述,mixgraph 会打乱这种排序,但是每一个 range 的 keyrange_access 还是原来,即概率不变。
至此,每一个 range 的范围确定了,接下来 MixGraph 函数会通过这些 range 来生成 key,方式依然是先平等随机生成,然后映射。
void MixGraph(ThreadState* thread) {// ...} else if (use_prefix_modeling) {key_rand =gen_exp.DistGetKeyID(ini_rand, FLAGS_key_dist_a, FLAGS_key_dist_b);} else {key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);Random64 rand(key_seed);key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;}// ...
}
其中,ini_rand 就是随机的初始 key,下面那个分支就是第一节讲的 key 分布,这里要走上面那个分支了。可以看到,如果启用了 key_range,那么就不再使用第一节的 key 分布方式。实际上,不是不用了,而是不再整个 key 范围上用了,改为了在每个 range 上用,所以还是要传入 key_dist_a 和 key_dist_b。来看代码:
// Generate the Key ID according to the input ini_rand and key distribution
int64_t DistGetKeyID(int64_t ini_rand, double key_dist_a,double key_dist_b) {int64_t keyrange_rand = ini_rand % keyrange_rand_max_;// Calculate and select one key-range that contains the new keyint64_t start = 0, end = static_cast<int64_t>(keyrange_set_.size());while (start + 1 < end) {int64_t mid = start + (end - start) / 2;assert(mid >= 0 && mid < static_cast<int64_t>(keyrange_set_.size()));if (keyrange_rand < keyrange_set_[mid].keyrange_start) {end = mid;} else {start = mid;}}int64_t keyrange_id = start;// Select one key in the key-range and compose the keyIDint64_t key_offset = 0, key_seed;if (key_dist_a == 0.0 || key_dist_b == 0.0) {key_offset = ini_rand % keyrange_size_;} else {double u =static_cast<double>(ini_rand % keyrange_size_) / keyrange_size_;key_seed = static_cast<int64_t>(ceil(std::pow((u / key_dist_a), (1 / key_dist_b))));Random64 rand_key(key_seed);key_offset = rand_key.Next() % keyrange_size_;}return keyrange_size_ * keyrange_id + key_offset;
}
函数分为两个部分。
- 首先,通过二分找到初始 key 处于哪一个 range。
- 然后,在该 range 中运行第一节的 key 分布映射,并根据 range id 和 size 组装为最后的 key 返回。
至此,key 的分布就完成了。总结一下,key 的分布是按 range 的,range 之间的概率分布符合 aexp(bx) + cexp(dx),而 range 内部的 key 概率分布符合 ax^b。二者实现控制概率的方式都是对平等随机的初始 key 进行不平等映射。
QPS 的分布
QPS 就很好理解了,常常与 IOPS 对应,后者一般指底层每秒实际执行成功的 IO 次数,前者则指上层每秒下发的 IO 数,虽然叫作 query,但读和写都包括。mixgraph 的 QPS 符合 asine(bx+c) +d 分布,这里的 x 是时间,单位为 sine_mix_rate_interval_milliseconds(ms),即每过这么多时间重新计算一下 QPS,来调整下发速率。
在 MixGraph 中,每执行完一次 op,都会判断时间是否超过 interval,如果是,则更改 QPS,如下:
void MixGraph(ThreadState* thread) {// ...Duration duration(FLAGS_duration, reads_);uint64_t last_record_time = FLAGS_env->NowMicros();while (!duration.Done(1)) {// 按第一、二节的方式生成 key// ...// change the qpsif (FLAGS_sine_mix_rate &&usecs_since_last > (FLAGS_sine_mix_rate_interval_milliseconds *uint64_t{1000}) ||!set_rate) {set_rate = true;double usecs_since_start =static_cast<double>(now - thread->stats.GetStart());thread->stats.ResetSineInterval();double mix_rate_with_noise = AddNoise(SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);write_rate = mix_rate_with_noise * query.ratio_[1];if (read_rate > 0) {thread->shared->read_rate_limiter->SetBytesPerSecond(static_cast<int64_t>(read_rate));}if (write_rate > 0) {thread->shared->write_rate_limiter->SetBytesPerSecond(static_cast<int64_t>(write_rate));}}// 开始读写// ...}
}
上述代码中,核心是 SineRate,其会通过时间计算 QPS,如下:
double SineRate(double x) {return FLAGS_sine_a * sin((FLAGS_sine_b * x) + FLAGS_sine_c) + FLAGS_sine_d;
}
计算完成后,对其进行加噪,然后就按照指定的 read、write 、seek 比率把 QPS 分配出去,这里省去解释如何加噪。而 QPS 具体是怎么限制 op 的下发速度的,就是 rate_limiter 的活了。可以看到,当分配完 read_rate 和 write_rate后,会调用 rate_limiter 的 SetBytesPerSecond。实际上,就是赋个值:
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {// assert(bytes_per_second > 0);rate_bytes_per_sec_ = bytes_per_second;refill_bytes_per_period_.store(CalculateRefillBytesPerPeriod(bytes_per_second),std::memory_order_relaxed);
}
rate_bytes_per_sec_ 实际上就是一个存量,指接下来一秒内最多能下发的 bytes 数,注意,是 bytes 数而不是 op 数。
回到 MixGraph,当设置了 rate_limiter 之后,开始去执行读操作了,执行完毕后,发现其调用了 rate_limiter 的 Request,如下:
void MixGraph(ThreadState* thread) {// ...Duration duration(FLAGS_duration, reads_);uint64_t last_record_time = FLAGS_env->NowMicros();while (!duration.Done(1)) {// 按第一、二节的方式生成 key// ...// change the qpsif (FLAGS_sine_mix_rate &&usecs_since_last > (FLAGS_sine_mix_rate_interval_milliseconds *uint64_t{1000}) ||!set_rate) {// 设置 rate_limiter}// 开始读写if (query_type == 0) {// 读thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH,nullptr /*stats*/);} else if (query_type == 1){// 写thread->shared->write_rate_limiter->Request(1, Env::IO_HIGH,nullptr /*stats*/);} else if (query_type == 2){// seekthread->shared->read_rate_limiter->Request(1, Env::IO_HIGH,nullptr /*stats*/);}// ...}
}
可以看到,任何一个操作执行完毕后都会调用 Request,去申请 1 个 byte。Request 的代码量有点大,逻辑也有点复杂,因为 rocksdb 在里面实现了 IO Priority。但是大致控制 QPS 的逻辑是:它是一个阻塞函数,当 bytes 还有冗余时,就放行,同时 bytes - n。
因此,当 read 的 QPS 设为 1000 的话,那么 rate_limiter 中就有 1000 个 bytes 存量,每一个 read 操作消耗一个,故前 1000 个 read 顺利执行,后续则会被阻塞。当时间超过 interval 后,重新计算 QPS,然后赋予 rate_limter 新的 bytes 存量,这样之前的 read 就会被放行,以此重复。
至此,MixGraph 的 QPS 分布梳理完毕。