Hive -- Execution Flow and Source Code Analysis
Hive comes up fairly often at work and is simple and convenient to use. With some spare time at home today, I decided to write a post that analyzes Hive's execution flow and reads through its source code: how Hive compiles HiveQL, how it parses it, how it ties the query to our data on HDFS, and how the computation runs in the MapReduce stage. The source is large and hard to capture in a single screenshot, so I will quote it a piece at a time. Most important of all, walk through this flow a few times yourself and look at the details.
Preparation
Download the Hive source. I simply added the following Maven dependencies:
<!-- hadoop dependencies START -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<!-- hadoop dependencies END -->
<!-- hive dependencies START -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.14.0</version>
</dependency>
<!-- hive dependencies END -->

Start Hive and delete the old log file, since we want a fresh log file to analyze
######################## 1. Log files live under /tmp/<current user>/ by default ###############
[root@xxo07 root]# pwd
/tmp/root
######################## 2. Delete today's log file ###############
[root@xxo07 root]# ll
total 1832
-rw-r--r--. 1 root root 67038 May 29 13:51 hive.log
-rw-r--r--. 1 root root 1798483 May 28 20:58 hive.log.2016-05-28
[root@xxo07 root]# rm -rf hive.log

Run a Hive query

Note: in the transcript below the first run was interrupted with Ctrl+C and failed with return code 2; the second run of the same query completed successfully.
hive> select * from t_5 order by id ;
Query ID = root_20160529142525_97b24c9c-a861-44d7-8fba-2c076c997c34
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1464498685344_0003, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0003/
Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0003
Interrupting... Be patient, this might take some time.
Press Ctrl+C again to kill JVM
killing job with: job_1464498685344_0003
Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0
2016-05-29 14:25:57,132 Stage-1 map = 0%, reduce = 0%
Ended Job = job_1464498685344_0003 with errors
Error during job, obtaining debugging information...
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
hive> select * from t_5 order by id ;
Query ID = root_20160529142626_a68f490b-4e9f-43c7-a3b8-d16210751de7
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1464498685344_0004, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0004/
Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0004
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-05-29 14:26:30,745 Stage-1 map = 0%, reduce = 0%
2016-05-29 14:27:53,143 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.72 sec
2016-05-29 14:28:20,569 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.3 sec
MapReduce Total cumulative CPU time: 8 seconds 300 msec
Ended Job = job_1464498685344_0004
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 8.3 sec HDFS Read: 1028 HDFS Write: 160 SUCCESS
Total MapReduce CPU Time Spent: 8 seconds 300 msec
OK
1 2016-05-28 cq
1 2015-05-30 wz
2 2016-05-28 cq
2 2015-05-30 yy
3 2016-05-28 cq
4 2016-05-28 cq
5 2016-05-28 cq
5 2015-05-30 cq
7 2015-05-30 bj
9 2015-05-30 hz
Time taken: 130.479 seconds, Fetched: 10 row(s)
Analyzing the log file
- I downloaded the log file and tidied it up a bit; the overall structure of the cleaned-up log is as follows:
<run>
<TimeToSubmit>
<compile>
<parse></parse>
<semanticAnalyze>
<partition-retrieving></partition-retrieving>
</semanticAnalyze>
</compile>
<execute>
<runTasks>
<serializePlan></serializePlan>
<getSplits></getSplits>
</runTasks>
</execute>
</TimeToSubmit>
</run>
### 1. Everything starts in the run method; under run there are two important methods, compile and execute;
### 2. compile : contains parse and semanticAnalyze;
### 3. execute : runs the tasks via runTasks, which does the plan serialization, the input splitting (getSplits), and the actual computation, i.e. the MapReduce stage
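To make the nesting concrete, here is a minimal Java sketch of the call structure implied by these PERFLOG scopes. The method names follow org.apache.hadoop.hive.ql.Driver as seen in the log; the bodies are my own illustration, not Hive's real implementation:

```java
// Minimal sketch of the PERFLOG nesting above; illustrative only, not Hive source.
public class DriverFlowSketch {

    public int run(String command) {        // <run> opens, together with <TimeToSubmit>
        int ret = compile(command);         //   <compile>
        return ret == 0 ? execute() : ret;  //   <execute>; TimeToSubmit ends once the job is submitted
    }

    private int compile(String command) {
        parse(command);                     //     <parse>: HiveQL text -> AST
        semanticAnalyze();                  //     <semanticAnalyze>: AST -> operator tree and query plan
        return 0;
    }

    private int execute() {
        runTasks();                         //     <runTasks>: serializePlan, getSplits, run the MR job
        return 0;
    }

    private void parse(String command) { /* stub */ }
    private void semanticAnalyze() { /* stub */ }
    private void runTasks() { /* stub */ }
}
```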
Stage 1: run -> TimeToSubmit -> compile -> parse
###################### 1. Everything starts in this run method #######################################
<PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
###################### 2. run opens the TimeToSubmit scope ##################################
<PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
Concurrency mode is disabled, not creating a lock manager
###################### 3. compile starts ################################################
<PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
################ 3.1. Inside compile, parse parses the HiveQL #########################
<PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
Parsing command: select * from t_5 order by id
Parse Completed
</PERFLOG method=parse start=1464503172611 end=1464503172613 duration=2 from=org.apache.hadoop.hive.ql.Driver>
############### 3.1. Parsing done ###################################################
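What happens inside the <parse> scope is roughly the following. This is a hedged sketch: ParseDriver and ParseUtils are real Hive classes from the 0.14 era, but the exact signatures can differ between versions:

```java
// Hedged sketch of the <parse> step in Driver.compile: HiveQL string -> AST.
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseUtils;

public class ParseSketch {
    public static ASTNode parse(String command, Context ctx) throws Exception {
        ParseDriver pd = new ParseDriver();
        ASTNode tree = pd.parse(command, ctx);        // logs "Parsing command: ..."
        return ParseUtils.findRootNonNullToken(tree); // drop the wrapper token around the real root
    }
}
```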
Stage 2: the semanticAnalyze (semantic analysis) phase
#################### 3.2. Inside compile, semanticAnalyze performs semantic analysis ################
<PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver>
Starting Semantic Analysis #### semantic analysis begins
Completed phase 1 of Semantic Analysis
Get metadata for source tables #### fetch the metadata from the metastore (Derby or MySQL)
0: get_table : db=xxo tbl=t_5 #### look up table t_5 in database xxo
ugi=root ip=unknown-ip-addr cmd=get_table : db=xxo tbl=t_5
Get metadata for subqueries
Get metadata for destination tables ### description info for the destination tables
New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
Completed getting MetaData in Semantic Analysis
Set stats collection dir : hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-ext-10002
Processing for FS(4)
Processing for SEL(3)
Processing for RS(2)
Processing for SEL(1)
Processing for TS(0)
RS 2 oldColExprMap: {VALUE._col1=Column[_col2], VALUE._col0=Column[_col1], KEY.reducesinkkey0=Column[_col0]}
RS 2 newColExprMap: {VALUE._col1=Column[_col2], VALUE._col0=Column[_col1], KEY.reducesinkkey0=Column[_col0]}
########### partition ###################
<PERFLOG method=partition-retrieving from=org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner>
0: get_partitions : db=xxo tbl=t_5
ugi=root ip=unknown-ip-addr cmd=get_partitions : db=xxo tbl=t_5
</PERFLOG method=partition-retrieving start=1464503172762 end=1464503172819 duration=57 from=org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner>
Looking for table scans where optimization is applicable
Found 0 null table scans
Looking for table scans where optimization is applicable
Found 0 null table scans
Looking for table scans where optimization is applicable
Found 0 null table scans
Completed plan generation ##### the plan is complete
Semantic Analysis Completed
</PERFLOG method=semanticAnalyze start=1464503172613 end=1464503172841 duration=228 from=org.apache.hadoop.hive.ql.Driver>
Initializing Self OP[5]
Operator 5 OP initialized
Initialization Done 5 OP
Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:t_5.id, type:int, comment:null), FieldSchema(name:t_5.dt, type:date, comment:null), FieldSchema(name:t_5.city, type:string, comment:null)], properties:null)
################ 3.2. semanticAnalyze done ################################
</PERFLOG method=compile start=1464503172610 end=1464503172850 duration=240 from=org.apache.hadoop.hive.ql.Driver>
################## 3. compile done ############################################
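The <semanticAnalyze> scope is driven from Driver.compile roughly as below. Another hedged sketch: SemanticAnalyzerFactory and BaseSemanticAnalyzer are the real classes, but the wiring is simplified and may not match your Hive version line for line:

```java
// Hedged sketch of the <semanticAnalyze> step: pick an analyzer for the
// statement type and let it build the operator tree (the TS/SEL/RS/FS
// operators seen in the log) and the query plan.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;

public class AnalyzeSketch {
    public static BaseSemanticAnalyzer analyze(HiveConf conf, ASTNode tree, Context ctx)
            throws Exception {
        BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
        sem.analyze(tree, ctx);  // "Starting Semantic Analysis" ... "Completed plan generation"
        sem.validate();          // "Semantic Analysis Completed"
        return sem;
    }
}
```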
Stage 3: the execute phase; execute starts a thread that runs the job tasks
################## 4. execute begins ############################################
<PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver>
Starting command: select * from t_5 order by id
Query ID = root_20160529142626_a68f490b-4e9f-43c7-a3b8-d16210751de7
Total jobs = 1
################### 2. TimeToSubmit ends here; this is because the tasks have been handed off to separate threads ###########
</PERFLOG method=TimeToSubmit start=1464503172610 end=1464503172854 duration=244 from=org.apache.hadoop.hive.ql.Driver>
Stage 4: the thread runs runTasks, which is the actual MapReduce process
<PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver>
<PERFLOG method=task.MAPRED.Stage-1 from=org.apache.hadoop.hive.ql.Driver>
Launching Job 1 out of 1 ########################## the launchTask method runs
######### Task.initialize is called ##########################
######### a TaskRunner is instantiated ############################
######### runSequential inside launchTask runs and returns the TaskRunner
Starting task [Stage-1:MAPRED] in serial mode
########################### load and set some configuration ######################
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes): #### loading the parameters
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
Using org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
Processing alias t_5
######################### Adding input file #########################################################
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
Adding input file hdfs://xxo07:9000/user/hive/warehouse/t_1/t_1.txt
Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/t_1/t_1.txt
Changed input file to hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0
New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
serializePlan
############################# serializePlan ###################################
<PERFLOG method=serializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
Serializing MapWork via kryo
</PERFLOG method=serializePlan start=1464503172964 end=1464503173007 duration=43 from=org.apache.hadoop.hive.ql.exec.Utilities>
<PERFLOG method=serializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
Serializing ReduceWork via kryo
</PERFLOG method=serializePlan start=1464503173015 end=1464503173148 duration=133 from=org.apache.hadoop.hive.ql.exec.Utilities>
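"Serializing MapWork via kryo" means Hive writes the MapWork/ReduceWork plan objects to the scratch directory with the Kryo library rather than Java serialization, so the MapReduce tasks can read the plan back. Here is a tiny self-contained Kryo demo of the idea; PlanStub is a made-up stand-in for the plan classes, not part of Hive:

```java
// Self-contained Kryo round-trip illustrating "Serializing MapWork via kryo".
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class KryoPlanDemo {
    static class PlanStub {                  // hypothetical stand-in for MapWork/ReduceWork
        String query = "select * from t_5 order by id";
        int numReducers = 1;
    }

    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(PlanStub.class);       // newer Kryo versions require registration

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (Output out = new Output(bytes)) {
            kryo.writeObject(out, new PlanStub());   // driver side: serialize the plan
        }
        try (Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
            PlanStub plan = kryo.readObject(in, PlanStub.class); // task side: read it back
            System.out.println("reducers = " + plan.numReducers);
        }
    }
}
```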
Connecting to the ResourceManager
Connecting to ResourceManager at xxo07/192.168.33.72:8032
Connecting to ResourceManager at xxo07/192.168.33.72:8032
Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
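That warning appears because the job is not submitted through ToolRunner. For your own MapReduce programs the standard fix is the Tool pattern below (plain Hadoop API; MyJob is a hypothetical driver class):

```java
// The standard ToolRunner pattern the warning asks for.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // build and submit the MapReduce job here, using getConf()
        // so generic options like -D key=value take effect
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MyJob(), args));
    }
}
```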
getSplits: computing the input splits
<PERFLOG method=getSplits from=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat>
#################### CombineHiveInputSplit combines the Hive input files into splits
CombineHiveInputSplit creating pool for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0; using filter path hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0
Total input paths to process : 7
DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 0
number of splits 2
</PERFLOG method=getSplits start=1464503174738 end=1464503174933 duration=195 from=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat>
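So the seven input paths end up as just two splits (and therefore two mappers): CombineHiveInputFormat builds a pool per partition directory and merges small files into larger splits. A hedged sketch of the call follows; in reality Hive instantiates the class through the hive.input.format setting rather than directly, and the signatures follow the old mapred API:

```java
// Hedged sketch: asking CombineHiveInputFormat for splits (old mapred API).
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class SplitsSketch {
    public static InputSplit[] computeSplits(JobConf job) throws Exception {
        CombineHiveInputFormat<Text, Text> fmt = new CombineHiveInputFormat<Text, Text>();
        // numSplits is only a hint; file sizes drive the result ("number of splits 2")
        return fmt.getSplits(job, 1);
    }
}
```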
Submitting and starting the job in Hadoop
Number of all splits 2
number of splits:2
Submitting tokens for job: job_1464498685344_0004
Submitted application application_1464498685344_0004
The url to track the job: http://xxo07:8088/proxy/application_1464498685344_0004/
Starting Job = job_1464498685344_0004, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0004/
Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0004
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
2016-05-29 14:26:30,745 Stage-1 map = 0%, reduce = 0%
2016-05-29 14:27:53,143 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.72 sec
2016-05-29 14:28:20,569 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.3 sec ### the job finishes
MapReduce Total cumulative CPU time: 8 seconds 300 msec
Ended Job = job_1464498685344_0004
Moving tmp dir: hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/_tmp.-ext-10001 to: hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-ext-10001
################# the runTasks phase ends ###############################
</PERFLOG method=runTasks start=1464503172855 end=1464503303027 duration=130172 from=org.apache.hadoop.hive.ql.Driver>
################# the execute phase ends ###############################
</PERFLOG method=Driver.execute start=1464503172850 end=1464503303049 duration=130199 from=org.apache.hadoop.hive.ql.Driver>
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 8.3 sec HDFS Read: 1028 HDFS Write: 160 SUCCESS
Total MapReduce CPU Time Spent: 8 seconds 300 msec
OK
<PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
</PERFLOG method=releaseLocks start=1464503303086 end=1464503303087 duration=1 from=org.apache.hadoop.hive.ql.Driver>
##################### run ends ######################################
</PERFLOG method=Driver.run start=1464503172609 end=1464503303087 duration=130478 from=org.apache.hadoop.hive.ql.Driver>
mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
Total input paths to process : 1
5 finished. closing...
5 Close done
Time taken: 130.479 seconds, Fetched: 10 row(s)
<PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
</PERFLOG method=releaseLocks start=1464503304801 end=1464503304802 duration=1 from=org.apache.hadoop.hive.ql.Driver>
Reading the source
From the run() method into the runInternal() method.
Inside runInternal() there are two key methods: compileInternal (compilation) and execute.
Let's first take a look at the compileInternal() method.
The compile method contains the parse and semanticAnalyze methods. When semantic analysis completes, the relevant information from the statement is put into org.apache.hadoop.hive.ql.QueryPlan. The execute method then reads that information back out of the QueryPlan and executes the physical plan, that is, it submits the job to Hadoop: it calls the launchTask method, which then runs the task on a thread.
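To wrap up, here is a small self-contained model of the launchTask / TaskRunner mechanics. The real classes are org.apache.hadoop.hive.ql.exec.Task and TaskRunner; this simplified version only shows the threading behavior (the log's "Starting task [Stage-1:MAPRED] in serial mode" is the sequential path):

```java
// Simplified model of launchTask/TaskRunner; not Hive's actual code.
public class LaunchTaskSketch {

    interface Task {                          // stands in for ql.exec.Task
        int execute();
    }

    static class TaskRunner extends Thread {  // stands in for ql.exec.TaskRunner
        private final Task task;
        TaskRunner(Task task) { this.task = task; }

        @Override
        public void run() { runSequential(); }
        void runSequential() { task.execute(); }  // run the task on the current thread
    }

    static TaskRunner launchTask(Task task, boolean parallel) {
        TaskRunner runner = new TaskRunner(task);
        if (parallel) {
            runner.start();         // hive.exec.parallel=true: run on its own thread
        } else {
            runner.runSequential(); // serial mode, as in the log above
        }
        return runner;
    }
}
```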