草庐IT

leveldb实现之写入流程

Vincent72143Lau 2023-03-28 原文

Leveldb是非常经典的C++实现的KV存储,代码简短,关键代码2W行以内,麻雀虽小,五脏俱全,大神之作,代码风格看着舒服。RocksDB便是fork自leveldb做出了优化和功能增强,成为常用的工业级KV数据库引擎。设计与实现值得学习,也可以作为研究RockDB引擎的基础,也为大型数据库Cassandra、ScyllaDB等LSM架构的数据学习做技术储备。

本篇主要介绍leveldb的写入流程,主要流程是先写入预写日志(WAL)然后写入memtable,最后通过只读memtable刷盘为sstable

知识准备

写入示例

leveldb库的基本使用

#include <iostream>
#include "leveldb/db.h"
#include <cassert>

using namespace std;

namespace ld=leveldb;

int main(){
    ld::DB *db;
    ld::Options options;
    options.create_if_missing=true;

    ld::Status status=ld::DB::Open(options,"/tmp/testdb",&db);
    assert(status.ok());
    
    status=db->Put(ld::WriteOptions(),"key1","val1");
    assert(status.ok());
    
    return 0;
}

调用栈

// step 1
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

// step 2
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;
  ...
} 

 

首先将键值写入到WriteBatch结构,然后调用DBImpl::Write做写入操作,下面会重点说明DBImpl函数

WriteBatch

WriteBatch是DBImpl::Write流程的主要数据结构,对其做出详细说明。追求需要,会将write打包为batch,然后批量写入wal,保证了leveldb写入原子性,服务宕机,可以使写入成功的数据恢复。

WirteBatch私有成员变量 std::string rep_ ,用以存放数据

WriteBatch的编码:

长度 8字节 4字节 可变长度 可变长度 可变长度
内容 sequence number count record 1 record 2 record 3

 

 

 

sequence number: leveldb的序列号,保证MVCC,选择最近recod的sequence number

count 为记录数量

 

record的编码如下:

长度 1字节 可变长度 键大小 可变变长 值大小
内容 类型 键大小 值大小

 

 

 

WriteBatch的成员函数主要包括:

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);

  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(const Slice& key);

  // Clear all updates buffered in this batch.
  void Clear();

  // The size of the database changes caused by this batch.
  //
  // This number is tied to implementation details, and may change across
  // releases. It is intended for LevelDB usage metrics.
  size_t ApproximateSize() const;

  // Copies the operations in "source" to this batch.
  //
  // This runs in O(source size) time. However, the constant factor is better
  // than calling Iterate() over the source batch with a Handler that replicates
  // the operations into this batch.
  void Append(const WriteBatch& source);

  // Support for iterating over the contents of a batch.
  Status Iterate(Handler* handler) const;

 

WriteBachInternal是WriteBatch的友元类,为其辅助函数

写入流程

DBImpl::Write函数主要逻辑

分步叙述主逻辑

Part 1

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  } 
}
  • Write函数接受一个WriteBatch,以及写入选项作为其参数,sync表示wal是否直接刷盘,done是否此updates已经写入完成
  • 写入支持并发,写入到一个队列writers_内,通过条件变量来实现生产者消费者。将多个线程的写入合并,来提升写入的性能。后面会详细说明其实现方式
  • 只有在队列队首,而且写入没有完成才会执行下述的逻辑。
  • 如果执行到 w.one==true则直接退出,此时表明数据被其他线程成功写入了(因为写入之后,会更新Writer的写入状态)。

Part 2

Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // write_batch只需要写入一个seq
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      // 此处解锁,其他线程获取锁之后,执行加入writers_队列的动作,然后阻塞在条件变量上
      // 在线程[t1,t2,t3],第一次执行时batch中只会有t1线程的内容,随后t2和t3才会加入
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }
  • MakeRoomForWrite主要的工作为:处理L0增长过快,选择是否写限速或者写停止、memtable的刷盘逻辑以及memtable所对应的wal的处理逻辑。(下文还会对此函数详细论述)
  • 获取此写入的sequence num,每个写都会有个递增的数值
  • BuildBatchGroup比较简单,将队列的writers_的写入合并为一个WriteBatch。writes_为stl队列数据结构,通过迭代器遍历,然后通过WriteBatchInternal辅助类对WriteBatch操作,得出结果
  • 此处释放锁的原因是为了提升性能,wal写入和写入memtable比较耗时,此处释放锁后,其他的线程的写入可以入队writes_内,但是不会向下执行逻辑
  • InsertInto将WriteBatch的内容写入到memtable,以后会写一个关于memtable的文章
  • AddRecord将WriteBatch写入wal

Part 3

while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    // 不是队首元素,则标记该写入已经完成
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal(); // 通知
    }
    if (ready == last_writer) break;
  }

  // 通知
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

主要逻辑是写入完成的出队,并且更新其写入状态

关键函数分析

MakeRoomForWrite

Status DBImpl::MakeRoomForWrite(bool force);
函数的作用是:处理L0写入过快的问题,处理memtable和immemtable以及wal,以及是否进行compaction
参数force标识是否立即刷盘
 
函数的主要逻辑:
  1. 根据参数force是否立即刷盘,然后决定是否允许延迟操作,由变量allow_delay标识
  2. 如果bg_error_发生错误,退出循环,并返回error状态
  3. 如果allow_delay为ture,而且L0的文件数大于kL0_SlowdownWritesTrigger(默认值为8),则写入限速1ms,在sleep之前释放锁mutex_,不阻塞其他线程逻辑,并将allow_deply设置为false,单次写入只允许限速一次
  4. 如果不立即刷盘,而且memtable的近似大小仍未达到write_buffer_size,则直接退出函数,什么也不用做
  5. 代码走到此处,要么需要立即刷盘,要么大小超过write_buffer_size,如果此时存在immemtable,则通过条件变量阻塞,只到compaction完成(immemtable刷盘成功)
  6. 如果L0的文件数目超过kL0_StopWritesTrigger(默认12),则写入停止,也是通过条件变量实现,等待compaction将L0的文件数减少
  7. 最后的情况,就是创建Log文件句柄,创建memtable文件句柄,将就旧的的memtable变为immemtable,然后判断是否需要通过异步调度compatiion动作
 

 

有关leveldb实现之写入流程的更多相关文章

  1. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  4. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  5. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  6. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  7. ruby - Ruby 是否使用 $stdout 来写入 puts 和 return 的输出? - 2

    我想知道Ruby用来在命令行打印这些东西的输出流:irb(main):001:0>a="test"=>"test"irb(main):002:0>putsatest=>nilirb(main):003:0>a=>"test"$stdout是否用于irb(main):002:0>和irb(main):003:0>?而且,在这两次调用之间,$stdout的值是否有任何变化?另外,有人能告诉我打印/写入这些内容的Ruby源代码吗? 最佳答案 是的。而且很容易向自己测试/证明。在命令行试试这个:ruby-e'puts"foo"'>test.

  8. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  9. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

  10. Ruby:写入 stdin 并从 stdout 读取? - 2

    我正在编写一个ruby​​程序,它应该执行另一个程序,通过stdin向它传递值,从它的stdout读取响应,然后打印响应。这是我目前所拥有的。#!/usr/bin/envrubyrequire'open3'stdin,stdout,stderr=Open3.popen3('./MyProgram')stdin.puts"helloworld!"output=stdout.readerrors=stderr.readstdin.closestdout.closestderr.closeputs"Output:"puts"-------"putsoutputputs"\nErrors:"p

随机推荐