介绍

在构建元数据仓库的时候, 我们常常需要通过实际运行的 HQL 来分析相应的字段血缘关系.

一般的做法是, 我们可以通过利用 antlr库 定义 HQL 的语法树, 通过生成相应语音的解析代码. 自行解析语法树.

但是这样带来了几个问题.

  1. 血缘关系没有和 Hive MetaStore 结合起来. 实际解析结果只能依赖于 HQL 输入的信息. 没办法对相应的 HQL 完成校验工作
  2. HQL 语法形式多样. 如果自行编写的话, 难以覆盖完全应用场景.

基于以上两个问题, 我们在实际应用中可以复用一部分 hive 中对血缘关系的实现代码. 进行改动. 以完成我们的需求目标.

Hive 中的 lineage 代码实现

接下里我们来看 hive 中的 lineage 是如何实现的. 以下内容基于 hive:rel/release-2.3.4 版本

首先我们找到 org.apache.hadoop.hive.ql.Driver 类, 这个类是实现执行 HQL 的具体类, 包含了关于 compileexecute 的相关代码. 这里我们找到代码的 run 方法

// <https://github.com/apache/hive/blob/56acdd2120b9ce6790185c679223b8b5e884aaf2/ql/src/java/org/apache/hadoop/hive/ql/Driver.java#L1235-L1292>
  
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
  CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
  ...
}

可以看到这里调用了 runInternal 方法. 我们找到相关方法定义

// <https://github.com/apache/hive/blob/56acdd2120b9ce6790185c679223b8b5e884aaf2/ql/src/java/org/apache/hadoop/hive/ql/Driver.java#L1407-L1584>

private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
   
    ...
    PerfLogger perfLogger = null;

    int ret;
    if (!alreadyCompiled) {
      // compile internal will automatically reset the perf logger
      ret = compileInternal(command, true);  // 执行编译
      // then we continue to use this perf logger
      perfLogger = SessionState.getPerfLogger();
      if (ret != 0) {
        return createProcessorResponse(ret);
      }
    } else {
      // reuse existing perf logger.
      perfLogger = SessionState.getPerfLogger();
      // Since we're reusing the compiled plan, we need to update its start time for current run
      plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
    }
    // the reason that we set the txn manager for the cxt here is because each
    // query has its own ctx object. The txn mgr is shared across the
    // same instance of Driver, which can run multiple queries.
    HiveTxnManager txnManager = SessionState.get().getTxnMgr();
    ctx.setHiveTxnManager(txnManager);

    boolean startTxnImplicitly = false;
    
    ...

    if (requiresLock()) {
      // a checkpoint to see if the thread is interrupted or not before an expensive operation
      if (isInterrupted()) {
        ret = handleInterruption("at acquiring the lock.");
      } else {
        ret = acquireLocksAndOpenTxn(startTxnImplicitly);
      }
      if (ret != 0) {
        return rollback(createProcessorResponse(ret));
      }
    }
    ret = execute(true);  // 请求执行
    if (ret != 0) {
      //if needRequireLock is false, the release here will do nothing because there is no lock
      return rollback(createProcessorResponse(ret));
    }

    ...
}

在该方法中. 除了初始化了一些日志组件, 还执行了对 HQL 的编译以及执行过程. 这里我们只需关心对 HQL 的语法解析, 所以只看 compileInternal 方法即可

通过 compileInternal 方法我们看到最终执行的具体方法 compile

private int compileInternal(String command, boolean deferClose) {
    ...

    try {
      if (metrics != null) {
        metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
      }
      ret = compile(command, true, deferClose);  // 执行 compile 的具体方法
    } finally {
      compileLock.unlock();
    }

    ...
  }

compile 中. 主要做了这么几件事情. 如下所示

public int compile(String command, boolean resetTaskIds, boolean deferClose) {
    // 初始化日志记录

    // 填充命令占位符

    String queryStr = command;

    // 状态判断

    // 初始化一些组件

    // Whether any error occurred during query compilation. Used for query lifetime hook.
    boolean compileError = false;
    try {

      // ..
      ASTNode tree = ParseUtils.parse(command, ctx);  // 解析语法树
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);

      // Trigger query hook before compilation

      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

      // Do semantic analysis and plan generation
      if (saHooks != null && !saHooks.isEmpty()) {
        // ...
      } else {
        sem.analyze(tree, ctx);  // 触发获取 血缘关系的 hooks
      }

      // validate the plan

      // get the output schema
      schema = getSchema(sem, conf);
      plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
        queryState.getHiveOperation(), schema);

      // initialize FetchTask right here

      //do the authorization check

      // 日志输出
      return 0;
    } catch (Exception e) {
      // ...
    } finally {
      // ...
    }
  }

其中 SemanticAnalyzerFactory 这里是根据我们查询的 HQL 类型获取相应的处理实例. 根据跳转查阅可知 QUERY 类型的 HQL 会返回 org.apache.hadoop.hive.ql.parse.CalcitePlanner 类型实例