Paper Notes

本仓库记录我读过的 paper 以及对每篇 paper 的理解、收货和疑问。

如何读论文我参考了 How to Read a Paper 的 three-pass method:

  1. 第一遍快速浏览论文(5-10 分钟)
    • 仔细阅读题目、摘要和简介
    • 阅读每个 section 及 sub-section 的题目,忽略其它内容
    • 如果有数学公式则快速扫一眼以便了解其理论基础
    • 阅读总结部分
    • 简单浏览引用部分,看哪些是以前读过的
  2. 第二遍仔细阅读论文但是忽略细节,比如证明部分。在阅读的过程中记录下不懂的术语以及想要问作者的问题(对于有经验的读者需要 1 小时)
    • 仔细阅读论文中的插图、图表或其它插画,尤其注意那些横纵坐标的含义
    • 标记下自己没有阅读过的相关引用
  3. 为了完全理解一篇论文,往往需要阅读第三遍。第三遍则需要读者从作者的角度出发,试着在脑子里重新实现一遍论文的工作(初学者需要多个小时,对于有经验的读者也需要1-2个小时)

论文方向: CS/Database/Distributed System

论文来源:

bigdata

MapReduce: Simplified Data Processing on Large Clusters

OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA (2004), pp. 137-150

https://research.google/pubs/pub62/

在处理 Distributed GrepInverted IndexDistributed Sort 等问题时,虽然数据本身需要执行的转换非常简单,但在高度分布式、可扩展和容错的环境中执行这些任务却又不那么简单,MapReduce 通过隐藏所有分布式系统的复杂性,为用户提供了一个分布式计算框架,用户只需提供用于将 key/value pair 处理生成一组 intermedia key/value pairsmap 函数,和一个将同一个键对应的所有 intermedia key/value pairs 做合并操作的 reduce 函数,就可以将程序并行地运行在计算机集群上。

MapReduce 的执行过程如下:

MapReduce Execution Overview

  1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

  2. One of the copies of the program is special —— the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

  3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

  4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

  5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

  6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

  7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

一个 MapReduce 任务可以分为三个阶段:

  • map phase: 在 map worker 上,处理后的中间数据根据默认或用户提供的 partitioning function 将数据保存为 R 个本地文件,并将文件位置上报给 master
  • shuffle phase: 在 reduce worker 上,根据从 master 上获取的文件位置,从各个 map worker 上读取所需的文件
  • reduce phase: 在 reduce worker 上将读取文件中 intermedia key/value pairs 进行处理的过程

Fault Tolerance

当 master 失败时,整个任务重做;当 map worker 失败时,即使它已经完成,也需要重做,因为中间数据文件是写在本地的;当 reduce worker 失败时,如果任务未完成,需要成重新调度其他节点完成对应的 reduce 任务,如果任务已经完成,则不需要重做,因为 reduce 的结果保存在 GFS。

优化

  • Locality: 由于 input file 保存在 GFS 上,MapReduce 可以根据文件存储的位置,将 map worker 调度到数据分片所在的节点上以减少网络开销
  • Combiner: 当 reduce 函数满足交换律和结合律特性时,可以将 reduce 的工作在 map 阶段提前执行
  • Backup Tasks: 将一定比例的长尾任务重新调度,可以减少任务的整体执行时间

Apache Hadoop 是 MapReduce 的开源实现,2014 年 Google 提出了 MapReduce 的替代模型 Cloud Dataflow,该模型支持流批一体,具有更好的性能及扩展性,对标的开源产品为 Apache Flink。

Nephele: Efficient Parallel Data Processing in the Cloud

Proceedings of the 2nd Workshop on Many-Task Computing on Grids and Supercomputers, November 2009

https://dl.acm.org/doi/10.1145/1646468.1646476

在云的时代,对于一个并发数据处理框架的调度器来说,要回答的问题从 Given a set of compute resources, how to distribute the particular tasks of a job among them? 转变为了 Given a job, what compute resources match the tasks the job consists of best?

这种新的范式对调度器提出了三个要求:

  1. 必须了解 job 执行所在的云环境,如各种 VM 的类型及其定价
  2. 必须能够描述一个 job 的各个 task 之间的依赖关系,进而判断何时一个 VM 不再需要可以提前释放
  3. 必须能够决定一个 task 应该在哪种类型的 VM 上执行

这种灵活性同时带来了一些挑战,其中最主要的问题在于网络拓扑,数据处理框架很难知道网络的层级,比如两个节点之间的数据传输经过了几个交换机,对于调度器来说,拓扑感知的调度非常困难。即使知道了底层的网络层级,由于 VM 的迁移特性,网络拓扑可能会随时变动。解决这种问题的唯一方法是将需要大量数据传输的 tasks 调度在一个更强的 VM 上以保证 Data locality。

Nephele

基于云计算带来的机遇和挑战,论文提出了 Nephele,一种基于云环境的全新数据处理框架。其架构遵循 master-worker 模式:

nephele architecture

在一个 job 提交之前,用户需要启动一个实例运行 Job Manager(JM),它负责接收用户提交的 Job 及调度,它能够通过云提供的服务接口动态申请及释放 VM。

Nephele 框架的另一种角色为 Task Manager(TM),它从 JM 接收任务,执行之后将完成状态或可能的错误信息汇报给 JM。

一个任务在 Nephele 中通过 DAG 来描述,原因在于:

  1. DAG 允许 task 有多个入度和出度,可以极大简化传统数据结合算子,如 join
  2. DAG 的边表示了执行中的 Job 的数据传输路径,Nephele 可以根据这些信息判断哪些实例可以关闭释放

一个 Nephele Job 的定义由三个手动步骤构成:

  1. the user must write the program code for each task of his processing job or select it from an external library
  2. the task program must be assigned to a vertex
  3. the vertices must be connected by edges to define the communication paths of the job

用户通过 Job Graph 在一个抽象的层级描述任务和任务之间的关系,将任务并行度及任务在实例上的调度留给 Nephele。用户还可以在任务描述中增加注解给 Job Graph 提供更多信息,如:

  • Number of subtasks
  • NUmber of subtasks per instance
  • Sharing instances between tasks
  • Channel types
  • Instance type
nephele architecture

在接收到用户的 Job Graph 后,JM 将其转换为 Execution Graph,这是 Nephele 调度和监控的主要数据结构。相对 Job Graph 只在抽象层次描述任务,Exectution Graph 多了物理层次任务调度到具体实例的信息及 tasks 之间的通信通道:

nephele architecture

其中几个重要的概念:

  • Execution Stage 一个 Exection Stage 执行之前,必须保证其前序 Execution Stage 都已执行完毕,它的另外三个特性使得它可以类比为 checkpoint
    • when the processing of a stage begins, all instances required within the stage are allocated
    • all subtasks included in this stage are set up and ready to receive records
    • before the processing of a new stage, all intermediate results of its preceding stages are stored in a persistent manner
  • Group Vertex 对应一个 Job 的一个 task,如果有并发,则一个 Group Vertex 可以有多个 Execution Vertex
  • Execution Instance 执行实例,多个 Group Vertex 可以调度在同一个 Exectuion Instance 进而提高 Data Locality
  • Channels Nephele 要求所有的边都替换成 channel,包括如下几种:
    • Network channels - 要求通信子任务存在于同一个 Stage
    • In-Memory channels - 要求通信子任务存在于同一个 Stage
    • File channels - Nephele 只允许不同 Stage 的子任务使用这种方式

读后感

Nephele 是 Flink 的前身,这种根据任务申请资源、按量付费的方式在云时代有着巨大的机会,这也是为何 Snowflake、PingCap 这样的厂商逐渐撅起的一个因素。

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in MassiveScale, Unbounded, OutofOrder Data Processing

Proceedings of the VLDB EndowmentVolume 8Issue 12August 2015 pp 1792–1803

https://doi.org/10.14778/2824032.2824076

The future of data processing is unbounded data,尽管 bounded data 依然有用武之地,它在语义上可以被归类为 ubounded data。Dataflow Model 就是这样一个提供无界数据和有界数据统一处理框架的模型。

论文介绍了 Windowing、Time Domains 相关的各种概念,附加了很多图例,Apache Flink 在一定程度上借鉴了该论文的理念。

阅读该论文是因为最近在学习 Flink,并总结了一些学习资源: Awesome Flink Learning Resources

databases

ColumnStores vs. RowStores: How Different Are They Really?

SIGMOD 2008 Daniel J. Abadi, etc.

https://dl.acm.org/doi/10.1145/1376616.1376712

列存数据库在数仓、决策支持、BI 等分析型应用中被证明比传统行存数据库的表现要好出一个量级以上,其原因也显而易见:列存数据库对于只读查询按需访问所需列,因而更 IO efficient。这种直观的感受不禁让人猜想:如果用行存数据库模拟列存数据库,使得每个列单独访问,是否可以获得列存数据库的性能收益?

这引出了论文要回答的第一个问题:

  • Are these performance gains due to something fundamental about the way column-oriented DBMSs are internally architected, or would such gains also be possible in a conventional system that used a more column-oriented physical design?

作者使用三种技术在行存数据库中模拟列存,其查询性能都非常差。那么是什么造就了列存数据库的优越性能呢?

这引出了论文要回答的第二个问题:

  • Which of the many column-database specific optimizations proposed in the literature are most responsible for the significant performance advantage of column-stores over row-stores on warehouse workloads?

ROWORIENTED EXECUTION

作者在 System X (一款商业行存数据库) 上使用了三种方法来模拟列存数据库。通过在 SSBM 上的实验发现这些方法都不能取得较好的性能。

1. Vertical Partitioning

将表进行垂直切分是最直观的方式,但这需要将切分后的数据进行关联。行存不具有列存将每列数据都按照相同的顺序进行存储的特性,因而一个逻辑表垂直切分出的每一个物理表,都保函两列数据 —— 该列的原始数据和 "position" column(通常是 primary key),查询需要被改写为基于 "position" column 的 join,不管是 Hash join 还是 index join,性能都较差。

这种方式由于在每个列对应的表中都存储了 "position" column,浪费了存储空间和磁盘带宽。另外行存数据库的每一行数据都会保存一个相对较大的 header,这进一步浪费了存储空间。

2. Index-only plans

在基础行存表的之外对每一列存储一个(value, record-id)的非聚簇索引,这样可以不存储重复数据,且没有 tuple header 的存储空间浪费。这种方式对没有谓词的返回列需要进行全表扫描。

3. Materialized Views

对查询的中的每个表所需的列生成物化视图,使用这种方式减少查询需要读取的数据量,以期望该种方法取得优于另外两种方法的性能。

COLUMNORIENTED EXECUTION

上述行存模拟列存不能取得较好的性能收益在于,在 SSBM 场景下,C-Store 特有的 CompressionLate MaterializationBlock IterationInvisible Join 等特性对查询性能具有极大的帮助,论文中将 C-Store 进行逐一特性阉割,将其退化为一个行存数据库,得出各因素对于性能的影响:

  • Compression improves performance by almost a factor of two on average
  • Late materialization results in almost a factor of three performance improvement
  • Block-processing can improve performance anywhere from a factor of only 5% to 50% depending on whether compression has already been removed
  • The invisible join improves performance by 50-75%

Compression 除了通过节省 I/O 来提升查询性能,其 operate directly on compressed data 的特性进一步提高了性能。

Late MaterializationBlock Iteration 合在一起被称为 Vectorvectorized query processing

Invisible Join 是一种 Late materialized join 技术,但减少了需要读取的数据,具体细节论文中的例子做了简明的解释。

思考

基于 Btree 的行存数据库中的一些特性是为了解决某些实际问题(写优化),比如论文中提到的:

  • 每行数据都有 header => 为了实现 MVCC 而存在
  • 数据并非物理有序 => 有些存储引擎使用 directory slots 的方式存储数据以避免数据插入时过多的数据移动

所以能够直观地判断出在基于 Btree 的行存数据库中模拟列存不会有好的性能收益。如果是 LSM Tree 呢?虽然可以实现 Late Materialization,但 Compression 和 Block iteration 可能不如在 Column Store 中的收益明显。

Main Memory Database Systems

Main Memory Database Systems: An Overview

IEEE Transactions on Knowledge and Data Engineering, H. Garcia-Molina, 1992

https://dl.acm.org/doi/10.1109/69.180602

这是一篇 1992 年的 paper,虽然那时的内存容量可能只有几十兆上百兆,但 MMDB 已经开始崭露头角,人们希望通过将数据全量驻留在内存来提供更快的响应时间及更大的事务吞吐。传统数据库(DRDB)可以将磁盘数据缓存在内存中加速查询,MMDB 需要将数据在磁盘备份以防数据丢失。

MMDBDRDB with a very large cache 有何不同呢?DRDB 在设计的时候假设内存不足以保存所有数据,因此没有充分利用内存的特性。而 MMDB 在设计之初就假设数据可以全量存储在内存,其各模块都存在着一些与 DRDB 不同的点。

Concurrency Control

  1. 访存远快于读盘,因而事务能够快速结束,同时意味着锁不会长时间持有,可以锁的粒度可以加大
  2. 由于数据存储在内存,因此锁机制可以通过数据对象的少量比特位来实现

Commit Processing

数据需要通过日志持久化,对于 MMDB,日志的开销占比显得非常突出。解决该问题的办法包括:

  1. Stable main memory 可以缓解事务的响应时间但不能消除日志瓶颈
  2. Precommit 事务写完日志就可以响应请求
  3. Group Commit 将多个事务的日志攒批落盘,可减少 IO

Access Method

不再需要 B-Tree,索引可以通过排序的链表来实现。

Data Representation

关系元组可以通过一组指针来表示。

Query Processing

顺序访问与随机访问的速度对于内存而言差别不大,因此依赖快速顺序访问的查询算法不再具有优势,如 sort-merge join。但 process cost 变得难以预测。

Recovery

在 MMDB 中,Checkpoint 和 Recovery 是需要访问磁盘数据的两个唯一原因。因此磁盘访问可以对 Checkpionter 进行优化,比如使用加大的 block size。在恢复的时候可以采用 on demand 的方式从磁盘恢复数据或使用磁盘分配或磁盘阵列来并行读取数据以加速恢复。

Performance

传统数据库备份对性能的影响要远小于 MMDB,因此 checkpoint 算法在 MMDB 显得至关重要。

Application Programming Interface and Protection

  1. Once the transaction can access the database directly, they can read or modify unauthorized parts
  2. and the system has no way of knowing what has been modified, so it can not log the cahnges

最佳的解决方案是只运行由特定数据库系统编译器编译过的事务。

Data Clustering and Migration

Migration and dynamic clustering are components of a MMDB that have no counterpart in conventional database systems.

Online Analytical Processing

Lakehouse: A New Generation of Open Platforms that Unify DataWarehousing and Advanced Analytics

CIDR 2021. Matei Zaharia, Ali Ghodsi, Reynold Xin, Michael Armbrust

https://dblp.org/rec/conf/cidr/Zaharia0XA21.html

数据分析平台发展历程经历了如下两代:

  • 第一代: schema-on-write。通过将 Operational database 的数据收集到数仓以支持 Decision Support 及 Business Intelligence。Figure 1-a。
  • 第二代: schema-on-read。将开放文件格式(如 Apache Parquet 和 ORC)的数据离线存储在成本较低 Data Lake 中(如 HDFS),通过 ETL 操作将一部分数据抽取到数仓后提供 DS 及 BI 的能力。这种架构在提供低成本存储各种数据类型的同时牺牲了数据质量及治理能力。从 2015 年开始,Cloud Data Lake(S3,GCS 等)由于具有极低的成本及优秀的持久性开始替代传统的 Data Lake,逐渐形成了 data lake + data warehouse 的两层架构。Figure 1-b。

two-tier model and lakehouse model

两层架构由于存 (如 S3) 算 (如 Redshift) 看起来很便宜,但从用户角度却非常复杂,数据首先需要从 OLTP 数据库 ETL 到 Data Lake,然后再从 Data Lake ETL 到数仓。这种架构主要存在四个问题:

  • Reliablity: 多次 ETL 降低了数据可靠性
  • Data Staleness: 相比第一代架构数据直接 ETL 到数仓,两层架构反而增加了时延
  • Limited support for advanced analytics: 通过 ODBC/JDBC 读取机器学习需要处理大量的 dataset 非常低效,因而高级分析工具受限
  • Totol cost of ownship: 数仓中的数据由于在 Data Lake 中也保存了一份,因而增加了存储成本

两层架构通过增加对 Parquet 和 ORC 外表的支持,可以使用相同的 SQL 引擎查询 Data Lake 上的数据,但这通常具有较差的性能;一些研究将 SQL 引擎(如 Presto)直接运行在 Data Lake 之上,但依然未支持 ACID 事务、多版本等特性。

由于这些问题的存在,本文讨论的主题围绕一个技术问题展开:

Is it possible to turn data lakes based on standard open data formats, such as Parquet and ORC, into high-performance systems that can provide both the performance and management features of data warehouses and fast, direct I/O from advanced analytics workloads?

作者认为 Lakehouse(Data Lake + Data Warehouse)湖仓一体的第三代架构的时代即将来临,它具有如下特点:

  • Reliable data management on data lakes: 数据以开放格式存放在低成本对象存储,并支持事务、版本管理及零拷贝克隆(如 Delta Lake、Apache Iceberg)等特性
  • Support for machine learning and data science: 通过声明式 DataFrame API 支持机器学习和科学计算等负载模型
  • SQL performance: 通过维护缓存、Parquet/ORC 文件的辅助数据(Statics/Bloomfilter based index)及优化数据布局(Z-order/Hilbert Curves)来提升查询性能

Databricks 通过 Delta Lake、Delta Engine 和 Databricks ML Runtime 三个项目构建了一个 Lakehouse 数据分析平台。

two-tier model and lakehouse model

一个小感想:

Lakehouse 本身并没有提出什么新技术,更像是对已有技术的重新组织,通过提供更低的成本和丰富的特性为用户创造价值。

Change Data Capture

DBLog: AWatermark Based Change-Data-Capture Framework

Andreas Andreakis, Ioannis Papapanagiotou. 2020

https://arxiv.org/abs/2010.12597

不同特性数据库之间的同步是一个普遍存在的需求。dual-write 和分布式事务是尝试解决此问题的一种模式,但这两种方式在可行性、鲁棒性和可维护性等方面或多或少存在一些问题;CDC 则是通过捕获数据库事务日志事件来达到下游数据库同步的另一种模式,但事务日志可能并非包含所有的变更事件(create/update/delete),因此需要同步数据库的全状态(full state)。

DBLog 通过事务日志捕获变更事件,通过 select 来获取数据的全状态。根据主键将表切分为多个 chunk,每个 chunk 的 select 和事务日志同时进行,为了保证事件的历史顺序,DBLog 在源数据库中维护了一个单行单列的表作为辅助,通过在 select 前后分别更新该记录使得在事务日志中多了两个事件 lw(低水位) 和 hw(高水位),然后将查询出来的事件和 [lw, hw] 之间的事件做以下算法,生成一个新事件流。

db log algorithm

paper 中的示例图 (Figure 3 && Figure 4) 非常明白地解释了上述算法:

Watermark-based Chunk Selection

Interleaving log capture with full data capture

算法正确性依赖数据库的 RC 或 RR 隔离级别,select 返回的 result set 是 (lw, hw) 之间的一个点的视图,我们以 RR 隔离级别分别用图示论证增改删的正确性:

1 insert log event

insert 日志在 select 之前,select 日志中能会包含该插入记录,因此从 result set 将其删除避免了重复插入。

 ---lw------select-------hw-----
        ^
        |
      insert

insert 日志在 select 之后,result set 中不含该插入记录,新的事件流只包含一次该记录。

 ---lw------select-------hw-----
                    ^
                    |
                  insert

2 update log event

update 日志在 select 之前,select 结果中包含了更新后的结果,而 update 日志包含了更新前后的数据,将 result set 中对应的记录删除,新的事件流中包含 update 后的记录,可保证正确性。

 ---lw------select-------hw-----
        ^
        |
      update

update 日志在 select 之后,select 中包含更新前的结果,将 result set 中对应的记录删除了老的值,但 update 日志包含了更改前后的新老数据,同样可以保证正确性。

 ---lw------select-------hw-----
                    ^
                    |
                  update

3 delete log event

delete 日志在 select 之前,result set 中不含该记录,新的事件流删除一个不存在的记录不影响正确性。

 ---lw------select-------hw-----
        ^
        |
      delete

delete 日志在 select 之后,result set 包含该记录,需要删除该记录来保证新的事件流先删除一个不存在的记录后又将该记录插入。

 ---lw------select-------hw-----
                    ^
                    |
                  delete

疑问

output buffer 中为何还保留了 lw 之前的事务日志?

More readings

[1] DBLog: A Generic Change-Data-Capture Framework

data layout

C-Store: A Column-oriented DBMS

VLDB 2005 Mike Stonebraker, etc.

https://dl.acm.org/doi/10.5555/1083592.1083658

行存(row store)RDBMS 作为当时的主流,通常将同一行记录的所有属性存储在磁盘的连续空间,以获得卓越的写性能,这种方式也被称为 write-optimized,适用于 OLTP 场景。而数据仓库、CRM 等需要 ad-hoc 查询大量数据的系统应该具有 read-optimized 的特性,此类系统往往通过列存(colunm store)架构来获得更有效的性能(如 Sybase IQ, Addamark, KDB)。

列存架构中,查询操作只需按需读取所需要的列,避免将无关数据读取到内存。CPU 的性能增速要远高于磁盘的带宽增速,在 read-optimized 系统,用 CPU cycles 换取磁盘带宽是一件值得的事情,列存通过 1) code 2) densepack 两种方式达到以 CPU cycles 换取磁盘带宽的目的。

C-Store 是作者提出的一个新的列存数据库,具有如下革新特性:

  1. A hybrid architecture with a WS component optimized for frequent insert and update and an RS component optimized for query performance
  2. Redundant storage of elements of a table in several overlapping projections in different orders, so that a query can be solved using the most advantageous projection
  3. Heavily compressed columns using one of several coding schemes
  4. A column-oriented optimizer and executor, with different primitives than in a row-oriented system
  5. High availability and improved performance through K-safety using a sufficient number of overlapping projections
  6. The use of snapshot isolation to avoid 2PC and locking for queries

混合架构

人们对实时数仓数据可见性的延迟要求越来越高(toward 0),C-Store 的 Writeable Store 模块支撑频繁地插入和更新,Read-optimized Store 模块提供高性能查询,Tuple Mover 则负责将数据从 WS merge 到 RS,这是结构非常类似 LSM。

数据模型

C-Store 支持标准关系数据库的逻辑数据模型 —— 数据库由表构成,每个表含多个数据列,表的属性可以作为 primary key 或其它表的 foreign key。但 C-Store 的物理存储模型不同于这种逻辑模型,是以 projection 进行存储的。

  • 一个 projection 存储了一个逻辑表的一个或多个列属性,它还可以存储其它表的一个或多个属性以表示外键
  • 一个 projection 和它的锚表(anchor table)具有的行数相同
  • 一个 projection 具有一个 sort key,由该 projection 的一个或多个列构成。同一个 projection 中的 K 个属性各自存储于一个数据结构中,且都按照 sort key 进行排序
  • 每个 projection 都被水平切分(Horizontally partitioned)为一个或多个 segment,切分的依据为 sort key,这样每个 segment 都对应一段 key range
  • 由于每个 projection 的 sort key 不一致,因此需要 join indexes 来维持不同 projection 的对应关系

Storage key

  • 在 RS 上,storage key 是记录在 segement 上的序数,不存储且按需计算获得
  • 在 WS 上执行引擎给每一次插入赋予一个唯一的 storage key, 大于 RS 上 storage key 的最大值
  • WS 和 RS 按照相同的规则进行水平切分,因此 RS segement 和 WS segent 是 1:1 对应的,因此 (sid, storage_key) 只能存在 RS 或 WS 中的一个
  • 由于 WS 在尺寸上相对较小,因此使用 B-tree 直接存储 projection 的每一列,按 storage key 排序,另外(sort key, storage key)以 sort key 作为 key 保存为 B-tree。这样就根据 sort key 快速定位到 storage key,然后再根据 storage key 找到对应的列值
  • WS 构建在 BerkeleyDB 的 B-tree 结构上

Join Indices

If T1 and T2 are two projections that cover a table T, a join index from the M segments in T1 to the N segments in T2 is logically a collection of M tables, one per segment, S, of T1 consisting of rows of the form:

(s: SID in T2, k: Storage Key in Segment s)

Join Index 按照 T1 的切分方式进行切分,且跟 T1 的 segement 保存在一起。

Join Index 在更新的时候非常难以维护,因此会把每个 column 存储在多个 projection 上,但这对 DBA 设计表结构提出了更高的要求,文中提到他们正在写一个自动化表结构设计工具。

Coding Schema

projection 中的每一列都紧凑地存储在一个结构体中,基于该列是否是 sort key 以及该列不同数据的多少,文中给出了四种编码方式:

  1. Self-order, few distinct values
  2. Foreign-order, few distinct values
  3. Self-order, many distinct values
  4. Foreign-order, many distinct values

Column-oriented optimizer and executor

这块不懂,后面有机会再补充

High availability

由于不同的列存在与多个 projection,C-Store 基于此可以实现 K-safe 的系统。

Snapshot Isolation

Snapshot isolation works by allowing read-only transactions to access the database as of some time in the recent past, before which we can guarantee that there are no uncommitted transactions.

对于只读事务,可以根据时间戳来判断记录的可见性,因此可以不用加锁

对于 Read-write 事务,C-Store 遵循严格的 two-phase locking,并使用了 NO-Force, STEAL 策略。

事务相关的内容感觉跟 Mysql 有些相像,但有些内容还没搞明白,这块后面有机会再研究一下。

Dremel: Interactive Analysis of Web-Scale Datasets

VLDB 2010

https://dl.acm.org/doi/10.14778/1920841.1920886

Dremel 是 Google 内部使用的一个可扩展、交互式数据分析系统,通过将 multi-level execution trees列存数据布局结合,可以在秒级时间量级执行万亿(trillion)行表的聚集查询。

Dremel 架构借用了分布式搜索引擎 serving tree 的概念,查询被下推到树的各个节点并在每一步进行改写,通过聚合从低层收到的回复来组装结果;另外,Dremel 提供了一个 SQL-like 的高级语言来表达 ad hoc 查询,不同于 Pig 和 Hive,Dremel 查询不会转换成 MR 任务。

Dremel 使用了列存格式来存储数据,以减少不相关的数据读取和 CPU 消耗(due to cheaper compression)。虽然列存格式在 OLAP 领域并不少见,但 Dremel 开创性地将其扩展到了嵌套数据格式。

数据模型

Dremel 的数据模型是基于强类型的嵌套记录,抽象语法:

t = dom | <A1:t[*|?], ..., An:t[*|?]>

t 是原子类型或一个记录类型,原子类型包括整型、浮点数、字符串等
Ai 为记录的第 i 个字段,其后的 [*|?] 表示其重复类型
    * 表示该字段为 Repeated
    ?表示该字段为 Optional
    无标记则表示该字段为 Required

这种语法可以使用 ProtoBuffer 进行表示,PB 序列化后的数据适合用于 RPC,不适合作为下盘格式。这里我们认为 PB 数据为行存格式,下面通过例子描述如何将 PB 的数据转化为 Dremel 的列存格式。

Schema

message Document {
    required int64 DocId;
    optional group Links {
        repeated int64 Backward;
        repeated int64 Forward;
    }
    repeated group Name {
        repeated group Language {
            required string Code;
            optional string Country;
        }
        optional string Url;
    }
}

如上的 Schema 非常类似 JSON,理解起来应该不难。除了多重性标记之外,full path 的概念跟后面的 Repetition Level 和 Definition Level 有着密切的关系:

FildFull Path
DocIdDocId
BackwardLinks.Back
LanguageName.Language
CountryName.Language.Country

两行符合上述 Schema 的记录

r1

DocId: 10
Links
    Forward: 20
    Forward: 40
    Forward: 60
Name
    Language
        Code: 'en-us'
        Country: 'us'
    Language
        Code: 'en'
    Url: 'http://A'
Name
    Url: 'http://B'
Name
    Language
        Code: 'en-gb'
        Country: 'gb'

r2

DocId: 20
Links
    Backward: 10
    Backward: 30
    Forward: 80
Name
    Url: 'http://C'

对于这种嵌套模式,如果单纯将同一字段的数据连续存储,不能确定一个数据属于哪条记录。Dremel 引入 Repetition LevelDefinition Level 来解决这个问题。

Repetiton Level 记录该值在 full path 的哪一级进行重复,以 Name.Language.Code 为例,它含有两个可重复字段:NameLanguage,因此 Code 的 Repetition Level 可取的值为 0,1,2:

  • 0 表示一个新行
  • 1 表示该值最近的重复级别为 Name
  • 2 表示该值最近的重复级别为 Language

对 r1 进行 Repetition Level 标注:

                  Repetition Level
DocId: 10                   0
Links
    Forward: 20             0
    Forward: 40             1
    Forward: 60             1
Name
    Language
        Code: 'en-us'       0
        Country: 'us'       0
    Language
        Code: 'en'          2
    Url: 'http://A'         0
Name
    Url: 'http://B'         1
Name
    Language
        Code: 'en-gb'       1
        Country: 'gb'       1

注意到 r1 的第二个 Name 中没有任何 Code,但为了表示 en-gb 是属于第三个 Name,在 Code 列中插入了一条 NULL(repetition level 为 1)。Code 在 Language 中为 Required 字段,所以它为 NULL 隐含了 Language 也为 NULL。因此需要额外的信息来数据属于哪个级别,以避免构造出多余的数据结构。

Definition Level 表示了 full path 上的 optional 或 repeated 字段实际存在的个数,对于同列非 NULL 的所有记录,其值是相同的。

对 r1 进行 Definition Level 标注:

                  Repetition Level      Definition Level
DocId: 10                   0               0
Links
    Forward: 20             0               2
    Forward: 40             1               2
    Forward: 60             1               2
Name
    Language
        Code: 'en-us'       0               2
        Country: 'us'       0               3
    Language
        Code: 'en'          2               2
    Url: 'http://A'         0               2
Name
    Url: 'http://B'         1               2
Name
    Language
        Code: 'en-gb'       1               2
        Country: 'gb'       1               3

Definition Level 主要对 NULL 有意义,在恢复数据的时候可以避免恢复出多余的结构。下图是 r1、r2 全量的列存结构:

column-striped representation

假设没有 Definition Level,尝试恢复 r1 的数据结构,会得到如下的结果:

读第一行 en-us:

Document
    Name
        Language
            code: 'en-us'

读第二行 en:

Document
    Name
        Language
            code: 'en-us'
        Language
            code: 'en'

读第三行 NULL:

Document
    Name
        Language
            code: 'en-us'
        Language
            code: 'en'
    Name
        Language

读第四行 en-gb:

Document
    Name
        Language
            code: 'en-us'
        Language
            code: 'en'
    Name
        Language
    Name
        Language
            code: 'en-gb'

可以看出第二个 Name 中构造出了 Language,这在 r1 是不存在的。但如果有了 Definition Level,在读取第三行的时候就能知道实际只存在一个字段,也就是 Name,这样在构造的时候就不会构造 Language 结构:

读第三行 NULL:

Document
    Name
        Language
            code: 'en-us'
        Language
            code: 'en'
    Name

在实际的编码中会对上面的结构进行编码以减少不必要的存储空间。至此我们对 Dremel 的列存格式有了一定的了解。论文的附录有记录切分和组装的具体算法,这里不再描述。

查询执行

Dremel 使用 multi-level serving tree 来执行查询,根节点收到请求,读取 metadata 并据此将请求路由到下一级,直到叶子节点,叶子节点请求存储层或直接访问本地盘获取数据,处理之后逐级汇总到根节点。请求在路由到下一级之前会进行改写。

System architecture and execution inside a server node

Dremel 是一个多用户系统,其 query dispatcher 会根据任务优先级和负载均衡进行调度。通过设置返回结果时必须扫描 tablets 的百分比(比如 98%),牺牲一定精确性来换取性能。

Paper 中没有提到索引、连接、数据更新相关的内容。

Apache Parquet 的实现参考了 Dremel 的数据模型,Apache Drill 的实现参考了 Dremel 的查询模型。

References:

[1] Dremel made simple with Parquet
[2] The striping and assembly algorithms from the Dremel paper

RCFile: A fast and space-efficient data placement structure in MapReduce-based warehouse systems

IEEE 27th International Conference on Data Engineering, 2011

https://ieeexplore.ieee.org/document/5767933

基于 MapReduce 的数仓系统在大数据分析领域扮演着一个非常重要的角色,在这样的系统中 data placement structure 是影响数仓性能的关键要素。作者根据对 Facebook 生产系统的观察与分析,总结了 data placement structure 需要满足大数据处理的四个要求:

  1. Fast data loading
  2. Fast query processing
  3. Highly efficient storage space utilization
  4. Strong adaptivity to highly dynamic workload patterns

文章对三种在传统数据库系统接受度甚广的数据格式 —— row-stores, column-stores, hybrid-stores —— 基于 MapReduce 使用场景分析了它们不适合分布式大数据分析系统的原因,并提出了一种新的数据格式 RCFile(Record Columnar File)。

Horizontal Row-store

row-store 将一行记录的所有字段按照表的定义逐一顺序存放在磁盘页:

An example of row-store in an HDFS block

行存对于只读的数仓系统的弱点已经被充分讨论过,包括:

  • 对于只关心少数几个列的查询,行存不能减少无用列的读取,因而很难提供快速的查询
  • 各列不同类型的数据混合存储使得行存很难获得较高的压缩比(虽然有研究指出通过结合熵编码列相关性能够为行存提供优于列存的压缩比,但解压会消耗更多的资源)

行存满足大数据处理四个要求中的 1、4

Vertical Column-store

在数仓系统中广泛被使用的读优化的列存格式分为两种,一种是将每一列作为一个 sub-relation 单独存储,典型代表如 Decomposition Storage Model(DSM) 和 MonetDB;另一种将关系中的所有列组合成多个的 column group,并且允许不同 column group 之间有 overlap,典型代表如 C-store 和 Yahoo Zebra。文中称第一种模式为 column-store,将第二种模式称作 column-group

column-group 模式中单个 group 的数据组织方式依赖具体实现,C-store 中每个 group 采用 column-store,而 Zebra 中的每个 group 则采用行存存储。下图是一个 column-group 的示例:

An example of column-group in an HDFS block

column-store 能够避免读取无用列的数据,并能提供较高的压缩比。但在 HDFS 中,不同列的数据不能保证存储在同一个 DataNode 上,因此重构数据时存在大量网络开销。

column-group 的每个 group 相当于一个物化视图,可以避免数据重构的网络开销,但由于预先设计好的 projection 只对有限的查询语句有效,因此不支持 highly dynamic workload。而且由于 column-group 的数据存在 overlap,难免会造成数据空间的膨胀。

column-store 满足大数据处理四个要求中的 1、3、4

column-group 满足大数据处理四个要求中的 1、2

Hybrid Store: PAX

PAX 使用了一种数据混合放置的策略,同一行数据的各字段仍然存储在同一个磁盘页,只是将该磁盘页的多行记录进行了重新组织,各个列的值连续存储在 mini-page 中,并在 page header 中存储 mini-page 的指针。

PAX 的主要目的是为了提高 CPU 缓存命中率(减少 false sharing),鉴于以下三点原因,它不能满足较高的存储空间利用率及快速查询:

  1. 数据未进行压缩
  2. 由于单个磁盘页存储的数据内容不变,PAX 不能消除无用列的磁盘读 I/O
  3. PAX 使用 fixed page 作为数据记录组织的基本单元

PAX 满足大数据处理的四个要求中的 1、4

RCFile

RCFile 在设计上借用了 PAX 的理念:First horizontally-partion, then vertically-partion。其实现在 HDFS 之上,表的数据布局:

  1. 一个表有多个 HDFS blocks
  2. 在每个 HDFS block 中,按照 row group 为基本单元对记录进行组织,根据 row group size 和 HDFS block size,一个 HDFS block 可以对应一个或多个 row group
  3. 每个 row group 包含三个分区: sync marker、metadata header、table data

An example to demonstrate the data layout of RCFile in an HDFS block

RCFile 使用 Run Length Encoding 对 metadata header 进行压缩,table data 的数据各列数据使用 Gzip 进行独立压缩(A future work: 根据不同类型自动选择最佳的压缩算法)。

RCFile 数据追加写的方式总结如下:

  1. RCFile creates and maintains an in-memory column holder for each column. When a record is appended, all its fields will be scattered, and each field will be appended into its corresponding column holder. In addition, RCFile will record corresponding metadata of each field in the metadata header.

  2. RCFile provides two parameters to control how many records can be buffered in memory before they are flushed into the disk. One parameter is the limit of the number of records, and the other parameter is the limit of the size of the memory buffer.

  3. RCFile first compresses the metadata header and stores it in the disk. Then it compresses each column holder separately, and flushes compressed column holders into one row group in the underlying file system.

RCFile 可以根据查询语句的需要只读取 row group 中所需要的列。假设 tbl 有四列数据(c1, c2, c3, c4),如下语句只需读取并解压 c1 和 c4 列

select c1 from tbl where c4 = 1

当一个 row group 中没有满足 c4 = 1 的记录时,甚至不需要读取 c1 列,这被称为 Lazy Decompression。row group size 的大小也会影响 Lazy Decompression 的有效性,因为当 row group size 比较大时,命中满足条件的行的几率会更大,从而使得 Lazy Decompression 更容易失效。

row group size、压缩率、Lazy Decompression 之间的关系存在 trade off,文中使用数据进行了佐证。

RCFile 满足大数据处理的四个要求中的 1、2、3、4

More Readings:

[1] RCFile vs. ORC from Hadoop In Real World

Major Technical Advancements in Apache Hive

SIGMOD 2014

https://dl.acm.org/doi/10.1145/2588555.2595630

Apache Hive 是 Hadoop 生态被广泛使用的数仓系统,随着用量的增大增多,出现了一些短板,主要体现在 file formatsquery planningquery execution 三个方面。Hive 需要更好的文件格式来充分利用存储空间并加速数据访问;更好的查询优化器及执行器来提高资源利用率及执行性能。

本篇笔记主要关注论文中的 file formats,也就是 ORC(Optimized Record Columnar) 相关的内容。

Hive 最初使用两种简单的文件格式:TextFile(plain text data) 和 SequenceFile(binary key/value pairs),由于这两种存储对存储数据类型不可知(data-type-agnostic)且数据以 one-row-at-a-time 的方式进行存储,数据很难被有效压缩。Hive 0.4 引入的 RCFile 列存格式在存储效率上有所提升,但其仍然对数据类型不可知且逐行进行序列化,在这种格式下,data-type specific compression 未能被有效利用。另外,RCFile 主要为 sequential data scan 而设计,它并未提供任何索引来帮助查询跳过无用数据。文中认为 Hive file formats 的两个不足:

  • data-type-agnostic file formats and one-row-at-a-time serialization prevent data values being efficiently compressed
  • data reading efficiency is limited by the lack of indexes and non-decomposed columns with complex data types

File Format

为了解决存储和数据访问方面的不足,Hive 设计并实现了一个增强版的文件格式:Optimized Record Columnar File(ORC File)。其 3 个主要改进包括:

  • ORC file writer 对数据类型可知,通过增加各种 type-specific 数据编码模式来更有效地存储数据
  • 引入各种索引来帮助 ORC reader 查找所需数据并跳过无用数据,从而加速数据访问
  • 由于 ORC writer 数据类型可知,可以将复杂类型(如 map)进一步分解

除此之外,ORC 还具有更大的默认 stripe size (RCFile 称为 row group) 并通过 memory manager 组件来防止 ORC writer 耗尽内存。

ORC 文件的结构示意图如下,我们借助此图从 Table Placement MethodIndexesCompression 三方面对 ORC 进行介绍。

The structure of an ORC file

The Table Placement Method

当一个表存储为 ORC 文件时,它首先被水平切分成多个 stripe,在每个 stripe 中,数据按照 column by column 的方式存储,这跟 RCFile 保持了一致。同一个 stripe 的所有列的数据存储在同一个文件。从 Table Placement Method 的角度看,ORC 相对 RCFile 有三个改进:

  1. ORC 提供的默认 stripe size 为 256MB,而 RCFile 默认为 4MB,更大的 stripe size 具有更好的读取效率
  2. ORC 可以将复杂类型进一步分解成 column tree,其中间节点存储为 metadata stream,叶节点存储为 data stream
  3. ORC 用户可以选择将 stripe 的边界与 HDFS block 的边界对齐,以避免将一个 stripe 存储在两个 HDFS block

我作为读者的一些疑惑及解答:

  1. RCFile 选择 4MB 是由于 Lazy Decompression,本文认为但在 Hive 中,Lazy Decompression 减慢了 execution pileline,这在后文的 Qeury Execution 一节中提到
  2. 对分解之后的复杂类型如何重构为一行没有进行说明,这可能需要具体阅读相关源代码

Indexes

ORC File 设计了两种稀疏索引 —— Data Statics 和 Position Pointers。

Data Statics 被 ORC Reader 用来避免从 HDFS 读取不必要的数据,这些数据由 ORC Writer 在生成一个 ORF file 时创建,通常包括 min/max/sum、length 等数据。Data Statics 在 ORC File 中分为三级:

  • File Level: 该级别的数据记录在文件结尾,通常用于查询优化及回答简单的聚合查询
  • Stripe Level: 对 Stripe 中的每一列值都存储了相应的数据,可以用来跳过非必要的数据访问
  • Index group Level: stripe 中的每一列可以组成多个 index group,默认每 10,000 行数据构成一个 index group,ORC 提供了 index group 级别的 data statics,从而可以更细粒度的跳过不需要的数据。

Position Pointers 为 ORC Reader 有效读取数据提供了两种数据位置指针:

  1. start points of every index group in metadata streams and datastreams
  2. starting point of a stripe

Compression

ORC 使用了两级压缩模式,一个流首先根据其数据类型进行编码,然后再使用通用压缩算法进行压缩(第二步可选)。对于一列数据,它可以被存储在一个或多个(复杂类型)流,流分为四种类型:

  • Byte Stream
  • Run Length Byte Stream
  • Integer Stream
  • Bit Field Stream

不同类型的数据可以使用不同的编码方式,文中只介绍了 Int 类型和 String 类型如何将编码对应到上面的四种类型,更详细的实现需参考 Hive 源码。在如上编码之后,用户可以选择使用通用压缩算法如 ZLIB,Snappy 或 LZO 对流进行进一步压缩。

More Readings:

[1] Create a new Optimized Row Columnar file format for Hive
[2] ORC File Intro

distributed system

consensus

Paxos Made Simple

ACM SIGACT News (Distributed Computing Column) 32, 4 | December 2001, pp. 51-58

https://www.microsoft.com/en-us/research/publication/paxos-made-simple/

Paxos 分布式共识算法是一个系列,Paxos Made Simple 是对其基本算法的描述,称为 Basic Paxos。Paxos 算法运行一次只能决定一个值,而非多个,将此牢记于心可以帮助理解 Paxos。

算法包含三种角色: ProposerAcceptorLearner,同一进程可扮演多种角色。其中 Acceptor 的多数派构成 quorum,2m+1 个 Acceptor 可以容忍 m 个节点 fail-stop 错误(非拜占庭失败)。

算法分两个阶段:

  • Phase 1
    • A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.
    • If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered proposal (if any) that it has accepted.
  • Phase 2
    • If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v, where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.
    • If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

以上算法没有描述 Learner 是如何知道共识已经达成,paper 中给出了两种方法:

  • For each acceptor, whenever it accepts a proposal, respond to all learners, sending them the proposal
  • A less reliable model, but one that reduces communication, is to have one or more nominated ‘distinguished learners’ to which acceptors send their acceptance notifications, and these then broadcast to the rest of the learners.

Paxos 算法中一些要点:

  • Acceptor 需要持久存储记录 minProposal、acceptedProposal、acceptedValue 等状态
  • Paxos 算法的三个里程碑(详细介绍参考 [3])
    • When the proposer receives promise(n) messages from a majority of acceptors.
    • When a majority of acceptors all issue accepted(n,val) messages for proposal number n and some value val.
    • When the proposer(s) and learners receive accepted(n,val) messages from a majority of the acceptors.
  • 如果需要对一系列值进行决策,需要多次运行 Paxos 算法,这称为 Multi-Paxos,Multi-Paxos 在学术界没有明确的阐述,在实现上需要解决很多问题,[4] 中有相应介绍

References:

[1] The Paxos Algorithm, A Google TechTalk presented by Luis Quesada Torres (Feb 2, 2018)
[2] Paxos made simple by Adrian Colyer
[3] Distributed Systems Lecture 15 by Lindsey Kuper
[4] Paxos lecture (Raft user study) by Diego Ongaro

Detecting Causal Relationships in Distributed Computations: In Search of the Holy Grail

by R Schwarz · 1994

https://www.vs.inf.ethz.ch/publ/papers/holygrail.pdf

实现一个分布式系统更像是一门艺术而非工程问题,理解分布式系统的行为任然充满了挑战。为了正确理解分布式程序及其执行,确定计算中事件之间的因果关系和时间关系。并发性和非确定性在分析分布式系统的监控、调试和可视化等行为过程中起着非常重要的作用。

该论文是对已有因果关系研究成果的汇总,循循善诱地将 Lamport Clock、Vector Time、Characterizing Concurrency with Concurrent Regions、Global Predicates、Behavioral Patterns 等概念进行分析描述。

Causality relation

causality relation

这里的 causality 即 Lamport 的 happen before,但因果关系的表述更准确。

concurrency relation

因果关系是分布式计算中很多问题的基础,比如 consistent global snapshot、determining consistent recovery points、determining deadlocks or detecting the termination of a distributed computation 等。

Lamport Clock is consistent with causality but does not characterise it.

Vector Time

Causal Histories

Causal histories

Causality and causal history relation

Causal history 能很好地反应因果关系,但是 Causal history set 维护所有了的前置事件,数量过于庞大,但通过观察可以发现:

Observation-2-3

因此可以使用 Vector Time 来替代 Causal History,Vector Time 的定义:

Vector Time

the structure of vector time is isomorphic to the causality structure of the underlying distributed computation, a.k.a vector time charaterise causality.

Efficient Realizations of Vector Time

Vector Time 的主要缺点是它的大小,由于需要在发送消息是附加一个 O(N) 大小的向量时间,可能会造成大规模并行计算的瓶颈问题。

解决该问题的一种方法是通过在每个线程额外维护两个数组 LSi("last sent")和 LUi("last update"),以此来减少发送消息的大小,如下图所示:

Singhal’s and Kshemkalyani’s method to maintain vector time

但这可能会使同一个接收者接收的不同消息之间的因果关系变得不确定:

Loss of information about the causal relationship between messages

Characterizing Concurrency with Concurrent Regions

对于有些应用,只需要知道两个任意事件 e 和 e' 是否同时发生,而它们的因果关系则无关紧要。

Evaluating Global Predicates

由于相对较难理解,还没深入看 :()

Detecting Behavioral Patterns

由于相对较难理解,还没深入看 :()

Distributed Snapshots: Determining Global States of Distributed Systems

ACM Transactions on Computer Systems, Volume 3, Issue 1 ∙ Feb. 1985 ∙ pp 63–75

https://doi.org/10.1145/214451.214456

本文提出了一个在分布式系统中生成全局逻辑一致性快照的算法,该算法依附于底层的分布式计算(即不需要暂停计算任务)。分布式系统中的进程通过发送和接收消息进行通信,进程可以记录它自己的状态和它发送和接收的消息,不记录任何其它内容。线程之间不共享时钟或内存。

Chandy-Lamport 假设进程之间的拓扑是强连接(既任何两个进程之间直接或间接可以通信),在讲述该算法时,很多例子会用全连接的拓扑进行描述。

强连接:

strongly connected

全连接:

complete graph

该算法正确工作的前提:

  • FIFO Delivery
  • Reliable Delivery
  • Processes Don't Crash

每个进程负责记录:

  • Its own internal state
  • The state of all messages on its incoming channels

算法的核心依赖 Marker 消息在拓扑图中的传递,完整的全局状态检测算法由 Marker Sending RuleMarker Receiving Rule 获取:

  • The Marker Sending Rule for a process p: for each channel c incident on, and directed away from p, p sends one marker along c after p records its state and before p sends any further messages along c.
  • The Marker Receiving Rule for a process q: on receiving a marker along a channel c, if q has not yet recorded its state then it records its state, and records the state of c as empty. However, if q has already recorded its state, then the state of c is simply recorded as the sequence of messages received along c in between q recording its state and receiving the marker on c.

CSE138 (Distributed Systems) L8: Chandy-Lamport snapshot algorithm 描述了全连接拓扑时的 Chandy-Lamport 算法:

The Initiator Process

  • Records its own state
  • Sends a marker message out on all its outgoing channels
  • Starts recording messages arriving on all incoming channels

Processes Receiving a Marker Message (含 Initiator 进程)

当进程第一次看见 Marker 消息时:

  • Records its own state
  • Flags the channel on which the marker message was received as empty
  • Sends out a marker message on each of its outgoing channels
  • Starts recording incoming messages on all channels except the one on which it received the original marker message (now flagged as empty)

如果不是第一次看见,则意味着该进程已经发出自己的 Marker 消息之后又接收到其它的 Marker 消息:

  • Stops recording incoming messages on that channel
  • Sets that channel's final state to be the sequence of all messages received whilst recording was active

简单理解就是,对于一条链路,接收端在接收到 marker 消息之前的消息都会被接收端的状态记录,发送端发送 marker 消息之后的消息都会被接收端忽略。

算法记录的全局状态虽然可能对应不上系统所处的任何状态,但它提供了一个逻辑一致的快照状态,保证初始系统状态到最终系统状态是可达的。

另外,管理系统快照需要外部协调进程来处理:

  • 接收所有进程记录的 local state 和 ingest channel state
  • 整理这些状态来形成全局系统快照

全局快照可用于解决如下问题:

  • Checkpointing
  • Deadlock detection
  • Stable Property Detection

References:

[1] Distributed Snapshots: Determining Global States of Distributed Systems by Adrian Colyer
[2] Distributed Systems Lecture 8 Notes

Lightweight Asynchronous Snapshots for Distributed Dataflows

arXiv:1506.08603 · 2015

https://arxiv.org/abs/1506.08603

读过 Chandy-Lamport(后简称 CL)之后,理解 Asynchronous Barrier Snapshotting(后简称 ABS)就比较容易了,ABS 可以看做 CL 的一个特例。不同于 CL 假设的强连通图,ABS 是为解决有向无环图而设计的,其主要步骤包含:

  • A central coordinator periodically injects stage barriers to all the sources.
  • When a source receives a barrier it takes a snapshot of its current state, then broadcasts the barrier to all its outputs
  • When a non-source task receives a barrier from one of its inputs, it blocks that input until it receives a barrier from all inputs
  • When barriers have been received from all the inputs, the task takes a snapshot of its current state and broadcasts the barrier to its outputs
  • Then, the task unblocks its input channels to continue its computation

可以看出,ABS 的非 Souce 节点在第一次收到 barrier 的时候并没有马上进行对当前状态做 snapshot,而是等待所有 input channel 都收到 barrier 之后才进行 snapshot(即 Flink 中的 barrier alignment),这样减少了 CL 中对 channel 状态的记录。其算法描述为:

Asynchronous Barrier Snapshotting for Acyclic Execution Graphs

ABS 还可扩展用于有向循环图,通过额外记录 back-edges(a back-edge in a directed graph is an edge that points to a vertex that has already been visited during a depth-first search)通道的消息,与节点记录的状态共同构成快照,算法描述为:

Asynchronous Barrier Snapshotting for Cyclic Execution Graphs

另外该论文还描述了 Failure Recovery 的方法,整个 execution graph 从最后一个快照重启,每个 task 做如下操作:

  1. retrieves from persistent storage its associated state for the snapshot st and sets it as its initial state
  2. recovers its backup log and processes all contained records
  3. starts ingesting records from its input channels

filesystem

PolarFS: An Ultralow Latency and Failure Resilient Distributed File System for Shared Storage Cloud Database

Proceedings of the VLDB EndowmentVolume 11Issue 12August 2018 pp 1849–1862

https://doi.org/10.14778/3229863.3229872

PolarFS 是为 PolarDB 设计的一个分布式文件系统,通过利用 RDMA、NVMe 等 bypass kernel 的技术,使 PolarFS 具有低时延、高吞吐、高可用等特性。为了最大化 I/O 吞吐,PolarFS 开发了 ParalleRaft,它利用数据库能够容忍乱序完成的能力,打破了 Raft 的严格串行化。

PolarFS 的特点:

  • 使用 RDMA、NVMe 等新兴硬件,实现轻量的用户态的网络栈和 I/O 栈
  • 提供 POSIX-like 文件系统链接库 libpfs,应用直接调用 libpfs 提供的接口进而绕过 VFS。这样所有 I/O 路径都在用户态
  • 关键数据路径在设计上避免锁和上下文切换,充分利用 Zero-copy 技术
  • 使用允许 out-of-order log acknowledging 的 ParalleRaft 共识算法,提高 I/O 并发

为了支撑 PolarDB 的主(Primary)从(Read Only)共享 redo 日志和数据文件的功能,PolarFS 还具有以下特点:

  • 可以同步文件元数据的改动,使得这些改动对从节点可见
  • 文件元数据的并发修改是串行化的,以保证文件系统在所有节点的一致性
  • 当发生网络分区的时候,PolarFS 确保只有一个 Primary node 提供服务,避免数据出错

架构

PolarFS 由存储层(Storage layer)和文件系统层(File system layer)构成,存储层管理存储节点的磁盘资源并为每个数据库实例提供一个数据库卷(disk volumn),文件系统层负责卷内的文件管理以及文件系统元数据的并发及同步管理。

storage layer abstraction

文件系统层

文件系统元数据的管理,其负责在该逻辑存储空间上实现文件管理,并负责文件并发访问的同步和互斥。

  • 提供一个共享、并发的文件系统,可以被多个数据库实例访问
  • libfs 完全实现在用户态,提供一组 Posix-like 文件系统接口,如 pfs_mount 将应用程序附加到相应的 Volumn 上,并初始化文件系统状态;pfs_mount_growfs 可以将 Volumn 扩展的空间标识为文件系统可用
  • 元数据的管理分为两部分
    • organize metadata to access and update files and directories within a database node
    • coordinate and synchronize metadata modification among database nodes
  • 元数据的更改记录在 jounal 中,RO 实例通过轮询日志 anchor 的变动来将日志 replay 到本节点的元数据
  • 当网络分区发生的时候,有可能有多个实例写 journal,因此需要 disk paxos 算法来保证对 Journal 文件的互斥写

存储层

存储资源的虚拟化管理,其负责为每个数据库实例提供一个逻辑存储空间。

ChunkServer

  • 一个数据库实例对应一个 Volumn,每个 Volumn 对应一个或多个 Chunk
  • Chunk 大小为 10G 固定,通过将 Chunk Append 到 Volumn 使 Volumn 空间得以扩展
  • 100TB 的 Volumn 只需在元数据数据库中存储 10,000 条记录,这些元数据还可以缓存在 PolarSwitch 的内存中
  • Volumn 提供 512B 字节原子读写
  • Chunk 由 ChunkServer 管理,每个 Chunk 会 replicate 到 3 个 ChunkServer
  • Chunk 被进一步切分为 Block,每个 Block 64KB。磁盘空间按 Block 粒度进行分配并 mapping 到 Chunk 以获得 thin provision
  • Chunk LBA 到 Block 的 mapping 表(640KB)存储在 ChunkServer 上并在内存缓存
  • 每个 ChunkServer 进程管理一个 NVMe SSD 盘并进行了绑核以减少 ChunkServer 间的资源竞争
  • 每个 ChunkServer 对应一个 WAL,数据的修改先以 append 的方式写入 WAL,然后异步更新到对应的 Block,以保证原子性和持久性
  • Consensus Group 按 ChunkServer 粒度实现,使用 ParallelRaft 算法复制 I/O 请求,进而保证数据一致性

问题:

  • WAL 对应的 3D XPoint SSD 与 Chunk 空间分配方法没有细致的描述

PolarSwitch

  • 同一个或多个数据库实例一起部署在数据库服务器上
  • libpfs 将 I/O 请求传递给 PolarSwitch,每个 I/O 请求包含 (volumn id, offset, length)
  • PolarSwitch 根据缓存的 Volumn 元数据把 I/O 再发送到对应的 ChunkServer
  • PolarSwitch 上的 Volumn 元数据跟 PolarCtrl 是同步的
  • Chunk 的副本信息元数据缓存在 PolarSwitch

PolarCtrl

  • PolarCtrl 作为 PolarFS 的管控面,部署在一组(至少3个)专用的服务器上以保证服务高可用
  • 监测集群中 ChunkServer 的活动状态
  • 在元数据数据库(Mysql 实例)中维护 Volumns 状态 和 Chunks 位置
  • 创建 Volumn 并将 Chunks 分配给 ChunkServer
  • 将元数据同步到 PolarSwitch,推和拉两种方式都有
  • 监控 Volumn 和 Chunk 的 I/O 时延和吞吐
  • 周期性地发起副本内和副本间的CRC数据校验

I/O 执行模型

对于写请求,数据库实例通常会调用 pfs_fallocate 预分配空间,以此避免昂贵的元数据操作。libpfs 通过共享内存与 PolarSwitch 进行数据交换。共享内存以生产者消费者的形式组成多个 ring buffer。

读请求简单的由 Consensus Group 的 Leader 处理,而写请求则涉及所有的 Replica。下图展示了写 I/O 的流程:

write io excution flow

  1. PolarDB 通过 libpfs 和 PolarSwitch 之间的 ring buffer 将 I/O 请求发送给 PolarSwitch
  2. PolarSwitch 通过缓存的元数据信息将 I/O 转发给对应的 Chunk 的 Leader 节点
  3. Leader Node 通过 RDMA 语义操作(通常是单边操作)将 I/O 放到预注册好的 buffer 中,并将请求放到请求队列,一个专用的线程循环从该队轮询消息
  4. 请求通过 SPDK 被写入日志并且通过 RDMA 发送到 Follower 节点
  5. 在 Follower 节点,RDMA NIC 将 Leader 节点发来的 I/O 请求放到与注册好的 buffer 中,并将请求放到请求队列
  6. Follower 节点也有一个专用的线程从队列轮询将请求通过 SDPK 异步写入磁盘
  7. Follower 将写入成功的消息通过 RDMA 发送给 Leader
  8. Leader 节点收到多数 Follower 返回的写入成功后,Leader 通过 SDPK 将写请求写入磁盘(步骤4只写了日志)
  9. Leader 通过 RDMA 回复 PolarSwitch
  10. PolarSwitch 回复客户端

从上述过程可以看出 PolarFS 使用了大量的 bypass kernel 的技术。

ParallelRaft

这部分只是简单浏览了一下,以后有机会再详细拜读。

References:

[1] PolarFS:面向云数据库的超低延迟文件系统(发表于VLDB 2018) by 鸣嵩

storage

kv store

Dynamo: Amazon’s Highly Available Key-value Store

ACM SIGOPS Operating Systems Review, Volume 41, Issue 6 • December 2007 • pp 205–220

https://dl.acm.org/doi/10.1145/1323293.1294281

亚马逊电子商务平台运行在由数以万计的服务器和网络组件、分布在世界各地的数据中心构成的基础设施之上,在这样的规模下硬件故障需要被视为常态,这驱动了软件系统可靠性和可扩展性的设计。Dynamo 作为一个高可用(always writable)、高可扩展的 kv 存储系统,为亚马逊的一些核心服务提供永远在线(always-on)状态存储能力。

Dynamo is used to manage the state of services that have very high reliability requirements and need tight control over the tradeoffs between availability, consistency, cost-effectiveness and performance.

亚马逊平台有很多应用服务只需要通过主键访问数据存储系统,一个典型的例子就是购物车服务。Dynamo 将键和值都视为字节数组,通过 MD5 算法将主键映射为一个 128 比特的标识符,并暴露了两个接口:

  • get(key): {object, context}
  • put(key, context, object): void

context 作为 object 的元数据和 object 一起存储,它编码了 object 的版本信息。

Dynamo 的主要贡献在于评估了如何将不同的的技术结合起来提供一个高可用性系统。下表展示了 Dynamo 使用的技术及其优势:

Summary of techniques used in Dynamo and their advantages

Partitioning

一致性 Hash 算法来将数据分布到不同的存储节点,在此基础之上,Dynamo 将节点(node)映射为多个虚拟节点(vnode),每个 vnode 对应一致性 Hash 算法环上的一个点。使用 vnode 的优势有:

  • If a node becomes unavailable, the load handled by this node is evenly dispersed across the remaining available nodes.
  • When a node becomes available again, or a new node is added to the system, the newly available node accepts a roughly equivalent amount of load from each of the other available nodes.
  • The number of virtual nodes that a node is responsible can decided based on its capacity, accounting for heterogeneity in the physical infrastructure.

Replication

为了实现高可用性和持久性,Dynamo 将数据复制到 N 个节点(vnode),N 是每个 Dynamo 实例的配置项。每条数据的 key 在哈希环上映射的第一个 vnode 作为它所负责范围的 coordinator,与后续的 N - 1 个 vnode 一起构成存储该键值对的 preference list。为了使数据分布在 N 个物理节点上,preference list 被设计为跳过已包含物理节点上的其它 vnode。

Data Versioning

Dynamo 被设计为一个最终一致性存储,也就是说更新最终会到达所有副本。在节点失效或网络分区的情况下,为了保证写可用,Dynamo 将每次修改的结果视为一个新的不可变的数据版本,系统同一时刻可能存在同一数据的不同版本。

Most of the time, new versions subsume the previous version(s), and the system itself can determine the authoritative version (syntactic reconciliation).

However, version branching may happen, in the presence of failures combined with concurrent updates, resulting in conflicting versions of an object.

Dynamo 使用向量时钟来确定同一数据对象不同版本之间的因果关系。

If the counters on the first object’s clock are less-than-or-equal to all of the nodes in the second clock, then the first is an ancestor of the second and can be forgotten.

Otherwise, the two changes are considered to be in conflict and require reconciliation.

当 Dynamo 更新一个数据对象的时候,它需要指定它要更新的数据版本(vector clock 信息),这些数据包含在与 object 一同保存的 context 中。在处理一个读取请求的时候,如果 Dynamo 获取了多个不能协调的分支,会返回多个版本的数据,由 client 端来决使用什么策略来协调多个数据版本(merge or last write win)。下图展示了使用向量时钟的数据版本的演进过程:

Version evolution of an object over time

使用向量时钟的一个问题是当节点失败或网络分区发生的时候,写操作需要 preference list 之外的节点接管,这会造成 vector clock 的大小增长,Dynamo 使用了一种 clock truncate 的模式来解决此问题。

Sloppy Quorum Protocol

传统的 Quorum 有两个关键的可配置值:R 和 W:

  • R 是参与读操作必须成功的最小节点数
  • W 是参与写操作必须成功的最小节点数

设置 R + W > N 能保证读和写覆盖,可用于实现强一致性。

传统的 Quorum 协议在节点失败和网络分区的的时候会造成系统不可用。Dynamo 使用 Sloppy Quorum 来解决此问题。

All read and write operations are performed on the first N healthy nodes from the preference list, which may not always be the first N nodes encountered while walking the consistent hashing ring.

使用 Hinted Handoff,Dynamo 确保读取和写入操作不会因临时节点或网络的失败而失败,Sloppy Quorum 是 Dynamo always writable 的关键。

Anti-entropy Using Merkle trees

Hinted Handoff 不可用是,不同节点需要同步数据以保证副本的一致性,Dynamo 使用 Merkle tree 来更快地检测副本之间的不一致并是需要传输的数据量最小化。

Each node maintains a separate Merkle tree for each key range (the set of keys covered by a virtual node) it hosts. This allows nodes to compare whether the keys within a key range are up-to-date.

Membership

Dynamo 的最终一致性不仅体现在数据上,其节点成员信息通过 Gossp-based 协议达成最终一致性。

A gossip-based protocol propagates membership changes and maintains an eventually consistent view of membership. Each node contacts a peer chosen at random every second and the two nodes efficiently reconcile their persisted membership change histories.

数据分区和放置信息也是通过 Gossip-based 协议进行传播,以保证每个存储节点知道其对等方处理的数据范围。

Dynamo 通过一些节点扮演 seed 的角色来防止 logically partitioned ring

References:

[1] Review of the Amazon Dynamo Paper by Lindsey Kuper

Kudu: Storage for Fast Analytics on Fast Data

File Systems Unfit as Distributed Storage Backends: Lessons from 10 Years of Ceph Evolution

SOSP 2019

https://doi.org/10.1145/3341301.3359656

很多分布式文件系统选择将其存储后端构建在本地文件系统之上,如 ext4, XFS,原因在于:

  • 将持久化及磁盘空间管理的工作交给了健壮的内核代码
  • 本地文件系统提供了 Posix 接口及文件目录抽象
  • 可以使用标准工具(ls, find)来查看数据内容

Ceph 也不例外,在其架构演进的进程中前后使用过 Btrfs,XFS 等文件系统作为其 FileStore 的后端存储,但在遵循这种惯例 10 年后,发现以本地文件系统作为存储后端有着昂贵的代价:

  • developing a zero-overhead transaction mechanism is challenging
  • metadata performance at the local level can significantly affect performance at the distributed level
  • supporting emerging storage hardware is painstakingly slow

第一,在现有的文件系统之上实现高效的事务非常困难(性能开销、功能限制、接口复杂度或者实现难度),有三种可做尝试的方法:

  1. hooking into a file system’s internal (but limited) transaction mechanism
  2. implementing a WAL in user space
  3. using a key-value database with transactions as a WAL

Ceph 在尝试上述三种方式时都遇到了一些问题,如使用文件系统内置的事务机制会造成数据部分提交;在用户态实现 WAL 有 Read-Modify-Write、操作非幂等引起进程重启后数据不一致、数据双写等问题;使用 KV数据库作为 WAL 的 NewStore 解决了用户态 WAL 的问题,但由于每个事务需要先写数据再写元数据,分别对应一次 fsync,而对于 fsync,文件系统的行为是先 flush 数据再 flush 元数据,这就造成了在 NewStore 中写一个对象导致了四次昂贵的刷盘操作,典型的 journaling of journal 问题。

第二,元数据的性能低效会扩散到整个集群。Ceph 团队遇到的一个挑战是 Posix 文件系统的 readdir 操作返回结果是乱序的,为了保证排序的高效,需要对目录进行分裂,这就会影响全局性能。

第三,由于本地文件系统通常运行在内核态,当 Ceph 想要支持一个新的硬件(如 SMR、ZNS)的时就需要等待文件系统维护者的排期。

在 2015 年,Ceph 开始设计并实现 BlueStore —— 一个用户态的存储后端,数据直接存储在裸盘之上,元数据存储在 kv 中。它实现了如下目标:

  • Fast metadata operations
  • No consistency overhead for object writes
  • Copy-on-write clone operation
  • No journaling double-writes
  • Optimized I/O patterns for HDD and SSD

为了达到快速元数据操作,BlueStore 使用 RocksDB 来存取元数据。为了能让数据和元数据共存底层设备,BlueStore 实现了一个能运行 RockesDB 的极简文件系统 BlueFS,BlueFs 具有一下特点:

  • 为每个文件维护一个 inode,inode 记录了文件使用的 extent 数组
  • 目录只有一层,不存在目录嵌套
  • 元数据都在内存(super block、dirmap、filemap)
  • super block 存储在 offset = 4096 的磁盘位置,其中保存了 ino = 1 的 journal 文件 inode
  • 所有对文件系统的更改操作(如 mkdir, open, delete)都保存在 ino = 1 的文件中
  • 通过回放来获取 Dir/File 视图,然后通过 fileMap 来重构 Allocator 的可用块

通过这种方式,journal 和其他文件的 extent 是交错存放在整个磁盘上的,而磁盘的分配由 Space Allocator 统一管理。

对于大块写入,BlueStore 首先将数据写入新分配的 extent 中,一旦数据被持久化,相应的元数据被插入到 RocksDB,如此可以高效地支持克隆操作并能避免日志双写;对于小于 allocation size 的写入,数据和元数据都被插入 RocksDB,然后再异步写入磁盘。

BlueStore 实现为用户态并通过 DirectIO 直接访问裸盘,因此不实用 kernel 提供的 page cache。为了能提供高效的读写,BlueStore 是实现了它自己的 write-through cache,使用 2Q 算法并对数据进行 shard 来支持并发。

由于控制了所有的IO路径,Ceph 能更灵活高效地提供用户需要的功能特性。

论文中在开头部分提到:

  • Stonebraker: operating systems offer all things to all people at much higher overhead
  • exokernels demonstrated that customizing abstractions to applications results in significantly better performance

Ceph 在其演进过程中体会到了定制抽象层的重要性,对于云计算,byass kernel 也是很多产品的竞争高地。