基本逻辑是 php 通过调用 hbase thrift server, 然后 hbase thrift server 再去调用hbase 的api

这里首先第一步,安装 thrift 编译环境, 参考其他章节

编译完成之后进入 hbase 的源码目录

cd <Hbase源码目录>/hbase-thrift/src/main/resources/org/apache/hadoop/hbase

这里可选的有thrift 和 thrift2 两种接口,根据需要选择是使用 thrift 还是 thrift2

这里选择 thrift 接口 cd thrift

执行命令生成 php 的 相应包 thrift -gen php hbase.thrift

同时进入之前 thrift 的源码目录,找到 php 的lib 包 cd <thrift源码目录>/lib/php/lib

将两个包一并复制到 php 可以找到的目录。然后编写测试代码

需要注意的是。 hbase 默认的超时时间为 60s . 由参数hbase.thrift.server.socket.read.timeout

控制,单位为毫秒,当 client 和 thrift server 超过一定时间没有读写,thrift server 会关闭连接,此时读写操作会报 TTransportException:write broken pipe 的错误,但 transport 的连接仍为打开状态,需要注意。

可以通过增加 hbase.thrift.server.socket.read.timeout 的时间,并增加异常处理重试。

另外针对单个任务执行时间过长,需要调整 hbase.thrift.connection.max-idletime 来增大最大处理时间

附上读写 thrift 的 php 代码

<?php

ini_set('display_errors',E_ALL);

require_once"./../../../common/PHP/class_loader.php";
require_once"./../../../common/PHP/HBaseThrift2/require.php";

useThrift\\Transport\\TSocket;
useThrift\\Protocol\\TBinaryProtocol;
useThrift\\Transport\\TBufferedTransport;

$host='slave1';
$port=9090;

$socket=newTSocket($host,$port);
$transport=newTBufferedTransport($socket);
$protocol=newTBinaryProtocol($transport);

//createclient
$client=newTHBaseServiceClient($protocol);
$transport->open();

$tableName="new";

#putdata

$column_value=newTColumnValue();
$column_value->family="cf";
$column_value->qualifier="col";
$column_value->value=100;

$put=newTPut();
$put->row='row_100';
$put->columnValues=[$column_value];

$client->put($tableName,$put);

#getdata

$column_1=newTColumn();
$column_1->family='cf';
$column_1->qualifier='col';

$get=newTGet();
$get->row='a';
$get->columns=[$column_1];

$arr=$client->get($tableName,$get);

$results=$arr->columnValues;
foreach($resultsas$result)
{
$qualifier=(string)$result->qualifier;
$value=$result->value;
print_r($qualifier);
print_r($value);
}

#HOWTOSCAN

$scan=newTScan();
$scan->startRow='row_10';
$scan->stopRow='row_101';
$scan->columns=[$column_1];
$num=1000;

$scanId=$client->openScanner($tableName,$scan);
$scan_results=$client->getScannerRows($scanId,$num);
foreach($scan_resultsas$scanRet)
{
$scan_row=$scanRet->row;
$scan_cols=$scanRet->columnValues;
print_r($scan_row);
print_r($scan_cols);
}

$client->closeScanner($scanId);

$transport->close();