配置访问

这里其实是使用了 spark-shell 的 scala 环境,通过 hbase 本身提供的 jar 包来读写 hbase

执行以下步骤

cd $SPARK_HOME/jars
mkdir hbase && cd hbase
cp $HBASE_HOME/lib/hbase*.jar $HBASE_HOME/lib/guava-11.0.2.jar $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HBASE_HOME/lib/protobuf-java-2.5.0.jar ./

修改spark-default.conf, 填上的有 hadoop 、hive、hbase 的配置路径,以及刚刚的 jar 包路径

spark.driver.extraClassPath      /opt/hadoop/etc/hadoop:/opt/hive/conf:/opt/hbase/conf:/opt/spark/jars/hbase/*:/opt/spark/jars/*
spark.executor.extraClassPath      /opt/hadoop/etc/hadoop:/opt/hive/conf:/opt/hbase/conf:/opt/spark/jars/hbase/*:/opt/spark/jars/*

这里的目的是让spark 在执行中能够找到相应的 jar 包 路径和相应的其他 configuration

执行 spark-shell

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Put, HBaseAdmin, HTable, Scan, Get}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "new")
// 如果前面没有配置相应的 configuration 路径的话,就要在此处指定 相应的配置
// conf.set("hbase.zookeeper.quorum", "slave1,slave4,slave5")

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
 classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
 classOf[org.apache.hadoop.hbase.client.Result])
rdd.count()
rdd.map(t => (t._1.toString(), t._2.toString())).first()

读写接口

这里需要用到 hbase 提供的接口

github: https://github.com/apache/hbase-connectors (从hbase项目的中hbase-spark 模块移除出来)

下载之后进行编译

cd spark
# 下面的参数需要和实际环境一致, 这里需要注意的是 json4s 这个包 需要和 $SPARK_HOME/jars/json4s 的jar 包版本一样
mvn -Dspark.version=2.4.0 -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -Djson4s.version=3.5.3 -DskipTests=True clean install

编译好的jar包 分别在 spark/hbase-spark-it/target 和 spark/hbase-spark/target 目录下, 复制到spark 加载 hbase 的路径下

同时因为这里只是包含了编译结果的包,如果要正常运行,还需要将 hbase/lib/client-facing-thirdparty (hbase2.1以上) 目录下的jar 包 拷贝到 spark 的加载路径下, 同步所有机器。

cp $HBASE_HOME/lib/client-facing-thirdparty/* $SPARK_HOME/jars/hbase

然后我们就可以在 spark-shell 中使用 hbase 的 rdd 等相关功能了