PINGO是一个由百度大数据部与百度美国研发中心合作而成的分布式交换查询平台。在PINGO之前,百度的大数据查询作业主要由基于Hive的百度QueryEngine去完成。QueryEngine很好的支持着百度的离线计算任务,可是它对交互式的在线计算任务支持并不好。为了更好的支持交互式任务,我们在大约一年前设计了基于SparkSQL与Tachyon的PINGO的雏形。在过去一年中, 通过跟不同业务的结合,PINGO逐渐的演变成一套成熟高效的交互式查询系统。本文将详细介绍PINGO的架构迭代过程以及性能评估。
QueryEngine是基于Hive的百度内部的大数据查询平台,这套系统在过去几年中较好的支撑了百度的相关业务。 图1展示了QueryEngine的架构图,其服务入口叫做Magi。用户向Magi提交查询请求, Magi为这次请求分配一个执行机, 执行机会调用Hive读取Meta信息并向Hadoop队列提交任务. 在这一过程中, 用户需要自行提供计算需要的队列资源。随着近几年对大数据平台的要求越来越高, 我们在使用QueryEngine过程中也发现了一些问题:首先QueryEngine需要由用户提供计算资源, 这使得数据仓库的用户需要去了解Hadoop以及相关的管理流程, 这增加了用户使用数据的门槛。 第二, 对于很多小型计算任务而言, MR的任务的起动时间也较长, 往往用户的计算任务只需要1分钟, 但是排队/提交任务就需要超过一分钟甚至更长的时间。这样的结果是,QueryEngine虽然很好的支持线下执行时间长的任务,但是对线上的一些交换式查询任务(要求延时在一到两分钟内)确是无能为力。
为了解决这些问题, 在大约一年前,我们尝试在离线计算的技术栈上搭建起一套具有在线服务属性的SQL计算服务 PINGO。如图2所示: PINGO使用了SparkSQL为主要的执行引擎, 主要是因为Spark具有下面的特点:
在过去一年中,PINGO从一个雏形开始,通过跟不同业务的结合,逐渐的演变成一套成熟高效的系统。中间经历过几次的架构演变,在本章中,我们将详细介绍PINGO的迭代过程。
PINGO初版的目标是提升性能,让其能支持交互式查询的任务。由于Spark是基于内存的计算引擎,相对于Hive有一定的性能优势, 所以第一步我们选择使用Spark SQL。为求简单,最初的服务是搭建在Spark Standalone集群上的。我们知道, Spark在Standalone模式下是不支持资源伸缩的, 一个Spark Application在启动的时候会根据配置获取计算资源。 这时就算一个查询请求只有一个Task还在运行, 该Application所占用的所有资源都不能释放。好在一个Spark Application可以同时运行多个Job, 每个Job都能够’平分’其所占用的计算资源。
基于上面的考虑, 如图3所示,我们利用Spark的Standalone集群搭建了第一版PINGO服务. 我们的主服务节点叫做Operation Manager, 它本身也是一个Spark Driver。所有用户的请求都会从Magi Service发送到Magi Worker, 再由Magi Worker分配给Operation Manager, 通过Operation Manager在Spark集群上执行。
通过PINGO 1.0, 我们把数据仓库的计算引擎从Hive/MR转变成了Spark。 不过, 单纯的替换计算引擎, 也许可以把查询的性能提升1-2倍, 但是并不会使查询的速度有数量级上的提高. 想要从本质上提高查询速度, 我们还需要改进存储。对此我们设计了PINGO 1.1, 加入了以Tachyon为载体的缓存系统,并在缓存层上搭建了缓存管理系统ViewManager, 进一步提升系统性能。
很多快速的查询系统都将Parquet之类列存储格式和分布式KeyValue存储引擎结合起来, 通过建立索引/物化视图等手段加速SQL查询的速度. 通过将部分查询条件下推到存储引擎, 某些SQL查询语句甚至可以被提速至100倍以上。然而我们的数据都存储在旧版本的Hive数据仓库中, 文件系统被限定为HDFS, 数据格式一般为低版本的ORC File, 也有不少表是ORCFile或者文本。 因为迁移成本/数据上下游依赖等兼容性等原因, 我们没有办法更改Hive数据仓库本身的存储.
不过根据局部性原理, 任何数据访问都有热点. 我们可以建立缓存系统, 在缓存系统中应用最新的文件系统和存储格式. 通过把热点输入通过预加载的方式导入到缓存系统, 我们可以加速整个系统的访问性能.为此, 我们设计了以下的系统架构:
在这个架构中, 我们引入了一个新模块 ViewManager, 该模块负责管理缓存系统。 它的主要功能是识别热点数据, 将数据导入到缓存系统中, 并在查询时改变SQL的执行过程, 使得Spark从缓存而不是原始位置读取数据。在这个架构下,当一个Query进来时,会先向OperationManager请求。当接受到请求后,OperationManager会向ViewManager查询数据是否已经在缓存中。如果已经在缓存,数据将被直接返回给用户, Query完成。如果数据不在缓存中,OperationManager会向底层Data Warehouse发起请求, 等数据到达时返回給用户。同时,ViewManager也会向底层Data Warehouse发起请求, 等数据到达时建立一个Cache Entry, 这样的话下次同样Query进来时,就会从缓存读数据。注意这里我们OperationManager与ViewManager对会向底层Data Warehouse的数据读取是两个独立的STREAM, 这样保证两者不会互相干扰。
那PINGO又是如何动态去读取缓存中或者底层Data Warehouse的数据的呢?毕竟在Query Plan中,数据地址是写死的。 为了能够让缓存对用户透明, 我们在SparkSQL上做了一些改进, 如下图所述
Spark中利用Catalyst框架来执行SQL语句。 对于Catalyst框架来说, 其输入是Unresolved Logical Plan, 这可以简单理解为SQL语句的结构化描述。 Catalyst调用Catalog来向MetaService查询表的元数据, Catalog返回MetastoreRelation来代表Hive表, 其中含有读取该表的所有必要信息,以便后续的解析和优化。 而在执行时, Catalyst会把MetastoreRelation转化为HiveTableScan, 来完成对数据的读取。
为了实现对缓存的透明利用, 我们在其中两个地方了做了扩展。 首先我们在Catalog中为我们缓存的表返回CachableRelation来代替MetastoreRelation。 而在将LogicalPlan转变为真正执行的PhysicalPlan时, 我们会把CachableRelation翻译为两种TableScan的Union, 一个针对那些被缓存住的数据, 另外一个针对那些没有被缓存住的数据。
通过这种方式, 我们能够做到在用户不感知的情况下, 完成对数据仓库存储层的替换和优化。 目前我们做的仅仅是改变存储的文件系统和格式, 将来也会将同样的实现思路应用到索引和物化视图上。
PINGO 1.1服务很好的提高了系统性能, 但是在运营了一段时间之后, 我们逐渐感觉到Spark的集群管理问题正在成为整个系统的瓶颈. 这具体表现在两个方面
发现这些问题是好事,说明系统已经逐渐成熟,开始往怎么更好的运维的方向发展了。 为了解决上面的问题, 我们对系统进行了进一步的迭代. 迭代后的架构如下图所示:
在这一版架构中, 我们将PINGO的服务和调度功能独立出来, 与真正执行计算的部分剥离。 支持了单一查询入口PinoMaster进行调度, 多个集群Pingo Applicatoin执行计算的多集群功能。 PingoMaster同时维护多个Spark Application。 当一个Query到来时, 我们会根据集群的归属/存储的位置选择一个最优的Application执行任务。 另外, 这一版本的PINGO还支持在yarn集群上起动Application。 百度内部有自己的资源管理系统, 提供了和yarn兼容的接口. 通过迁移PINGO服务到yarn上避免了原本Standalone版本需要的很多运维工作, 并且可以通过定期重启Application来解决Spark的可靠性问题。
为了能够在PINGO中实现更好的调度策略, 我们也对Spark SQL进行了深度扩展.
当Pingo收到一个Query后, 我们在Master端就完成对这条Query的分析和部分优化, 并将这些信息保存到QueryContext中。 当SQL在Application端真正执行的时候, 我们从Master端而不是Meta端拿到执行所需要的信息. 这样做的好处是可以在根据Query的内容来做出调度. 基于这个执行流程, 目前我们开发了两个比较有效的调度策略:
在上一章中,我们详细介绍了PINGO架构的迭代,在本章中,我们重点看一下PINGO的性能。如图8所示,首先我们对比了使用Hive以及使用Spark作为计算引擎的性能。 这里的Benchmark选取的Query是百度内部交互式数据分析的典型负载, 主要是Join/Aggregate等操作, 输入的数据量从100M到2T左右.我们可以看到, Spark相比Hive有较大的性能优势。在比较大比较复杂的Query中, Spark取得了2到3倍的加速比。注意在这个对比中,内存资源有限,用的是64GB内存的机器,很多Spark的数据被迫落盘, 如果有更多内存,加速比会更高。
下一步我们了解一下加了缓存后的性能, 在这个实验中,我们使用了192GB的机器,有更多的内存去支持缓存以及内存计算。如图9所示,在这个环境下,使用Spark相对于Hive有大概5到7倍的提速。 在加入了Tachyon后,相对于Hive无缓存的情况有30到50倍的提速。我们相信在未来的几年内,内存价格会大幅降低,内存计算会变成主流,所以使用内存做缓存是一个正确的方向。
PINGO服务目前被应用在交互式查询场景中, 旨在为PM和RD提供快速的数据分析服务。 图10展示了在同样的计算资源下, 在生产环境使用PINGO前后的Query执行时间分布图。注意,在这个生产环境中,我们用的是64GB内存的机器, 为了提供足够的缓存空间,我们使用了Tachyon的Tiered Storage功能,让缓存分布在内存以及本地磁盘中。 我们可以看到, 在传统的基于Hive+MR的服务模式下, 只有1%左右的Query能够在两分钟内完成. 而采用了基于Spark的PINGO服务, 有50%+的Query可以在两分钟内执行完成。 能够取得这样的加速效果, 部分原因是Spark本身的执行效率比Hive要高一些。这个本身还有很大的优化空间,比如如果我们使用内存缓存的话,执行时间可以轻易的压缩到30秒内。
经过过去一年的迭代与发展,PINGO已经很好的支持了百度内部的交互式查询业务。通过PINGO,很多查询的延时从以前的30到40分钟,降低到了2分钟以内,很大的提高了查询者的工作效率。今后PINGO的发展将朝着更好用,已经更快两个方向发展。为了让PINGO更好用,我们正在PINGO之上开发UI, 让用户与图形界面交互(而不是通过输入SQL Query)就能轻易的查询到想要的数据。为了让PINGO更快,更有交互性,我们希望可以把90%以上的Query 时间降低到30秒以下。第一步就是要扩大我们的Cache的覆盖率与性能,设计出更好的缓存预取以及缓存替换机制,并且加速Cache读取的延时。第二步,我们也通过硬件加速的方法使用FPGA加速某些常用的SQL Operator。就在最近,我们对SQL的JOIN Operator进行了FPGA硬件加速,相对CPU取得了3倍的加速比。我们也会很快把这项技术使用到PINGO上。
作者介绍:
温翔,百度公司大数据部资深工程师。北京大学计算机本科以及硕士研究生毕业。曾在腾讯公司从事大数据平台开发。目前主要从事百度大数据交换式查询平台架构以及开发。
沈光昊,百度公司大数据部主任架构师。浙江大学计算机硕士。曾在Facebook工作。目前主要负责百度大数据部的总体架构。
蔡旻谐,百度美国研发中心研发架构师。台湾大学毕业,加州大学洛杉矶分校(UCLA)计算机硕士,曾任职于雅虎微软从事搜索广告平台核心设计研发。目前任职于百度美研基础架构部,主要从事分布式计算平台Spark与深度学习平台的研发设计工作。
徐宝强,百度公司大数据部高级项目经理。牛津大学计算机硕士。目前主要从事大数据平台项目管理工作。
刘少山,百度公司美国研发中心高级架构师。加州大学欧文分校计算机博士。曾在LinkedIn, Microsoft, Microsoft Research, INRIA, Intel以及Broadcom工作。目前主要从事百度深度学习, 大数据,以及异构计算平台架构与开发。
关于:中科研拓
深圳市中科研拓科技有限公司专注提供软件外包、app开发、智能硬件开发、O2O电商平台、手机应用程序、大数据系统、物联网项目等开发外包服务,十年研发经验,上百成功案例,中科院软件外包合作企业。通过IT技术实现创造客户和社会的价值,致力于为用户提供很好的软件解决方案。联系电话400-0316-532,邮箱sales@zhongkerd.com,网址www.zhongkerd.com