在构建元数据仓库的时候, 我们常常需要通过实际运行的 HQL 来分析相应的字段血缘关系.
一般的做法是, 我们可以通过利用 antlr库 定义 HQL 的语法树, 通过生成相应语音的解析代码. 自行解析语法树.
但是这样带来了几个问题.
基于以上两个问题, 我们在实际应用中可以复用一部分 hive 中对血缘关系的实现代码. 进行改动. 以完成我们的需求目标.
接下里我们来看 hive 中的 lineage 是如何实现的. 以下内容基于 hive:rel/release-2.3.4 版本
首先我们找到 org.apache.hadoop.hive.ql.Driver
类, 这个类是实现执行 HQL 的具体类, 包含了关于 compile
和 execute
的相关代码. 这里我们找到代码的 run
方法
// <https://github.com/apache/hive/blob/56acdd2120b9ce6790185c679223b8b5e884aaf2/ql/src/java/org/apache/hadoop/hive/ql/Driver.java#L1235-L1292>
public CommandProcessorResponse run(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
...
}
可以看到这里调用了 runInternal
方法. 我们找到相关方法定义
// <https://github.com/apache/hive/blob/56acdd2120b9ce6790185c679223b8b5e884aaf2/ql/src/java/org/apache/hadoop/hive/ql/Driver.java#L1407-L1584>
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
...
PerfLogger perfLogger = null;
int ret;
if (!alreadyCompiled) {
// compile internal will automatically reset the perf logger
ret = compileInternal(command, true); // 执行编译
// then we continue to use this perf logger
perfLogger = SessionState.getPerfLogger();
if (ret != 0) {
return createProcessorResponse(ret);
}
} else {
// reuse existing perf logger.
perfLogger = SessionState.getPerfLogger();
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
boolean startTxnImplicitly = false;
...
if (requiresLock()) {
// a checkpoint to see if the thread is interrupted or not before an expensive operation
if (isInterrupted()) {
ret = handleInterruption("at acquiring the lock.");
} else {
ret = acquireLocksAndOpenTxn(startTxnImplicitly);
}
if (ret != 0) {
return rollback(createProcessorResponse(ret));
}
}
ret = execute(true); // 请求执行
if (ret != 0) {
//if needRequireLock is false, the release here will do nothing because there is no lock
return rollback(createProcessorResponse(ret));
}
...
}
在该方法中. 除了初始化了一些日志组件, 还执行了对 HQL 的编译以及执行过程. 这里我们只需关心对 HQL 的语法解析, 所以只看 compileInternal
方法即可
通过 compileInternal
方法我们看到最终执行的具体方法 compile
private int compileInternal(String command, boolean deferClose) {
...
try {
if (metrics != null) {
metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
}
ret = compile(command, true, deferClose); // 执行 compile 的具体方法
} finally {
compileLock.unlock();
}
...
}
在 compile
中. 主要做了这么几件事情. 如下所示
public int compile(String command, boolean resetTaskIds, boolean deferClose) {
// 初始化日志记录
// 填充命令占位符
String queryStr = command;
// 状态判断
// 初始化一些组件
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
try {
// ..
ASTNode tree = ParseUtils.parse(command, ctx); // 解析语法树
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
// Trigger query hook before compilation
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
// ...
} else {
sem.analyze(tree, ctx); // 触发获取 血缘关系的 hooks
}
// validate the plan
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
queryState.getHiveOperation(), schema);
// initialize FetchTask right here
//do the authorization check
// 日志输出
return 0;
} catch (Exception e) {
// ...
} finally {
// ...
}
}
其中 SemanticAnalyzerFactory
这里是根据我们查询的 HQL 类型获取相应的处理实例. 根据跳转查阅可知 QUERY
类型的 HQL 会返回 org.apache.hadoop.hive.ql.parse.CalcitePlanner
类型实例