hive自定义UDTF函数叉分函数 您所在的位置:网站首页 primitiveobjectinspector hive自定义UDTF函数叉分函数

hive自定义UDTF函数叉分函数

2023-08-07 00:30| 来源: 网络整理| 查看: 265

hive自定义UDTF函数叉分函数

1、介绍

需要从聚合体日志中拆解出各子日志数据，再分别插入到对应的日志子表中。这一过程通过自定义表生成函数（UDTF）完成：每输入一条聚合体日志，函数输出多行子日志记录。注意：下文的示例实现只输出启动日志（AppStartupLog），其余子日志类型需按同样方式另行处理。

2、定义ForkLogUDTF

2.1 HiveUtil工具类

package com.oldboy.umeng.hive.util;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hive helpers that turn JavaBean log entities into UDTF output rows.
 *
 * <p>{@link #popOIs} describes the row layout and {@link #convert2Arr} fills one row; a field-name
 * list built by the former can be passed straight to the latter.
 */
public class HiveUtil {

    /**
     * Appends one output column for every bean property of {@code clz} that has both a getter and a
     * setter and a supported type: {@code String} (Hive string), {@code int}/{@code Integer} (Hive
     * int) or {@code long}/{@code Long} (Hive bigint). Other properties are skipped.
     *
     * @param clz        bean class to describe
     * @param fieldNames receives the column names (the bean property names)
     * @param fieldOIs   receives the Java-primitive object inspectors, index-aligned with
     *                   {@code fieldNames}
     * @throws IntrospectionException if {@code clz} cannot be introspected
     */
    public static void popOIs(Class<?> clz, List<String> fieldNames, List<ObjectInspector> fieldOIs)
            throws IntrospectionException {
        for (PropertyDescriptor pp : Introspector.getBeanInfo(clz).getPropertyDescriptors()) {
            if (pp.getReadMethod() == null || pp.getWriteMethod() == null) {
                continue;
            }
            ObjectInspector oi = javaObjectInspectorFor(pp.getPropertyType());
            if (oi != null) {
                fieldNames.add(pp.getName());
                fieldOIs.add(oi);
            }
        }
    }

    /** Maps a bean property type to its Java-primitive object inspector, or {@code null} if unsupported. */
    private static ObjectInspector javaObjectInspectorFor(Class<?> type) {
        if (type == String.class) {
            return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        }
        if (type == int.class || type == Integer.class) {
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }
        if (type == long.class || type == Long.class) {
            return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
        }
        return null;
    }

    /**
     * Builds an output row by reading the named properties of {@code l} in the given order.
     *
     * <p>A name that {@code l} does not expose through a getter yields {@code null}; when {@code l}
     * itself is {@code null} every column is {@code null}. Accepts any bean (previously only
     * {@code AppStartupLog}), so other sub-log types can be forked the same way.
     *
     * @param l          bean to read
     * @param fieldNames property names, typically produced by {@link #popOIs}
     * @return one value per name, index-aligned with {@code fieldNames}
     * @throws IllegalStateException if the bean cannot be introspected or a getter fails; the
     *                               original exception is attached as the cause instead of being
     *                               silently dropped
     */
    public static Object[] convert2Arr(Object l, List<String> fieldNames) {
        Object[] values = new Object[fieldNames.size()];
        if (l == null) {
            return values;
        }
        // Introspect once per row rather than once per column.
        Map<String, Method> getters = gettersOf(l.getClass());
        for (int i = 0; i < values.length; i++) {
            String name = fieldNames.get(i);
            Method getter = getters.get(name);
            if (getter == null) {
                continue;
            }
            try {
                values[i] = getter.invoke(l);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "cannot read property '" + name + "' of " + l.getClass().getName(), e);
            }
        }
        return values;
    }

    /** Returns a map from property name to getter for every readable property of {@code clazz}. */
    private static Map<String, Method> gettersOf(Class<?> clazz) {
        BeanInfo bi;
        try {
            bi = Introspector.getBeanInfo(clazz);
        } catch (IntrospectionException e) {
            throw new IllegalStateException("cannot introspect " + clazz.getName(), e);
        }
        Map<String, Method> getters = new HashMap<>();
        for (PropertyDescriptor pp : bi.getPropertyDescriptors()) {
            Method getter = pp.getReadMethod();
            if (getter != null) {
                getters.put(pp.getName(), getter);
            }
        }
        return getters;
    }

    /**
     * Reads a single property of {@code o} through its getter.
     *
     * @param o        bean to read
     * @param propName property name
     * @return the property value, or {@code null} if {@code o} has no readable property of that name
     * @throws Exception if introspection fails or the getter throws
     */
    public static Object getPropValue(Object o, String propName) throws Exception {
        for (PropertyDescriptor pp : Introspector.getBeanInfo(o.getClass()).getPropertyDescriptors()) {
            if (pp.getName().equals(propName)) {
                Method getter = pp.getReadMethod();
                return getter == null ? null : getter.invoke(o);
            }
        }
        return null;
    }
}

2.2 LogUtil工具类

package com.oldboy.umeng.common.util;

import com.alibaba.fastjson.JSON;
import com.oldboy.umeng.common.domain.*;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.DecimalFormat;
import java.util.*;

/**
 * Log utilities: random test-log generation, Hive DDL generation, and deserialization/merging of
 * aggregated logs.
 */
public class LogUtil {

    private static final Random r = new Random();

    /**
     * Creates an instance of {@code t} and fills its writable bean properties (inherited ones
     * included) with random values.
     *
     * <p>String properties get {@code DictUtil.getRandString}, int/Integer properties get
     * {@code DictUtil.getRandInt}, keyed by the lower-cased property name; other types are left
     * untouched. Finally a random creation time is stamped via {@link #processLogTime}.
     *
     * @param t class to instantiate; needs an accessible no-arg constructor
     * @return the populated instance
     * @throws Exception if instantiation, introspection, or a String setter fails
     */
    public static <T> T genLog(Class<T> t) throws Exception {
        T obj = t.getDeclaredConstructor().newInstance();
        for (PropertyDescriptor pp : Introspector.getBeanInfo(t).getPropertyDescriptors()) {
            Method setter = pp.getWriteMethod();
            if (setter == null) {
                continue;
            }
            // Locale.ROOT: the default locale could change letters (e.g. Turkish dotless i).
            String key = pp.getName().toLowerCase(Locale.ROOT);
            Class<?> ptype = pp.getPropertyType();
            if (ptype == String.class) {
                String pvalue = DictUtil.getRandString(key);
                setter.invoke(obj, pvalue);
            } else if (ptype == int.class || ptype == Integer.class) {
                try {
                    int pvalue = DictUtil.getRandInt(key);
                    setter.invoke(obj, pvalue);
                } catch (Exception ignored) {
                    // Best effort: leave the property at its default when no random int can be
                    // produced. NOTE(review): presumably DictUtil throws for names it has no
                    // dictionary for — confirm.
                }
            }
        }
        processLogTime(obj);
        return obj;
    }

    /**
     * Like {@link #genLog} but fills only the fields declared directly on {@code t} (not inherited
     * ones), writing them through reflection instead of setters.
     *
     * @param t class to instantiate; needs an accessible no-arg constructor
     * @return the populated instance
     * @throws Exception if instantiation or a String field write fails
     */
    public static <T> T genLogNoParents(Class<T> t) throws Exception {
        T obj = t.getDeclaredConstructor().newInstance();
        for (Field f : t.getDeclaredFields()) {
            String key = f.getName().toLowerCase(Locale.ROOT);
            Class<?> ftype = f.getType();
            if (ftype == String.class) {
                String fvalue = DictUtil.getRandString(key);
                f.setAccessible(true);
                f.set(obj, fvalue);
            } else if (ftype == int.class || ftype == Integer.class) {
                try {
                    int fvalue = DictUtil.getRandInt(key);
                    f.setAccessible(true);
                    f.set(obj, fvalue);
                } catch (Exception ignored) {
                    // Best effort, same as in genLog: keep the field's default value.
                }
            }
        }
        // Device id is assigned separately (see getLogAgg); here only the time is stamped.
        processLogTime(obj);
        return obj;
    }

    /**
     * Stamps {@code obj} with a random creation time within the last 10 days when it is an
     * {@code AppBaseLog}; other objects are left unchanged.
     */
    private static void processLogTime(Object obj) {
        long now = System.currentTimeMillis();
        int dur = 10 * 24 * 60 * 60 * 1000; // 10 days in ms (864,000,000 — fits in an int)
        long thatTime = now - r.nextInt(dur);
        if (obj instanceof AppBaseLog) {
            ((AppBaseLog) obj).setCreatedAtMs(thatTime);
        }
    }

    /**
     * Generates {@code n} random instances of {@code t} via {@link #genLogNoParents}.
     *
     * @return a new mutable list; empty when {@code n <= 0}
     */
    public static <T> List<T> genLogList(Class<T> t, int n) throws Exception {
        List<T> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(genLogNoParents(t));
        }
        return list;
    }

    /**
     * Generates a random aggregated log with a random device id and 1 to 5 entries of each
     * sub-log type.
     */
    public static AppLogAggEntity getLogAgg() throws Exception {
        int n = 5;
        AppLogAggEntity agg = genLog(AppLogAggEntity.class);
        processDeviceId(agg);
        agg.setStartupLogs(genLogList(AppStartupLog.class, r.nextInt(n) + 1));
        agg.setEventLogs(genLogList(AppEventLog.class, r.nextInt(n) + 1));
        agg.setErrorLogs(genLogList(AppErrorLog.class, r.nextInt(n) + 1));
        agg.setUsageLogs(genLogList(AppUsageLog.class, r.nextInt(n) + 1));
        agg.setPageLogs(genLogList(AppPageLog.class, r.nextInt(n) + 1));
        return agg;
    }

    /** Assigns a random device id in the range "dv-00001" .. "dv-10000". */
    private static void processDeviceId(AppLogAggEntity agg) {
        DecimalFormat df = new DecimalFormat("00000");
        int devid = r.nextInt(10000) + 1;
        agg.setDeviceId("dv-" + df.format(devid));
    }

    /**
     * Generates the Hive {@code create table} statement for a log entity class.
     *
     * <p>The table name is the simple class name minus its first three characters (the "App"
     * prefix), lower-cased, plus "s" — e.g. {@code AppStartupLog} becomes {@code startuplogs}.
     * Every property with both getter and setter of type String/int/Integer/long/Long becomes a
     * column; the table is partitioned by {@code (ym, day, hm)} and stored as parquet.
     *
     * <p>Columns are collected first and joined afterwards. The previous version chose the
     * separator by descriptor index, so a trailing read-only or unsupported property left a
     * dangling comma after the last column and produced invalid DDL.
     *
     * @param clazz entity class; its simple name is assumed to have at least 3 characters
     * @return the DDL text, using CRLF line endings
     * @throws IntrospectionException if {@code clazz} cannot be introspected
     */
    public static String genDDL(Class<?> clazz) throws IntrospectionException {
        String RN = "\r\n";
        String tablename = clazz.getSimpleName().substring(3).toLowerCase(Locale.ROOT) + "s";

        List<String> columns = new ArrayList<>();
        for (PropertyDescriptor pp : Introspector.getBeanInfo(clazz).getPropertyDescriptors()) {
            if (pp.getReadMethod() == null || pp.getWriteMethod() == null) {
                continue;
            }
            String hiveType = hiveTypeOf(pp.getPropertyType());
            if (hiveType != null) {
                columns.add(" " + pp.getName() + "\t\t\t " + hiveType);
            }
        }

        StringBuilder builder = new StringBuilder();
        builder.append(RN)
                .append("--").append(tablename).append(RN)
                .append("create table if not exists ").append(tablename).append(RN)
                .append("(").append(RN);
        for (int i = 0; i < columns.size(); i++) {
            boolean last = i == columns.size() - 1;
            builder.append(columns.get(i)).append(last ? " " : " , ").append(RN);
        }
        builder.append(")").append(RN)
                .append("partitioned by (ym int ,day int , hm int) ").append(RN)
                .append("stored as parquet ;").append(RN);
        return builder.toString();
    }

    /** Maps a bean property type to its Hive column type, or {@code null} if unsupported. */
    private static String hiveTypeOf(Class<?> type) {
        if (type == String.class) {
            return "string";
        }
        if (type == int.class || type == Integer.class) {
            return "int";
        }
        if (type == long.class || type == Long.class) {
            return "bigint";
        }
        return null;
    }

    /** Generates the DDL for all five sub-log tables, preceded by {@code use umeng_big11 ;}. */
    public static String genAllDDL() throws IntrospectionException {
        Class<?>[] classes = {
                AppStartupLog.class,
                AppEventLog.class,
                AppErrorLog.class,
                AppUsageLog.class,
                AppPageLog.class,
        };
        StringBuilder builder = new StringBuilder();
        builder.append("use umeng_big11 ;");
        builder.append("\r\n");
        for (Class<?> clz : classes) {
            builder.append(genDDL(clz));
        }
        return builder.toString();
    }

    /**
     * Deserializes an aggregated log from JSON with fastjson.
     *
     * <p>Every {@code \"} sequence is first replaced by a plain {@code "}. NOTE(review): presumably
     * the raw log column stores the JSON with escaped quotes — confirm against the producer.
     *
     * @param json the raw JSON text; may be {@code null}
     * @return the parsed aggregate, or {@code null} when {@code json} is {@code null} (fastjson
     *         may also return {@code null} for empty input)
     */
    public static AppLogAggEntity deserLog(String json) {
        if (json == null) {
            return null;
        }
        String newJson = json.replace("\\\"", "\"");
        return JSON.parseObject(newJson, AppLogAggEntity.class);
    }

    /**
     * Copies the aggregate's common simple-typed properties (String/int/Integer/long/Long with
     * getter and setter) onto every sub-log that has a setter of the same name and exact type.
     * Null sub-log lists and null elements are skipped.
     *
     * @param agg aggregate whose sub-logs are updated in place
     * @throws Exception if introspection or a getter/setter invocation fails
     */
    public static void mergeProp(AppLogAggEntity agg) throws Exception {
        List<AppBaseLog> sublogs = new ArrayList<>();
        addAllIfPresent(sublogs, agg.getStartupLogs());
        addAllIfPresent(sublogs, agg.getErrorLogs());
        addAllIfPresent(sublogs, agg.getEventLogs());
        addAllIfPresent(sublogs, agg.getUsageLogs());
        addAllIfPresent(sublogs, agg.getPageLogs());
        for (AppBaseLog log : sublogs) {
            if (log != null) {
                doMergeProper(agg, log);
            }
        }
    }

    /** Adds all of {@code source} to {@code target}; a {@code null} source is ignored. */
    private static void addAllIfPresent(List<AppBaseLog> target, List<? extends AppBaseLog> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    /**
     * Copies the simple-typed read/write properties of {@code a} onto {@code b} wherever {@code b}
     * has a one-argument setter with the same property name and exactly the same parameter type.
     */
    private static void doMergeProper(Object a, Object b) throws Exception {
        // Index b's setters by property name.
        Map<String, Method> bSetters = new HashMap<>();
        for (PropertyDescriptor pp : Introspector.getBeanInfo(b.getClass()).getPropertyDescriptors()) {
            Method setter = pp.getWriteMethod();
            if (setter != null) {
                bSetters.put(pp.getName(), setter);
            }
        }

        BeanInfo bi = Introspector.getBeanInfo(a.getClass());
        for (PropertyDescriptor pp : bi.getPropertyDescriptors()) {
            Class<?> type = pp.getPropertyType();
            Method getter = pp.getReadMethod();
            if (getter == null || pp.getWriteMethod() == null || !isSimpleType(type)) {
                continue;
            }
            Method bSetter = bSetters.get(pp.getName());
            if (bSetter == null) {
                continue;
            }
            Class<?>[] params = bSetter.getParameterTypes();
            if (params.length == 1 && params[0] == type) {
                Object value = getter.invoke(a);
                // Allows invocation even when b's class itself is not public.
                bSetter.setAccessible(true);
                bSetter.invoke(b, value);
            }
        }
    }

    /** True for the property types that are merged: String, int/Integer, long/Long. */
    private static boolean isSimpleType(Class<?> type) {
        return type == String.class
                || type == int.class || type == Integer.class
                || type == long.class || type == Long.class;
    }
}

2.3 ForkLogUDTF

package com.oldboy.umeng.hive.udf;

import com.oldboy.umeng.common.domain.AppLogAggEntity;
import com.oldboy.umeng.common.domain.AppStartupLog;
import com.oldboy.umeng.common.util.LogUtil;
import com.oldboy.umeng.hive.util.HiveUtil;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.beans.IntrospectionException;
import java.util.ArrayList;
import java.util.List;

/**
 * UDTF that forks one aggregated log record into one output row per startup log.
 *
 * <p>Usage: {@code forklogs(servertimestr, clienttimems, clientip, log)} with argument types
 * string, bigint, string, string. Output columns are the String/int/long bean properties of
 * {@code AppStartupLog} (see {@code HiveUtil.popOIs}), after the aggregate's common properties
 * have been merged into each startup log.
 */
public class ForkLogUDTF extends GenericUDTF {

    /** Output column names, index-aligned with {@link #fieldOIs}. */
    ArrayList<String> fieldNames;

    /** Object inspectors of the output columns. */
    ArrayList<ObjectInspector> fieldOIs;

    /** Converters from the caller's argument inspectors to plain Java objects, one per argument. */
    ObjectInspectorConverters.Converter[] converters = new ObjectInspectorConverters.Converter[4];

    /**
     * Validates the four arguments, prepares the argument converters and defines the output row.
     *
     * @throws UDFArgumentException on a wrong argument count or type, or if {@code AppStartupLog}
     *                              cannot be introspected
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 4) {
            throw new UDFArgumentException("参数个数不对,需要4个参数!!");
        }
        requirePrimitive(args[0], PrimitiveObjectInspector.PrimitiveCategory.STRING, "第一个参数需要string类型");
        requirePrimitive(args[1], PrimitiveObjectInspector.PrimitiveCategory.LONG, "第二参数需要long类型");
        requirePrimitive(args[2], PrimitiveObjectInspector.PrimitiveCategory.STRING, "第三个参数需要string类型");
        requirePrimitive(args[3], PrimitiveObjectInspector.PrimitiveCategory.STRING, "第四个参数需要string类型");

        converters[0] = ObjectInspectorConverters.getConverter(args[0], PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        converters[1] = ObjectInspectorConverters.getConverter(args[1], PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        converters[2] = ObjectInspectorConverters.getConverter(args[2], PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        converters[3] = ObjectInspectorConverters.getConverter(args[3], PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        fieldNames = new ArrayList<>();
        fieldOIs = new ArrayList<>();
        try {
            HiveUtil.popOIs(AppStartupLog.class, fieldNames, fieldOIs);
        } catch (IntrospectionException e) {
            // Without a row layout nothing useful can be produced: fail while the query is being
            // compiled instead of silently returning a struct with no columns.
            throw new UDFArgumentException(e);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /** Throws {@code UDFArgumentException(message)} unless {@code oi} is a primitive of {@code expected}. */
    private static void requirePrimitive(ObjectInspector oi,
                                         PrimitiveObjectInspector.PrimitiveCategory expected,
                                         String message) throws UDFArgumentException {
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) oi).getPrimitiveCategory() != expected) {
            throw new UDFArgumentException(message);
        }
    }

    /**
     * Deserializes the aggregated log, merges its common properties into the sub-logs, and
     * forwards one row per non-null startup log.
     *
     * <p>A NULL log, an aggregate that parses to {@code null}, or a missing startup-log list yields
     * no rows. Only the 4th argument feeds the rows; servertimestr, clienttimems and clientip are
     * type-checked in {@link #initialize} but are not copied into the output here.
     *
     * @throws HiveException if deserializing/merging fails (cause attached) or forwarding fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length != 4) {
            throw new UDFArgumentException("参数个数不对,需要4个参数!!");
        }
        String log = (String) converters[3].convert(args[3]);
        if (log == null) {
            return;
        }

        List<AppStartupLog> startupLogs;
        try {
            AppLogAggEntity agg = LogUtil.deserLog(log);
            if (agg == null) {
                return;
            }
            LogUtil.mergeProp(agg);
            startupLogs = agg.getStartupLogs();
        } catch (Exception e) {
            throw new HiveException("failed to deserialize or merge aggregated log", e);
        }
        if (startupLogs == null) {
            return;
        }
        // forward() stays outside the try so its HiveException is not swallowed or re-wrapped.
        for (AppStartupLog l : startupLogs) {
            if (l != null) {
                forward(HiveUtil.convert2Arr(l, fieldNames));
            }
        }
    }

    /** Nothing to flush: every row is forwarded immediately from {@link #process}. */
    @Override
    public void close() throws HiveException {
    }
}

3、导出jar包部署到hive/lib目录下

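4、在hive中注册函数

4.1 添加jar到类路径
$hive>add jar /soft/hive/umeng_hive.jar ;

4.2 注册函数
$hive>create function forklogs as 'com.oldboy.umeng.hive.udf.ForkLogUDTF' ;

注意：create function 创建的是永久函数，归属于执行该语句时的当前数据库。如果注册时位于 default 库，那么切换到 umeng_big11 后，需要用 default.forklogs(...) 调用；也可以先执行 use umeng_big11，再注册函数。

4.3 调用函数，查看结果
$hive>use umeng_big11 ;
$hive>select forklogs(servertimestr , clienttimems , clientip ,log) from raw_logs ;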


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有