首页 » 智能 » Intel李锐:Hive on Spark解析_用户_数据

Intel李锐:Hive on Spark解析_用户_数据

落叶飘零 2024-12-15 02:05:51 0

扫一扫用手机浏览

文章目录 [+]

Hortonworks于2013年提出将Tez作为另一个打算引擎以提高Hive的性能。
Spark则是最初由加州大学伯克利分校开拓的分布式打算引擎,借助于其灵巧的DAG实行模式、对内存的充分利用,以及RDD所能表达的丰富语义,Spark受到了Hadoop社区的广泛关注。
在成为Apache顶级项目之后,Spark更是集成了流处理、图打算、机器学习等功能,是业界公认最具潜力的下一代通用打算框架。
鉴于此,Hive社区于2014年推出了Hive on Spark项目(HIVE-7292),将Spark作为继MapReduce和Tez之后Hive的第三个打算引擎。
该项目由Cloudera、Intel和MapR等几家公司共同开拓,并受到了来自Hive和Spark两个社区的共同关注。
目前Hive on Spark的功能开拓已基本完成,并于2015年1月初合并回trunk,估量会在Hive下一个版本中发布。
本文将先容Hive on Spark的设计架构,包括如何在Spark上实行Hive查询,以及如何借助Spark来提高Hive的性能等。
其余本文还将先容Hive on Spark的进度和操持,以及初步的性能测试数据。

背景

Hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个打算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行打算。
通过该项目,可以提高Hive查询的性能,同时为已经支配了Hive或者Spark的用户供应了更加灵巧的选择,从而进一步提高Hive和Spark的遍及率。

Intel李锐:Hive on Spark解析_用户_数据 智能

在先容Hive on Spark的详细设计之前,先大略先容一下Hive的事情事理,以便于大家理解如何把Spark作为新的打算引擎供给Hive利用。

在Hive中, 一条SQL语句从用户提交到打算并返回结果,大致流程如下图所示(Hive 0.14中引入了基于开销的优化器(Cost Based Optimizer,CBO),优化的流程会略有不同。

图1:SQL语句实行流程

语法剖析阶段,Hive利用Antlr将用户提交的SQL语句解析成一棵抽象语法树(Abstract Syntax Tree,AST)。
天生逻辑操持包括通过Metastore获取干系的元数据,以及对AST进行语义剖析。
得到的逻辑操持为一棵由Hive操作符组成的树,Hive操作符即Hive对表数据的处理逻辑,比如对表进行扫描的TableScanOperator,对表做Group的GroupByOperator等。
逻辑优化即对Operator Tree进行优化,与之后的物理优化的差异紧张有两点:一是在操作符级别进行调度;二是这些优化不针对特定的打算引擎。
比如谓词下推(Predicate Pushdown)便是一个逻辑优化:尽早的对底层数据进行过滤以减少后续须要处理的数据量,这对付不同的打算引擎都是有优化效果的。
天生物理操持即针对不同的引擎,将Operator Tree划分为多少个Task,并按照依赖关系天生一棵Task的树(在天生物理操持之前,各打算引擎还可以针对自身需求,对Operator Tree再进行一轮逻辑优化)。
比如,对付MapReduce,一个GROUP BY+ORDER BY的查询会被转化成两个MapReduce的Task,第一个进行Group,第二个进行排序。
物理优化则是各打算引擎根据自身的特点,对Task Tree进行优化。
比如对付MapReduce,Runtime Skew Join的优化便是在原始的Join Task之后加入一个Conditional Task来处理可能涌现倾斜的数据。
末了按照依赖关系,依次实行Task Tree中的各个Task,并将结果返回给用户。
每个Task按照不同的实现,会把任务提交到不同的打算引擎上实行。

总体设计

Hive on Spark总体的设计思路是,尽可能重用Hive逻辑层面的功能;从天生物理操持开始,供应一整套针对Spark的实现,比如SparkCompiler、SparkTask等,这样Hive的查询就可以作为Spark的任务来实行了。
以下是几点紧张的设计原则。

尽可能减少对Hive原有代码的修正。
这是和之前的Shark设计思路最大的不同。
Shark对Hive的改动太大以至于无法被Hive社区接管,Hive on Spark尽可能少改动Hive的代码,从而不影响Hive目前对MapReduce和Tez的支持。
同时,Hive on Spark担保对现有的MapReduce和Tez模式在功能和性能方面不会有任何影响。
对付选择Spark的用户,应使其能够自动的获取Hive现有的和未来新增的功能。
尽可能降落掩护本钱,保持对Spark依赖的松耦合。

基于以上思路和原则,详细的一些设计架构如下。

新的打算引擎

Hive的用户可以通过hive.execution.engine来设置打算引擎,目前该参数可选的值为mr和tez。
为了实现Hive on Spark,我们将spark作为该参数的第三个选项。
要开启Hive on Spark模式,用户仅需将这个参数设置为spark即可。

以Hive的表作为RDD

Spark以分布式可靠数据集(Resilient Distributed Dataset,RDD)作为其数据抽象,因此我们须要将Hive的表转化为RDD以便Spark处理。
实质上,Hive的表和Spark的HadoopRDD都是HDFS上的一组文件,通过InputFormat和RecordReader读取个中的数据,因此这个转化是自然而然的。

利用Hive原语

这里紧张是指利用Hive的操作符对数据进行处理。
Spark为RDD供应了一系列的转换(Transformation),个中有些转换也是面向SQL的,如groupByKey、join等。
但如果利用这些转换(就如Shark所做的那样),就意味着我们要重新实现一些Hive已有的功能;而且当Hive增加新的功能时,我们须要相应地修正Hive on Spark模式。
有鉴于此,我们选择将Hive的操作符包装为Function,然后运用到RDD上。
这样,我们只须要依赖较少的几种RDD的转换,而紧张的打算逻辑仍由Hive供应。

由于利用了Hive的原语,因此我们须要显式地调用一些Transformation来实现Shuffle的功能。
下表中列举了Hive on Spark利用的所有转换。

对repartitionAndSortWithinPartitions 大略解释一下,这个功能由SPARK-2978引入,目的是供应一种MapReduce风格的Shuffle。
虽然sortByKey也供应了排序的功能,但某些情形下我们并不须要全局有序,其余其利用的Range Partitioner对付某些Hive的查询并不适用。

物理实行操持

通过SparkCompiler将Operator Tree转换为Task Tree,个中须要提交给Spark实行的任务即为SparkTask。
不同于MapReduce中Map+Reduce的两阶段实行模式,Spark采取DAG实行模式,因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。
实行SparkTask时,就根据SparkWork所表示的DAG打算出终极的RDD,然后通过RDD的foreachAsync来触发运算。
利用foreachAsync是由于我们利用了Hive原语,因此不须要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。

SparkContext生命周期

SparkContext是用户与Spark集群进行交互的接口,Hive on Spark该当为每个用户的会话创建一个SparkContext。
但是Spark目前的利用办法假设SparkContext的生命周期是Spark运用级别的,而且目前在同一个JVM中不能创建多个SparkContext(请参考SPARK-2243)。
这明显无法知足HiveServer2的运用处景,由于多个客户端须要通过同一个HiveServer2来供应做事。
鉴于此,我们须要在单独的JVM中启动SparkContext,并通过RPC与远程的SparkContext进行通信。

任务监控与统计信息网络

Spark供应了SparkListener接口来监听任务实行期间的各种事宜,因此我们可以实现一个Listener来监控任务实行进度以及网络任务级别的统计信息(目前任务级别的统计由SparkListener采集,任务进度则由Spark供应的专门的API来监控)。
其余Hive还供应了Operator级别的统计数据信息,比如读取的行数等。
在MapReduce模式下,这些信息通过Hadoop Counter网络。
我们可以利用Spark供应的Accumulator来实现该功能。

测试

除了一样平常的单元测试以外,Hive还供应了Qfile Test,即运行一些事先定义的查询,并根据结果判断测试是否通过。
Hive on Spark的Qfile Test该当尽可能靠近真实的Spark支配环境。
目前我们采取的是local-cluster的办法(该支配模式紧张是Spark进行测试时利用,并不打算让一样平常用户利用),终极的目标是能够搭建一个Spark on YARN的Mini Cluster来进行测试。

实现细节

这一部分我们详细先容几个主要的实现细节。

SparkTask的天生和实行

我们通过一个例子来看一下一个大略的两表JOIN查询如何被转换为SparkTask并被实行。
下图左半部分展示了这个查询的Operator Tree,以及该Operator Tree如何被转化成SparkTask;右半部分展示了该SparkTask实行时如何得到终极的RDD并通过foreachAsync提交Spark任务。

图2:两表join查询到Spark任务的转换

SparkCompiler遍历Operator Tree,将其划分为不同的MapWork和ReduceWork。
MapWork为根节点,总是由TableScanOperator(Hive中对表进行扫描的操作符)开始;后续的Work均为ReduceWork。
ReduceSinkOperator(Hive中进行Shuffle输出的操作符)用来标记两个Work之间的边界,涌现ReduceSinkOperator表示当前Work到下一个Work之间的数据须要进行Shuffle。
因此,当我们创造ReduceSinkOperator时,就会创建一个新的ReduceWork并作为当前Work的子节点。
包含了FileSinkOperator(Hive中将结果输出到文件的操作符)的Work为叶子节点。
与MapReduce最大的不同在于,我们并不哀求ReduceWork一定是叶子节点,即ReduceWork之后可以链接更多的ReduceWork,并在同一个SparkTask中实行。

从该图可以看出,这个查询的Operator Tree被转化成了两个MapWork和一个ReduceWork。
在实行SparkTask时,首先根据MapWork来天生最底层的HadoopRDD,然后将各个MapWork和ReduceWork包装成Function运用到RDD上。
在有依赖的Work之间,须要显式地调用Shuffle转换,详细选用哪种Shuffle则要根据查询的类型来确定。
其余,由于这个例子涉及多表查询,因此在Shuffle之前还要对RDD进行Union。
经由这一系列转换后,得到终极的RDD,并通过foreachAsync提交到Spark集群上进行打算。

运行模式Hive on Spark支持两种运行模式:本地和远程。
当用户把Spark Master URL设置为local时,采取本地模式;别的情形则采取远程模式。
本地模式下,SparkContext与客户端运行在同一个JVM中;远程模式下,SparkContext运行在一个独立的JVM中。
供应本地模式紧张是为了方便调试,一样平常用户不应选择该模式。
因此我们这里也紧张先容远程模式(Remote SparkContext,RSC)。
下图展示了RSC的事情事理。

图3:RSC事情事理

用户的每个Session会创建一个SparkClient,SparkClient会启动RemoteDriver进程,并由RemoteDriver创建SparkContext。
SparkTask实行时,通过Session提交任务,任务的主体便是对应的SparkWork。
SparkClient将任务提交给RemoteDriver,并返回一个SparkJobRef,通过该SparkJobRef,客户端可以监控任务实行进度,进行缺点处理,以及采集统计信息等。
由于终极的RDD打算没有返回结果,因此客户端只须要监控实行进度而不须要处理返回值。
RemoteDriver通过SparkListener网络任务级别的统计数据,通过Accumulator网络Operator级别的统计数据(Accumulator被包装为SparkCounter),并在任务结束时返回给SparkClient。

SparkClient与RemoteDriver之间通过基于Netty的RPC进行通信。
除了提交任务,SparkClient还供应了诸如添加Jar包、获取集群信息等接口。
如果客户端须要利用更一样平常的SparkContext的功能,可以自定义一个任务并通过SparkClient发送到RemoteDriver上实行。

理论上来说,Hive on Spark对付Spark集群的支配办法没有特殊的哀求,除了local以外,RemoteDriver可以连接到任意的Spark集群来实行任务。
在我们的测试中,Hive on Spark在Standalone和Spark on YARN的集群上都能正常事情(须要动态添加Jar包的查询在yarn-cluster模式下还不能运行,请参考HIVE-9425)。

优化

我们再来看几个针对Hive on Spark的优化。

Map Join

Map Join是Hive中一个很主要的优化,其事理是,如果参与Join的较小的表可以放入内存,就为这些小表在内存中天生Hash Table,这样较大的表只须要通过一个MapWork被扫描一次,然后与内存中的Hash Table进行Join就可以了,省去了Shuffle和ReduceWork的开销。
在MapReduce模式下,通过一个在客户端本地实行的任务来为小表天生Hash Table,并保存在本地文件系统上。
后续的MapWork首先将Hash Table上传至HDFS的Distributed Cache中,然后只要读取大表和Distributed Cache中的数据进行Join就可以了。

Hive on Spark对付Map Join的实现与MapReduce不同。
最初我们考虑利用Spark供应的广播功能来把小表的Hash Table分发到各个打算节点上。
利用广播的优点是Spark采取了高效的广播算法,其性能该当优于利用Distributed Cache。
而利用广播的缺陷是会为Driver和打算节点带来很大的内存开销。
为了利用广播,Hash Table的数据须要先被传送到Driver端,然后由Driver进行广播;而且纵然在广播之后,Driver仍须要保留这部分数据,以便应对打算节点的缺点。
虽然支持Spill,但广播数据仍会加剧Driver的内存压力。
此外,利用广播相对的开拓本钱也较高,不利于对已有代码的复用。

因此,Hive on Spark选择了类似于Distributed Cache的办法来实现Map Join(请参考HIVE-7613),而且为小表天生Hash Table的任务可以分布式的实行,进一步减轻客户真个压力。
下图描述了Hive on Spark如何天生Map Join的任务。

图4:Hive on Spark Join优化

不同于MapReduce,对付Hive on Spark而言,LocalWork只是为了供应一些优化时的必要信息,并不会真正被实行。
对付小表的扫描以独立的SparkTask分布式地实行,为此,我们也实现了能够分布式运行的HashTableSinkOperator(Hive中输出小表Hash Table的操作符),其紧张事理是通过提高HDFS Replication Factor的办法,使得天生的HashTable能够被每个节点在本地访问。

虽然目前采纳了类似Distributed Cache的这种实现办法,但如果在后期的测试中创造广播的办法确实能够带来较大的性能提升,而且其引入的内存开销可以被接管,我们也会考虑改用广播来实现Map Join。

Table Cache

Spark的一个上风便是可以充分利用内存,许可用户显式地把一个RDD保存到内存或者磁盘上,以便于在多次访问时提高性能。
其余,在目前的RDD转换模式中,一个RDD的数据是无法同时被多个下贱利用的(请参考SPARK-2688),当一个RDD须要通过不同的转换得到不同的子节点时,就要被打算多次。
这时,我们也该当利用Cache来避免重复打算。

在Shark和SparkSQL中,都许可用户显式地把一张表Cache来提高对该表的查询性能;对付Hive on Spark,我们也该当充分利用这一特性。

一个运用处景是Multi Insert查询,即同一个数据源经由运算后须要被插入到多个表中的情形。
比如以下查询。

在这种情形下,对应的SparkWork中,一个MapWork/ReduceWork会有多个下贱的Work,如果不进行Cache,那么共享的数据源就会被打算多次。
为了避免这种情形,我们会将这些MapWork/ReduceWork复制成多个,每个对应一个下贱的Work(请参考HIVE-8118),并对其共享的数据源进行Cache(由于IOContext的同步问题,该功能尚未完成,估量会在HIVE-9492中实现)。

更为一样平常的运用处景是一张表在查询中被利用了多次的情形,Hive on Spark目前还不会针对这种查询进行Cache,不过在后续的事情中会考虑采取自动的或者用户指定的办法来优化这种查询。

项目进度与操持

Hive on Spark最初在Hive的Git仓库中的spark分支下开拓,到2014年12月尾,已经完成功能开拓,Hive已有的各种查询基本都能支持。
2015年1月初spark分支被合并回trunk,估量会不才一个版本中发布(详细版本号待定)。

对付已经搭建好Hadoop和Spark集群的用户,利用Hive on Spark是比较随意马虎的,紧张是引入Spark依赖和进行恰当的配置即可,详细步骤可以参考Hive on Spark Getting Started Wiki(https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started)。
此外,Cloudera和Intel还在更早的时候供应了亚马逊AWS虚拟机镜像,以方便感兴趣的用户更加快捷的体验Hive on Spark(请参考Hands-on Hive-on-Spark in the AWS Cloud,http://blog.cloudera.com/blog/2014/12/hands-on-hive-on-spark-in-the-aws-cloud/)。

项目的下一阶段事情重点紧张在于Bug修复、性能优化,以及搭建基于YARN的Mini Cluster进行单元测试(请参考HIVE-9211)等。
为了担保效率,开拓事情仍将在spark分支上进行,希望理解项目最新进展的用户可以关注该分支。

初步性能测试

随着项目开拓靠近尾声,我们也已经开始对Hive on Spark进行初步的性能测试。
测试集群由10台亚马逊AWS虚拟机组成,Hadoop集群由HDP 2.2搭建,测试数据为320GB的TPC-DS数据集。
目前的测试用例有6条,包含了自定义的查询以及TPC-DS中的两条查询。
由于Hive紧张用于处理ETL查询,因此我们在TPC-DS中选取用例时,选取的是较为靠近ETL查询的用例(TPC-DS中的用例紧张针对交互型查询,Impala、SparkSQL等引擎更适宜此类查询)。
为了更有针对性,测试紧张是对Hive on Spark和Hive on Tez进行性能比拟,最新的一组测试数据如下图所示(鉴于项目仍在开拓中,该数据仅供参考)。

图5:Hive on Spark vs. Hive on Tez

图中横坐标为各个测试用例,纵坐标为所用韶光,以秒为单位。

总结

Hive on Spark由多家公司协作开拓,从项目开始以来,受到了社区的广泛关注,HIVE-7292更是有超过140位用户订阅,已经成为Hive社区中关注度最高的项目之一。
由于涉及到两个开源项目,Hive社区和Spark社区的开拓职员也进行了紧密的互助。
在开拓过程中创造的Spark的不敷之处以及新的需求得到了积极的相应(请参考SPARK-3145)。
通过将Spark作为Hive的引擎,现有的用户拥有了更加灵巧的选择。
Hive与Spark两个社区均将从中受益。

本文阐述了Hive on Spark的总体设计思想,并详细先容了几个主要的实现细节。
末了总结了项目进展情形,以及最新的性能数据。
希望通过本文能为用户更好的理解和利用Hive on Spark带来帮助。

关于作者:李锐,2013年取得复旦大学打算机运用技能专业硕士研究生学位,现任英特尔公司软件工程师,Hive Committer。

标签:

相关文章