聊一聊Spark写文件的机制--如何保证数据一致性 - 茅庐 - CSDN博客

通常情况下,我们在单机上写文件时,都会生成一个指定文件名的文件,而调用Spark DataFrame的write接口来写文件时,所得到的结果却与此不同。

如下图右侧所示,其写入了3个数据文件在指定的路径下。

为什么会这样呢?这与Spark的执行方式有关。Spark是分布式计算系统,其RDD中的数据是分散在多个Partition中的,而每个Partiton对应一个Task来执行,这些Task会根据vcores个数来并行执行。在上图的示例中,分配了3个Partition,所以生成了part-00000、part-00001、part-00002三个文件(文件名中间的一段UUID是在job中生成的)。按照这样的执行方式,假设我们直接把数据写入到指定的路径下,会出现哪些问题?

由于是多个Task并行写文件,如何保证所有Task写的所有文件要么同时对外可见,要么同时不可见?在上图示例中,三个Task的写入速度是不同的,那就会导致不同时刻看到的文件个数是不一样的。另外,如果有一个Task执行失败了,就会导致有2个文件残留在这个路径下。 同一个Task可能因为Speculation或者其他极端原因导致某一时刻有多个Task Attempt同时执行,即同一个Task有多个实例同时写相同的数据到相同的文件中,势必会造成冲突。如何保证最终只有一个是成功的并且数据是正确的?

上述的问题都与数据一致性有关,为了应对这些问题,尽可能保证数据一致性,Hadoop FileOutputCommitter设计了Rename机制(Spark写文件还是调用Hadoop的相关库来完成的)。Rename机制先后有两个版本:v1和v2,二者在性能和保证数据一致性的粒度上有所区别。

下图所示为v1的思想,其需要经历两次Rename。每个Task首先将数据写入如下临时路径 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName} , 例如 hdfs:///data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

Task写入完成后,执行commitTask做第一次Rename,将文件从Task Attempt的临时目录中移动到Task的临时目录 ${output.dir.root}/_temporary/${appAttempt}/${task}/${fileName} 中。例如 hdfs:///data/_temporary/0/task_20190219101232_0003_m_000005/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

最后,当所有Task都完成上述操作后,由Driver负责执行commitJob做第二次Rename,依次将文件从每个Task的临时目录中移动到真实目录 ${output.dir.root}/${fileName} 中,并写入_SUCCESS标识。 如 hdfs:///data/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

v1的思想较好的解决了前面提到的问题,基于此,只有在Rename的过程中出问题才可能导致数据一致性问题,然而这种概率相比之前提到的情况要低很多。但是,两次Rename也带来了性能问题,主要表现在:当有大量Task写入时,即使所有Task都完成了,还需要等待很长一段时间Job才能结束,这个时间主要花在Driver端做第二次Rename。

于是,就有了下图所示的v2思想。相比v1而已,主要去除了在commitJob中做第二次Rename来提高性能,但是牺牲了一部分一致性。在v2中,如果部分Task已执行成功,而此时Job失败了,就会导致有一部分数据对外可见了,需要数据的消费者自己根据是否有_SUCCESS标记来判断其完整性。