1. 首页
  2. IT资讯

HBaseClient

 

 

 

package com.feng.scheduler.log;  import java.io.IOException; import java.util.ArrayList; import java.util.List;  import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  /**  * @author bjzhongdegen  *   */ public class HBaseClient { 	protected static final Logger LOGGER = LoggerFactory.getLogger(HBaseClient.class);  	private static Configuration HBASE_CONF = null; 	private static HTablePool TABLE_POOL = null; 	private static final String CONNECTOR = "#"; 	private static com.feng.scheduler.common.filemanager.Configuration commonConf = null;  	/** 	 * 初始化配置 	 */ 	static { 		commonConf = new com.feng.scheduler.common.filemanager.Configuration("common-config.xml"); 		Configuration conf = new Configuration(); 		// 与hbase/conf/hbase-site.xml中hbase.zookeeper.quorum配置的值相同 		conf.set("hbase.zookeeper.quorum", commonConf.getDefault("hbase.zookeeper.quorum", "127.0.0.1")); 		// 与hbase/conf/hbase-site.xml中hbase.zookeeper.property.clientPort配置的值相同 		conf.set("hbase.zookeeper.property.clientPort", commonConf.getDefault("hbase.zookeeper.property.clientPort", "2181")); 		HBASE_CONF = HBaseConfiguration.create(conf); 		TABLE_POOL = new HTablePool(HBASE_CONF, 30);  	}  	public static LogRecord scanLog(String tableName, String prifixKey, String startkey, String 
stopKey) { 		LOGGER.info("Get prifixKey["+prifixKey+"], startkey="+startkey+", stopKey="+stopKey); 		LogRecord result = new LogRecord(); 		 		StringBuffer sb = new StringBuffer(); 		HTableInterface table = null; 		try { 			table = TABLE_POOL.getTable(tableName); 			Scan s = new Scan(); 			s.setCaching(100); 			List<Filter> list = new ArrayList<Filter>();  			Filter prifixFilter =new PrefixFilter(prifixKey.getBytes()); 			Filter pageFilter = new PageFilter(100); 			list.add(pageFilter); 			list.add(prifixFilter); 			Filter all = new FilterList(Operator.MUST_PASS_ALL, list); 			s.setFilter(all); 			s.setStartRow(startkey.getBytes()); 			if(!StringUtils.isBlank(stopKey)) { 				s.setStopRow(stopKey.getBytes()); 			} 			ResultScanner rs = table.getScanner(s); 			for (Result r : rs) {                         //新版本api                         for(Cell cell:r.rawCells()){                            System.out.println("RowName:"+new String(CellUtil.cloneRow(cell))+" ");                         System.out.println("Timetamp:"+cell.getTimestamp()+" ");                         System.out.println("column Family:"+new String(CellUtil.cloneFamily(cell))+" ");                         System.out.println("row Name:"+new String(CellUtil.cloneQualifier(cell))+" ");                         System.out.println("value:"+new String(CellUtil.cloneValue(cell))+" ");                                }  				KeyValue[] kv = r.raw(); 				for (int i = 0; i < kv.length; i++) { 					sb.append(new String(kv[i].getValue())).append("n"); 					if(i == kv.length - 1) { 						result.setNextTimestamp(getTimeStamp(new String(kv[i].getRow()))); 					} 				} 			} 			rs.close(); 		} catch (IOException e) { 			LOGGER.error("Get prifixKey["+prifixKey+"], startkey="+startkey+", stopKey="+stopKey, e); 		} finally { 			closeHTable(table); 		} 		result.setContent(sb.toString()); 		return result; 	} 	 	/** 	 * @param string 	 * @return 	 */ 	private static Long getTimeStamp(String stopKey) { 		return 
Long.valueOf(stopKey.split(CONNECTOR)[1]); 	}  	public static void pushLog(String tableName, String taskInstanceId, String logContent) { 		pushLog(tableName, taskInstanceId, logContent, "f", "content"); 	} 	 	public static void pushLog(String tableName, String taskInstanceId, String logContent, String family, String qualifier) { 		LOGGER.info("push " + taskInstanceId + " log to " + tableName); 		HTableInterface table = null; 		try { 			table = TABLE_POOL.getTable(tableName); 			Put put = new Put((taskInstanceId + CONNECTOR + System.currentTimeMillis()).getBytes()); 			put.add(family.getBytes(), qualifier.getBytes(), logContent.getBytes()); 			table.put(put); 		} catch (IOException ioe) { 			LOGGER.error("push " + taskInstanceId + " log to " + tableName + " failed.", ioe); 		} catch (Throwable e) { 			LOGGER.error("push " + taskInstanceId + " log to " + tableName + " failed.", e); 		} finally { 			closeHTable(table); 		} 	}  	private static void closeHTable(HTableInterface table) { 		if(table == null) 			return; 		 		try { 			table.close(); 		} catch (IOException e) { 			LOGGER.warn("close hbase table FAILED", e); 		} 	}  	/** 	 * @param args 	 */ 	public static void main(String[] args) { //		for(int i=0; i< 100;i ++) { //			HBaseClient.pushLog("task_logs", "xxxxxx", "lllllllllllllllllllllllllll"); //		} 		 		LogRecord result = HBaseClient.scanLog("task_logs", "9#", "9#1", null); 		System.out.println(result.getContent()); 		 	} }

 

发布了287 篇原创文章 · 获赞 12 · 访问量 3万+

原文始发于:

主题测试文章,只做测试使用。发布者:貪圖浪蕩,转载请注明出处:http://www.cxybcw.com/142682.html

联系我们

13687733322

在线咨询:点击这里给我发消息

邮件:1877088071@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code