Hive: Execution Flow and Source Code Analysis

  Hive comes up often at work and is fairly simple and convenient to use. In this post I walk through Hive's execution flow and read through the relevant source code: how Hive compiles and parses HiveQL, how the query maps onto the data stored on HDFS, and how the computation runs in the MapReduce stage. The source code is large and hard to capture in a single screenshot, so I show it piece by piece; the best way to learn it is still to trace the flow yourself a few times and study the details.

  • Let's start with a classic diagram; this whole post is organized around it:
  • (Diagram) The call relationship between Hive and Hadoop.

Preparation

  1. Get the Hive source. I simply added the Maven dependencies below (a small parsing sketch that uses the hive-exec dependency appears at the end of this section):

    <!-- Hadoop dependencies START -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
    <!-- Hadoop dependencies END -->

    <!-- Hive dependency START -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>0.14.0</version>
    </dependency>
    <!-- Hive dependency END -->
  2. Start Hive and delete the existing log file, since we want a fresh log to analyze:

    ######################## 1. Log files live under /tmp/<current user>/ by default ###############
    [root@xxo07 root]# pwd
    /tmp/root

    ######################## 2. Delete today's log file ###############
    [root@xxo07 root]# ll
    total 1832
    -rw-r--r--. 1 root root 67038 May 29 13:51 hive.log
    -rw-r--r--. 1 root root 1798483 May 28 20:58 hive.log.2016-05-28
    [root@xxo07 root]# rm -rf hive.log
  3. Run a Hive statement. The first attempt below was interrupted with Ctrl+C and failed; the second run of the same query completed successfully:

    hive> select * from t_5 order by id ;
    Query ID = root_20160529142525_97b24c9c-a861-44d7-8fba-2c076c997c34
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks determined at compile time: 1
    In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
    set mapreduce.job.reduces=<number>
    Starting Job = job_1464498685344_0003, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0003/
    Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0003
    Interrupting... Be patient, this might take some time.
    Press Ctrl+C again to kill JVM
    killing job with: job_1464498685344_0003
    Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0
    2016-05-29 14:25:57,132 Stage-1 map = 0%, reduce = 0%
    Ended Job = job_1464498685344_0003 with errors
    Error during job, obtaining debugging information...
    FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
    MapReduce Jobs Launched:
    Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 FAIL
    Total MapReduce CPU Time Spent: 0 msec
    hive> select * from t_5 order by id ;
    Query ID = root_20160529142626_a68f490b-4e9f-43c7-a3b8-d16210751de7
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks determined at compile time: 1
    In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
    set mapreduce.job.reduces=<number>
    Starting Job = job_1464498685344_0004, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0004/
    Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0004
    Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
    2016-05-29 14:26:30,745 Stage-1 map = 0%, reduce = 0%
    2016-05-29 14:27:53,143 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.72 sec
    2016-05-29 14:28:20,569 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.3 sec
    MapReduce Total cumulative CPU time: 8 seconds 300 msec
    Ended Job = job_1464498685344_0004
    MapReduce Jobs Launched:
    Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 8.3 sec HDFS Read: 1028 HDFS Write: 160 SUCCESS
    Total MapReduce CPU Time Spent: 8 seconds 300 msec
    OK
    1 2016-05-28 cq
    1 2015-05-30 wz
    2 2016-05-28 cq
    2 2015-05-30 yy
    3 2016-05-28 cq
    4 2016-05-28 cq
    5 2016-05-28 cq
    5 2015-05-30 cq
    7 2015-05-30 bj
    9 2015-05-30 hz
    Time taken: 130.479 seconds, Fetched: 10 row(s)
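  4. Optional check: with the hive-exec dependency from step 1 on the classpath you can run just the parse stage on the same statement and print the resulting AST. This is a minimal sketch and assumes hive-exec 0.14.0; the parser API may differ in other Hive versions:

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    // Minimal sketch: run only the parse stage on the same statement and print the AST.
    // Assumes hive-exec 0.14.0 (from step 1) is on the classpath.
    public class ParseDemo {
        public static void main(String[] args) throws Exception {
            ParseDriver pd = new ParseDriver();
            ASTNode ast = pd.parse("select * from t_5 order by id");
            // prints the ANTLR tree, e.g. TOK_QUERY / TOK_SELECT / TOK_ORDERBY nodes
            System.out.println(ast.toStringTree());
        }
    }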

Analyzing the Log File

  1. I downloaded the log file and tidied it up; its overall structure looks like the outline below (a small sketch that extracts these timings from the log follows this item):
    <run>
      <TimeToSubmit>
        <compile>
          <parse></parse>
          <semanticAnalyze>
            <partition-retrieving></partition-retrieving>
          </semanticAnalyze>
        </compile>
        <execute>
          <runTasks>
            <serializePlan></serializePlan>
            <getSplits></getSplits>
          </runTasks>
        </execute>
      </TimeToSubmit>
    </run>

    ### 1. Execution starts in the run method; under run there are two key methods, compile and execute.
    ### 2. compile: contains parse and semanticAnalyze.
    ### 3. execute: runs the tasks (runTasks), doing plan serialization, input split computation (getSplits), and the actual MapReduce computation.
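    The nesting above comes from the PERFLOG begin/end pairs that Hive's Driver writes into hive.log. To rebuild this outline yourself it is enough to scan the log for the closing </PERFLOG ...> lines and print each method with its duration; the path and line format below are taken from the excerpts in this post, so treat the snippet as an illustrative sketch rather than a finished tool:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustrative helper: pull every closed PERFLOG entry out of hive.log and print
    // its method name and duration, which is enough to reconstruct the outline above.
    public class PerfLogScan {
        private static final Pattern CLOSED =
                Pattern.compile("</PERFLOG method=(\\S+) .*duration=(\\d+)");

        public static void main(String[] args) throws IOException {
            for (String line : Files.readAllLines(Paths.get("/tmp/root/hive.log"))) {
                Matcher m = CLOSED.matcher(line);
                if (m.find()) {
                    System.out.printf("%-25s %8s ms%n", m.group(1), m.group(2));
                }
            }
        }
    }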
  • Phase 1: run -> TimeToSubmit -> compile -> parse


    ###################### 1. Execution starts in the run method #######################################
    <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>

    ###################### 2. Under run comes the TimeToSubmit phase ##################################
    <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
    Concurrency mode is disabled, not creating a lock manager

    ###################### 3. compile starts ################################################
    <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>

    ################ 3.1. Inside compile, parse parses the HiveQL #########################
    <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
    Parsing command: select * from t_5 order by id
    Parse Completed
    </PERFLOG method=parse start=1464503172611 end=1464503172613 duration=2 from=org.apache.hadoop.hive.ql.Driver>
    ############### 3.1. Parsing completed ###################################################
  • Phase 2: semanticAnalyze, the semantic analysis stage

    #################### 3.2. Inside compile, semanticAnalyze performs semantic analysis ################
    <PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver>
    Starting Semantic Analysis #### semantic analysis begins
    Completed phase 1 of Semantic Analysis
    Get metadata for source tables #### fetch metadata from the metastore (Derby or MySQL)
    0: get_table : db=xxo tbl=t_5 #### look up table t_5 in database xxo
    ugi=root ip=unknown-ip-addr cmd=get_table : db=xxo tbl=t_5
    Get metadata for subqueries
    Get metadata for destination tables ### description info for the destination tables
    New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
    Completed getting MetaData in Semantic Analysis
    Set stats collection dir : hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-ext-10002
    Processing for FS(4)
    Processing for SEL(3)
    Processing for RS(2)
    Processing for SEL(1)
    Processing for TS(0)
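    #### operator abbreviations in the plan: TS=TableScan, SEL=Select, RS=ReduceSink, FS=FileSink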
    RS 2 oldColExprMap: {VALUE._col1=Column[_col2], VALUE._col0=Column[_col1], KEY.reducesinkkey0=Column[_col0]}
    RS 2 newColExprMap: {VALUE._col1=Column[_col2], VALUE._col0=Column[_col1], KEY.reducesinkkey0=Column[_col0]}

    ########### partition ###################
    <PERFLOG method=partition-retrieving from=org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner>
    0: get_partitions : db=xxo tbl=t_5
    ugi=root ip=unknown-ip-addr cmd=get_partitions : db=xxo tbl=t_5
    </PERFLOG method=partition-retrieving start=1464503172762 end=1464503172819 duration=57 from=org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner>

    Looking for table scans where optimization is applicable
    Found 0 null table scans
    Looking for table scans where optimization is applicable
    Found 0 null table scans
    Looking for table scans where optimization is applicable
    Found 0 null table scans
    Completed plan generation ##### plan generation finished
    Semantic Analysis Completed
    </PERFLOG method=semanticAnalyze start=1464503172613 end=1464503172841 duration=228 from=org.apache.hadoop.hive.ql.Driver>
    Initializing Self OP[5]
    Operator 5 OP initialized
    Initialization Done 5 OP
    Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:t_5.id, type:int, comment:null), FieldSchema(name:t_5.dt, type:date, comment:null), FieldSchema(name:t_5.city, type:string, comment:null)], properties:null)
    ################ 3.2. semanticAnalyze completed ################################

    </PERFLOG method=compile start=1464503172610 end=1464503172850 duration=240 from=org.apache.hadoop.hive.ql.Driver>
    ################## 3. compile completed ############################################
  • Phase 3: the execute stage; inside execute a thread is started that runs the job tasks

    ################## 4. execute starts ############################################
    <PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver>
    Starting command: select * from t_5 order by id
    Query ID = root_20160529142626_a68f490b-4e9f-43c7-a3b8-d16210751de7
    Total jobs = 1

    ################### 2. TimeToSubmit ends here, because the tasks were handed off to a separate thread ###########
    </PERFLOG method=TimeToSubmit start=1464503172610 end=1464503172854 duration=244 from=org.apache.hadoop.hive.ql.Driver>
  • Phase 4: the thread runs runTasks, which is the actual MapReduce work

    <PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver>
    <PERFLOG method=task.MAPRED.Stage-1 from=org.apache.hadoop.hive.ql.Driver>
    Launching Job 1 out of 1 ########################## the launchTask method is invoked here

    ######### Task.initialize is called ##########################
    ######### a TaskRunner is instantiated ############################
    ######### runSequential inside launchTask is executed and returns the TaskRunner
    Starting task [Stage-1:MAPRED] in serial mode

    ########################### load and apply configuration ######################
    Number of reduce tasks determined at compile time: 1
    In order to change the average load for a reducer (in bytes): #### parameters being loaded
    set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
    set mapreduce.job.reduces=<number>
    New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
    Using org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    Processing alias t_5

    ######################### Adding input file #########################################################
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
    Adding input file hdfs://xxo07:9000/user/hive/warehouse/t_1/t_1.txt
    Content Summary not cached for hdfs://xxo07:9000/user/hive/warehouse/t_1/t_1.txt
    Changed input file to hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0
    New scratch dir is hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1
  • serializePlan

    ############################# serializePlan ###################################
    <PERFLOG method=serializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
    Serializing MapWork via kryo
    </PERFLOG method=serializePlan start=1464503172964 end=1464503173007 duration=43 from=org.apache.hadoop.hive.ql.exec.Utilities>
    <PERFLOG method=serializePlan from=org.apache.hadoop.hive.ql.exec.Utilities>
    Serializing ReduceWork via kryo
    </PERFLOG method=serializePlan start=1464503173015 end=1464503173148 duration=133 from=org.apache.hadoop.hive.ql.exec.Utilities>
  • Connecting to the ResourceManager

    Connecting to ResourceManager at xxo07/192.168.33.72:8032
    Connecting to ResourceManager at xxo07/192.168.33.72:8032
    Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
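    The last line is a generic Hadoop warning rather than a Hive error: the submitting code did not go through Hadoop's generic option parsing, so -D style options would not be picked up. It does not affect this query; for reference, a minimal Tool/ToolRunner skeleton of the kind the message asks for might look like this (MyJob is a hypothetical class name used only for illustration):

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // Minimal skeleton of what the warning asks for: implement Tool and start the program
    // through ToolRunner so Hadoop's generic options (-D, -files, ...) are parsed for you.
    public class MyJob extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            // build and submit a Job using getConf() here
            return 0;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MyJob(), args));
        }
    }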
  • getSplits: computing the input splits

    <PERFLOG method=getSplits from=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat>
    #################### CombineHiveInputFormat merges the small input files into fewer splits (the 7 input paths below end up as 2 splits)
    CombineHiveInputSplit creating pool for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=bj
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=cq
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=hz
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=wz
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2015-05-30/city=yy
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq; using filter path hdfs://xxo07:9000/user/hive/warehouse/xxo.db/t_5/dt=2016-05-28/city=cq
    CombineHiveInputSplit: pool is already created for hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0; using filter path hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-mr-10003/0
    Total input paths to process : 7
    DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 0
    number of splits 2
    </PERFLOG method=getSplits start=1464503174738 end=1464503174933 duration=195 from=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat>
  • Submitting and starting the job on Hadoop

       Number of all splits 2
    number of splits:2
    Submitting tokens for job: job_1464498685344_0004
    Submitted application application_1464498685344_0004
    The url to track the job: http://xxo07:8088/proxy/application_1464498685344_0004/
    Starting Job = job_1464498685344_0004, Tracking URL = http://xxo07:8088/proxy/application_1464498685344_0004/
    Kill Command = /usr/local/hadoop-2.6.0/bin/hadoop job -kill job_1464498685344_0004
    Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
    Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
    2016-05-29 14:26:30,745 Stage-1 map = 0%, reduce = 0%
    2016-05-29 14:27:53,143 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.72 sec
    2016-05-29 14:28:20,569 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8.3 sec ### job finished
    MapReduce Total cumulative CPU time: 8 seconds 300 msec
    Ended Job = job_1464498685344_0004
    Moving tmp dir: hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/_tmp.-ext-10001 to: hdfs://xxo07:9000/tmp/hive/root/9d2c112b-118e-46a1-8a08-c60c5bbc6fe0/hive_2016-05-29_14-26-12_611_2648166283911131981-1/-ext-10001

    ################# runTasks phase ends ###############################
    </PERFLOG method=runTasks start=1464503172855 end=1464503303027 duration=130172 from=org.apache.hadoop.hive.ql.Driver>

    ################# execute phase ends ###############################
    </PERFLOG method=Driver.execute start=1464503172850 end=1464503303049 duration=130199 from=org.apache.hadoop.hive.ql.Driver>
    MapReduce Jobs Launched:
    Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 8.3 sec HDFS Read: 1028 HDFS Write: 160 SUCCESS
    Total MapReduce CPU Time Spent: 8 seconds 300 msec
    OK

    <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
    </PERFLOG method=releaseLocks start=1464503303086 end=1464503303087 duration=1 from=org.apache.hadoop.hive.ql.Driver>

    ##################### run ends ######################################
    </PERFLOG method=Driver.run start=1464503172609 end=1464503303087 duration=130478 from=org.apache.hadoop.hive.ql.Driver>
    mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
    Total input paths to process : 1
    5 finished. closing...
    5 Close done
    Time taken: 130.479 seconds, Fetched: 10 row(s)

    <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
    </PERFLOG method=releaseLocks start=1464503304801 end=1464503304802 duration=1 from=org.apache.hadoop.hive.ql.Driver>

Reading the Source Code

  1. From the run() method to the runInternal() method
    (screenshot: run() calling runInternal())

  2. runInternal() contains compileInternal() for compilation and execute() for execution
    (screenshot: entering compileInternal() to compile)
    (screenshot: entering execute() to run the plan)

  3. First, a look at compileInternal()
    (screenshot: it calls compile(), which does syntax analysis, semantic analysis and plan generation)

  4. compile() contains the parse and semanticAnalyze steps; once semantic analysis completes, the relevant information from the statement is placed into org.apache.hadoop.hive.ql.QueryPlan
    (screenshot: parse inside compile())
    (screenshot: semantic analysis inside compile())
    (screenshot: the parse method)
    (screenshot: the abstract class BaseSemanticAnalyzer and its many subclasses)

  5. execute() takes the information from the QueryPlan and executes the physical plan, i.e. it submits the job to Hadoop. It does this by calling launchTask, which then runs on a thread (a consolidated sketch of this call chain follows below)
    (screenshot: execute() calling launchTask)
    (screenshot: launchTask running the task)
    (screenshot: executeTask executing the concrete physical plan)
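
  6. Putting the pieces together, the call chain described above can be summarized as the outline below. This is a heavily simplified, hand-written sketch that only mirrors the structure of org.apache.hadoop.hive.ql.Driver in the 0.14 line; it is not the real code (locking, hooks, error handling and almost all arguments are omitted, and the stub bodies are my own comments):

    // Heavily simplified outline of the call chain described above.
    // NOT the real Hive classes: just stub methods mirroring the structure of Driver.
    public class DriverFlowSketch {

        public void run(String command) throws Exception {            // Driver.run
            runInternal(command);
        }

        private void runInternal(String command) throws Exception {   // Driver.runInternal
            compileInternal(command);  // syntax + semantic analysis, produces the QueryPlan
            execute();                 // runs the plan's tasks, i.e. the MapReduce job(s)
        }

        private void compileInternal(String command) throws Exception {
            compile(command);          // Driver.compile
        }

        private void compile(String command) throws Exception {
            parse(command);            // ParseDriver: HiveQL -> AST
            semanticAnalyze();         // BaseSemanticAnalyzer subclass: AST -> operator tree -> QueryPlan
        }

        private void parse(String command) {
            // logged as <PERFLOG method=parse ...>
        }

        private void semanticAnalyze() {
            // metadata lookup, partition pruning, plan generation
            // logged as <PERFLOG method=semanticAnalyze ...>
        }

        private void execute() throws Exception {
            // for each root task of the QueryPlan: launchTask -> runSequential -> TaskRunner
            launchTask();
        }

        private void launchTask() {
            // TaskRunner ends up in Task.executeTask; for a MapRedTask this serializes the
            // plan (kryo), computes the input splits (getSplits) and submits the job to Hadoop
        }

        public static void main(String[] args) throws Exception {
            new DriverFlowSketch().run("select * from t_5 order by id");
        }
    }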
