项目地址:https://github.com/skyzh/mini-lsm
个人实现地址:https://gitee.com/cnyuyang/mini-lsm
在本章中,您将:
- 在上一章的基础上完成读路径,以支持快照读。
- 实现事务API,支持快照读。
- 引擎恢复过程中能正确恢复已提交时间戳。
最后,您的引擎将能够为用户提供存储键(key)的一致视图。
在重构过程中,您可能需要根据需要将某些函数的签名从&self
更改为self: &Arc<Self>
。
要运行测试用例,请执行以下操作:
cargo x copy-test --week 3 --day 3
cargo x scheck
注意:在完成本章后,您还需要通过2.5和2.6的测试用例。
Task 1-LSM Iterator with Read Timestamp
本章的目标是:
let snapshot1 = engine.new_txn(); // 给引擎写点东西 let snapshot2 = engine.new_txn(); // 给引擎写点东西 snapshot1.get(/* ... */); // 我们可以检索引擎先前状态的一致快照
为了达到这个目的,我们可以在创建事务的时候记录下读取的时间戳(也就是最近一次提交的时间戳)。当我们对事务进行读操作时,只会读取低于或等于读取时间戳的所有版本的键(key)。
在此任务中,您需要修改:
src/lsm_iterator.rs
为此,您需要在LsmIterator中记录读取时间戳。
impl LsmIterator {pub(crate) fn new(iter: LsmIteratorInner,end_bound: Bound<Bytes>,read_ts: u64,) -> Result<Self> {// ...} }
你需要改变你的LSM迭代器
next
逻辑来找到正确的键。
先在LsmIterator
结构体中添加相关字段,并修改构造函数:
pub struct LsmIterator {inner: LsmIteratorInner,upper: Bound<Bytes>,prev_key: Vec<u8>,read_ts: u64, // 【新增】
}pub(crate) fn new(iter: LsmIteratorInner, upper: Bound<Bytes>, read_ts: u64) -> Result<Self> {let mut lsm = Self {inner: iter,upper,prev_key: Vec::new(),read_ts, // 【新增】};...
}
next
函数的改造结合Task2
完成
Task 2-Multi-Version Scan and Get
在此任务中,您需要修改:
src/mvcc.rs src/mvcc/txn.rs src/lsm_storage.rs
现在我们在LSM迭代器中有了
read_ts
,我们可以在事务结构上实现scan
和get
,这样我们就可以在存储引擎中的给定点读取数据了。如果需要,我们建议您在
LsmStorageInner
结构中创建像scan_with_ts(/*原始参数*/, read_ts: u64)
和get_with_ts
这样的辅助函数。存储引擎上最初的get/scan
应该实现为创建一个事务(快照),并在该事务上执行get/scan
。调用路径类似于:LsmStorageInner::scan -> new_txn and Transaction::scan -> LsmStorageInner::scan_with_ts
要在
LsmStorageInner::scan
中创建事务,我们需要向事务构造函数提供一个Arc<LsmStorageInner>
。因此,我们可以将scan
的签名更改为采取self: &Arc<Self>
而不是简单的&self
,这样我们就可以用let txn = self.mvcc().new_txn(self.clone(), /* ...*/)
来创建一个事务。您还需要更改
scan
函数以返回一个TxnIterator
。我们必须确保用户迭代引擎时快照是活的,因此,TxnIterator
存储快照对象。在TxnIterator
内部,我们现在可以存储一个FusedIterator<LsmIterator>
。我们稍后在实现OCC时将其更改为其他内容。您暂时不需要实现
Transaction::put/delete
,所有修改仍将通过引擎(MiniLsm
对象)。
LsmMvccInner::new_txn
先实现从mvcc
对象申请一个事务,获取到最新的时间戳,用于构造Transaction
对象
pub fn new_txn(&self, inner: Arc<LsmStorageInner>, serializable: bool) -> Arc<Transaction> {let ts = self.ts.lock();let read_ts = ts.0;Arc::new(Transaction {read_ts,inner,local_storage: Arc::new(SkipMap::new()),committed: Arc::new(AtomicBool::new(false)),key_hashes: None,})
}
Transaction
scan
函数实现需要依赖TxnIterator
、TxnLocalIteratorBuilder
先简单完成这两部分内容,帮助流程继续往下走。
TxnLocalIterator
最少需要实现is_valid
函数:
fn is_valid(&self) -> bool {false
}
is_valid
返回false
表示这个迭代器中没有有效数据。
TxnIterator
最少需要实现他的构造函数以及next
函数
// 构造函数
pub fn create(txn: Arc<Transaction>,iter: TwoMergeIterator<TxnLocalIterator, FusedIterator<LsmIterator>>,
) -> Result<Self> {Ok(Self { _txn: txn, iter })
}// next函数
fn next(&mut self) -> Result<()> {self.iter.next()?;Ok(())
}
get&scan,分别调用LsmStorageInner
的get_with_ts
、scan_with_ts
函数:
// get函数实现
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {self.inner.get_with_ts(key, self.read_ts)
}// scan函数实现
pub fn scan(self: &Arc<Self>, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<TxnIterator> {let local_iter = TxnLocalIteratorBuilder {map: self.local_storage.clone(),iter_builder: |map| map.range((map_bound(lower), map_bound(upper))),item: (Bytes::new(), Bytes::new()),}.build();TxnIterator::create(self.clone(),TwoMergeIterator::create(local_iter,self.inner.scan_with_ts(lower, upper, self.read_ts)?,)?,)
}
get_with_ts、scan_with_ts
开始之前先实现new_txn
,以及实现新的get
、scan
函数,将原来的函数签名修改为get_with_ts
、scan_with_ts
pub fn new_txn(self: &Arc<Self>) -> Result<Arc<Transaction>> {Ok(self.mvcc().new_txn(self.clone(), self.options.serializable))
}// 新实现
pub fn scan<'a>(self: &'a Arc<Self>,_lower: Bound<&[u8]>,_upper: Bound<&[u8]>,
) -> Result<TxnIterator> {let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);txn.scan(_lower, _upper)
}// 历史代码,修改函数签名
pub fn scan_with_ts(&self,_lower: Bound<&[u8]>,_upper: Bound<&[u8]>,read_ts: u64, // 新加参数
) -> Result<FusedIterator<LsmIterator>> {...// 历史代码
}// 新实现
pub fn get<'a>(self: &'a Arc<Self>, key: &[u8]) -> Result<Option<Bytes>> {let txn = self.mvcc().new_txn(self.clone(), self.options.serializable);txn.get(key)
}// 历史代码,修改函数签名
pub fn get_with_ts(&self, key: &[u8], read_ts: u64) -> Result<Option<Bytes>> {...// 历史代码
}
首先我们分析一下引入时间戳后的键值对&时间戳的布局:
有以下两点特点:
-
单个SST中,存在重复的key,重复key是连续排布,但是时间戳ts不同(取决于比较规则)
-
整体看,key依旧是从小到大排布,先出现所有的
a
再出现b
get_with_ts
因为get
操作为点查,所以只需要将所有查询的起始时间设置为read_ts
,这里各举一个例子。
// Memtable
let memtable = snapshot.memtable.scan(Bound::Included(KeySlice::from_slice(key, read_ts)), // 修改Bound::Included(KeySlice::from_slice(key, TS_RANGE_END)),
);// SST
SsTableIterator::create_and_seek_to_key(table, KeySlice::from_slice(key, read_ts))?;
scan_with_ts的实现方式有很多,这里仅仅展示我的代码逻辑,对于scan
扫描范围不做修改,SsTableIterator
返回的是a
到b
所有的
还是用SST1
、SST2
举例,如果read_ts
为99,那么需要返回给用户的应该为其中绿色部分:
SsTableIterator
则需要实现将灰色部分过滤的功能。实现判断逻辑为:
key:a ts:100
:通过时间戳判断key:b ts:100
:通过时间戳判断key:b ts:96
:通过prev_key
判断key:c ts:99
:通过is_empty()
判断key:c ts:97
:通过prev_key
判断
首先对于上层函数来说SsTableIterator
当前的值是否有效不能仅仅通过key
是否在搜索范围内判断,还要加上时间戳的判断。同时我们发现,有效的数据不是连续的,所以SsTableIterator
在碰到第一个无效数据时不能停止搜索,只有key
不在范围内才能停止。,为达成以上目的,需要作如下修改:
fn is_valid(&self) -> bool {self.is_key_valid() && self.read_ts >= self.inner.key().ts()
}fn is_key_valid(&self) -> bool {if !self.inner.is_valid() {return false;}let mut is_valid = true;match self.upper.as_ref() {Bound::Included(upper) => is_valid = self.inner.key().key_ref() <= upper.as_ref(),Bound::Excluded(upper) => is_valid = self.inner.key().key_ref() < upper.as_ref(),Bound::Unbounded => {}}is_valid
}
将原来is_valid
内容移至is_key_valid
中,is_valid
新增时间戳判断,is_valid
供外部调用,is_key_valid
迭代器内部使用。
构造函数new
:
pub(crate) fn new(iter: LsmIteratorInner, upper: Bound<Bytes>, read_ts: u64) -> Result<Self> {let mut lsm = Self {inner: iter,upper,prev_key: Vec::new(),read_ts,};while lsm.is_key_valid() && (lsm.inner.key().ts() > read_ts || lsm.value().is_empty()) {if lsm.value().is_empty() {lsm.prev_key = lsm.key().to_vec();}lsm.next();}if lsm.is_key_valid() {lsm.prev_key = lsm.key().to_vec();}Ok(lsm)
}
next
函数:
fn next(&mut self) -> Result<()> {self.inner.next();if self.inner.is_valid() {if self.inner.key().ts() > self.read_ts {return self.next();}if self.inner.value().is_empty() {self.prev_key = self.key().to_vec();return self.next();}if self.prev_key == self.key().to_vec() {return self.next();}self.prev_key = self.key().to_vec();}Ok(())
}
除此之外还需要修改scan_with_ts
中处理Excluded
的逻辑,在scan
中会通过SsTableIterator::create_and_seek_to_key
找到左边界,然后通过:
if iter.is_valid() && iter.key().key_ref() == key {iter.next()?;
}
跳过当前值。这个操作在非mvcc
版本是生效的,因为非mvcc
版本不存在相同的键。然后我们此前修改的逻辑是LsmIterator
中的,也在SstConcatIterator
不生效。为适配mvcc
,只要把if
改成while
就能完整的跳过这个字段的所有版本:
while iter.is_valid() && iter.key().key_ref() == key {iter.next()?;
}
Task 3-Store Largest Timestamp in SST
在此任务中,您需要修改:
src/table.rs src/table/builder.rs
在SST编码中,您应该在块元数据之后存储最大的时间戳,并在加载SST时恢复它。这将有助于系统在恢复系统时确定最新的提交时间戳。
编码encode_block_meta
:
pub fn encode_block_meta(block_meta: &[BlockMeta],max_ts: u64, // 【新增】最大的时间戳#[allow(clippy::ptr_arg)] buf: &mut Vec<u8>,
) {let original_len = buf.len();buf.put_u32(block_meta.len() as u32);for meta in block_meta {... }buf.put_u64(max_ts); // 【新增】写入最大的时间戳buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
}
解码decode_block_meta
:
pub fn decode_block_meta(mut buf: &[u8]) -> Result<(Vec<BlockMeta>, u64)> { //【修改】修改返回值类型let num = buf.get_u32();let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);let mut block_meta: Vec<BlockMeta> = Vec::with_capacity(num as usize);for i in 0..num {...}let max_ts = buf.get_u64(); // 【新增】读取最大的时间戳if buf.get_u32() != checksum {bail!("meta checksum mismatched");}Ok((block_meta, max_ts)) // 【修改】返回最大时间戳
}
然后同步修改调用点。需要在SsTableBuilder
结构体中新增成员变量max_ts
,调用add
自动更新该变量,最后在build
函数中传入encode_block_meta
。
Task 4-Recover Commit Timestamp
现在我们有了SST中的最大时间戳信息和WAL中的时间戳信息,我们可以获取引擎启动前提交的最大时间戳,并在创建mvcc对象时使用该时间戳作为最新提交的时间戳。
如果没有启用WAL,您可以简单地通过找到SST中最大的时间戳来计算最新提交的时间戳。如果启用了WAL,您应该进一步迭代所有恢复的memtable,并找到最大的时间戳。
在此任务中,您需要修改:
src/lsm_storage.rs
我们没有此部分的测试用例。在完成本节之后,您应该通过前几章(包括2.5和2.6)的所有持久性测试。
修改LsmStorageInner::open
函数:
let mut last_commit_ts = 0; // 声明last_commit_ts
if !manifest_path.exists() {manifest = Some(Manifest::create(manifest_path)?);
} else {... for table_id in state.l0_sstables.iter().chain(state.levels.iter().map(|(_, files)| files).flatten()){let table_id = *table_id;let sst = SsTable::open(table_id,Some(block_cache.clone()),FileObject::open(&Self::path_of_sst_static(path, table_id)).context("failed to open SST")?,)?;last_commit_ts = last_commit_ts.max(sst.max_ts()); // 读取SST,更新last_commit_tsstate.sstables.insert(table_id, Arc::new(sst));}if options.enable_wal { // 打开WAL日志场景...let max_ts = memtable.map.iter().map(|x| x.key().ts()).max().unwrap_or_default();last_commit_ts = last_commit_ts.max(max_ts); // 读取Memtable,更新last_commit_tsif !memtable.is_empty() {state.imm_memtables.insert(0, Arc::new(memtable));}}
...
let storage = Self {...mvcc: Some(LsmMvccInner::new(last_commit_ts)), // 使用恢复出来的ast_commit_ts创建mvcc对象...
};Ok(storage)