何种场景下Elasticsearch会丢失数据?

2015年08月05日 作者: 刘成鹏

大家关注分布式系统可靠性的话,也许读过Call me maybe这个博客系列。博主Kyle Kingsbury用自己开发的Jepsen测试过多种分布式系统(NoSQL、Message Queue等),其中包括我们常用的Elasticsearch(简称ES),先后针对ES 1.1.0和ES 1.5.0测过两轮。得到相同的悲观结论:因为可靠性原因,ES不能作为主存储。具体来说,ES节点在录入数据时,如果进程由于软硬件原因崩溃,即使重新启动,一段时间内录入的数据也会丢失无法恢复。所以不适合要求数据100%不丢失的使用场景。

这个问题不止Kyle Kingsbury发现过,Elasticsearch Github Issues里其他用户同样咨询过这个问题。ES committer Shay Banon解释如下:

kimchy commented The thought process we had is that the most common deployments of ES when its introduced to be used is next to a database, a replay-able upstream, or typically in a logging/metrics use case, where the default expected behavior of ES is to be semi geared towards faster insertion rate by not fsync'ing each operation (though one can configure ES to do so).

也就是:ES确实不能作为100%可靠的主存储,我们设计时候目标是高写入速度,大家应该搭配个100%可靠的数据库(或者其他啥机制)作为主存储机制。但这段话说得很含糊,只说和fsync相关,具体fsync的每个操作是指啥?要弄清楚这个问题,需要对机制作分析。下面是我们对ES 1.7.0源码的跟踪结果:

ES添加一条记录(即index)的过程

数据恢复的两种方式

  1. 分布式系统设计中的一个重要目标:系统可以从部分失效中自动恢复,而且不会严重地影响整体性能。特别是,当故障发生时,分布式系统应该可以一边进行恢复,一边降低服务质量继续操作。在容错的分布式系统中,从错误中恢复需要系统事先把它的状态保存到硬盘等存储中。它被称为快照,一般为减少空间,记录的是从上个状态点开始的增量变化。Elasticsearch中的translog(transaction log,记录ES的每一条增删改记录操作)文件存储着这样的信息。

  2. 为了增强系统的可靠性和提高性能,分布式系统要对数据进行备份,因为如果一个文件系统已经实现数据多备份,那么当一个备份被破坏后,文件系统只需转换到另一个数据备份就可以继续运行下去。故障节点恢复时也可以从其他节点上的备份得到正确数据,来达到最终容错的目的。数据多备份另一个作用是防止数据破坏。ES中数据单位是shard,每个shard有配置指定的多个replica,即备份,存储在多个节点上。

也就是说:ES数据要做得能恢复,必须要么在本节点以translog存储到磁盘,要么以replica方式备份到其他节点。

备注

备份导致了一致性问题:每当一个备份更新后,该备份就变得与其他备份不同,为了保持各个备份一致,分布式系统需要以一种让人注意不到的暂时的不一致方式存储。而这种一致性处理不适当,也可能会导致数据的丢失。此中情况暂时超出文本内容,以后文章会讨论。

ES新增一条记录的代码分析

当ES通过Netty网络框架得到一条加记录请求时,并经过很多解析,最终启动在TransportShardResplicationOperationAction中的线程,启动代码,如下:

而对于我们的问题,在这个线程中找到一部分答案。如下图所示:

主Shard数据操作分析

数据在主Shard插入时,先得到相关数据(比如这条记录的id)并添加入Lucene的内存,在符合一定条件下存入磁盘,默认条件如下原代码:

当translog满足任意阈值,或者Lucene超过flushThresholdPeriod时间没有提交,那么,ES 才会启动一线程将Lucene增加的数据存入磁盘,数据算稳定。默认情况translog也不是直接存入文件中的,如下代码

syncOnEachOperation = false (FsTranslog.java 中 81行),translog 会准备 64KB(FsTranslog.java 中116行)的内存,来暂存与客户输入相关数据,以减少IO的消耗。当然,并不是一定要累计到超过64KB的数据才会存储入 translog,ES 每隔 5s 有一个线程将translog 内存中的数据存入translog文件中,代码如下:

当然,我们也可以不用默认方式,在配置文件(config/elasticsearch.yml)中添加一句:index.gateway.local.sync: 0,syncOnEachOperation 会为 true,translog 也不会准备 64KB 这样的内存存储输入数据。

如上所述,translog中的数据和Lucene中的数据,一段时间内都不会直接存入文件,而是保存在内存中。此时节点宕机重启动,数据就可能会失去。通过数据复制未必都能挽救成功。

数据复制

由时序图可知,数据插入后进行才进行数据复制,那此时宕机数据是否直接丢失?不会。如果真出现这种情况,主节点就会回应客户端插入失败,客户可以选择稍后重新发起请求。 所以分析数据复制,也是有必要的,当然此时的逻辑有点乱,需要仔细的想这个过程。提示:因为插入数据时,此节点为主节点,但是经过一系列的操作后, 可能发送数据时发现自己主节点的头衔就被别人霸占的现象(比如此节点在插入时,因为网络原因没有和其他节点维持心跳,其他节选出新的主节点)。但是最终是通过调用Netty的ChannelFuture将数据传送出去,如下源代码:

由上面的代码,buffer是由Netty的ChannelFuture 来完成发送任务的,是个异步事件。由于数据复制是异步的,所以不会阻塞,客户端会由

listener.onResponse(response.response());

发送给客户,但是可能在客户得到回应时,复制过程还没有完成,所以在此时节点出现宕机,数据丢失。

TIP

ChannelFuture:异步结果,这个是Netty异步事件处理的关键,当一个事件被处理时,可以直接以ChannelFuture的形式直接返回,不用在当前操作中被阻塞。可以通过 ChannelFuture得到最终的执行结果,具体的做法是在ChannelFuture添加监听器listener,当操作最终被执行完 后,listener会被触发,我们可以在listener的回调函数中预定义我们的业务代码

总结

由上可知,存储是异步由内存存入磁盘,不能保证数据不被丢失,而复制也是也不发送,节点没有得到成功传输的答复就回应了客户端,更别说在复制结点上是否能正确存储,所以更不能保证数据不被丢失。

性能还是可靠性?

通过把Translog 文件可以直接写入磁盘,能避免数据在本文中可能出现的丢失情况。但是后果就是Shay Banon所说的牺牲了高写入速度。

那么究竟直接写入磁盘会对性能有多大影响呢?我写了个脚本测试,写入10000条小的数据,速度减小了7~8倍,损失很大。

解决这个问题有如下几种办法:

  1. 不算办法的办法就是忽视这个问题。大部分场景送入ES的日志并不关键,数据丢失无妨。或者ES只是作为方便文本查询的辅助工具,主数据在更可靠的存储媒介上。比如HBase一些方案就是通过Coprecessor让ES为文本建立倒排索引。这种场景下也能容忍数据丢失。

  2. 乐观数据恢复机制。数据送入ES后仍然保留一段时间,监控ES宕机事件,ES重启后用查询判断最后输入数据的位置,从下一条数据开始重送。这也是我们目前的做法。

  3. 重新设计ES这块的逻辑,毕竟很多NoSQL能同时做到高可靠和高写入。具体方案我们目前还在研究中。