Struct複雜數據類型的UDF編寫、兼容HIVE的GenericUDF編寫

一、背景介紹:MaxCompute 2.0版本升級後,Java UDF支持的數據類型從原來的BIGINT、STRING、DOUBLE、BOOLEAN擴展了更多基本的數據類型,同時還擴展支持了ARRAY、MAP、STRUCT等複雜類型,以及Writable參數。Java UDF使用複雜數據類型的方法,STRUCT對應com.aliyun.odps.data.Struct。com.aliyun.odps.data.Struct從反射看不出Field Name和Field Type,所以需要用@Resolve註解來輔助。即如果需要在UDF中使用STRUCT,要求在UDF Class上也標註上@Resolve註解。但是當我們Struct類型中的field有很多字段的時候,這個時候需要我們去手動的添加@Resolve註解就不是那麼的友好。針對這一個問題,我們可以使用Hive 中的GenericUDF去實現。MaxCompute 2.0支持Hive風格的UDF,部分Hive UDF、UDTF可以直接在MaxCompute上使用。

二、複雜數據類型UDF示例示例定義了一個有三個複雜數據類型的UDF,其中第一個用ARRAY作為參數,第二個用MAP作為參數,第三個用STRUCT作為參數。由於第三個Overloads用了STRUCT作為參數或者返回值,因此要求必須對UDF Class添加@Resolve註解,指定STRUCT的具體類型。1.代碼編寫

<code>@Resolve("struct<bigint>,string->string")
public class UdfArray extends UDF {
public String evaluate(List<string> vals, Long len) {
return vals.get(len.intValue());
}
public String evaluate(Map<string> map, String key) {
return map.get(key);
}
public String evaluate(Struct struct, String key) {
return struct.getFieldValue("a") + key;
}
}/<string>/<string>/<bigint>/<code>

2.打jar包添加資源

<code>add jar UdfArray.jar
/<code>

3.創建函數

<code>create function my_index as 'UdfArray' using 'UdfArray.jar';/<code>

4.使用UDF函數

<code>select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;/<code>

三、使用Hive的GenericUDF

這裡我們使用Struct複雜數據類型作為示例,主要處理的邏輯是當我們結構體中兩個字段前後沒有差異時不返回,如果前後有差異將新的字段及其值組成新的結構體返回。示例中Struct的Field為3個。使用GenericUDF方式可以解決需要手動添加@Resolve註解。1.創建一個MaxCompute表

<code>CREATE TABLE IF NOT EXISTS `tmp_ab_struct_type_1` (
`a1` struct<string>,
`b1` struct<string>
);
/<string>/<string>/<code>

2.表中數據結構如下

<code>insert into table tmp_ab_struct_type_1 SELECT named_struct('a',1,'b',3,'c','2019-12-17 16:27:00'), named_struct('a',5,'b',6,'c','2019-12-18 16:30:00');
/<code>

查詢數據如下所示:

Struct複雜數據類型的UDF編寫、兼容HIVE的GenericUDF編寫

3.編寫GenericUDF處理邏輯(1)QSC_DEMOO類

<code>package com.aliyun.udf.struct;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import java.util.ArrayList;
import java.util.List;

/**
* Created by ljw on 2019-12-17
* Description:
*/
@SuppressWarnings("Duplicates")
public class QSC_DEMOO extends GenericUDF {
StructObjectInspector soi1;
StructObjectInspector soi2;

/**
* 避免頻繁Struct對象
*/
private PubSimpleStruct resultStruct = new PubSimpleStruct();
private List extends StructField> allStructFieldRefs;

//1. 這個方法只調用一次,並且在evaluate()方法之前調用。該方法接受的參數是一個arguments數組。該方法檢查接受正確的參數類型和參數個數。
//2. 輸出類型的定義
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
String error = "";
//檢驗參數個數是否正確
if (arguments.length != 2) {
throw new UDFArgumentException("需要兩個參數");

}
//判斷參數類型是否正確-struct
ObjectInspector.Category arg1 = arguments[0].getCategory();
ObjectInspector.Category arg2 = arguments[1].getCategory();
if (!(arg1.equals(ObjectInspector.Category.STRUCT))) {
error += arguments[0].getClass().getSimpleName();
throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" +
arg1.name() + "\" " + "is found" + "\\n" + error);
}
if (!(arg2.equals(ObjectInspector.Category.STRUCT))) {
error += arguments[1].getClass().getSimpleName();
throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \""
+ arg2.name() + "\" " + "is found" + "\\n" + error);
}
//輸出結構體定義
ArrayList<string> structFieldNames = new ArrayList();
ArrayList<objectinspector> structFieldObjectInspectors = new ArrayList();
soi1 = (StructObjectInspector) arguments[0];
soi2 = (StructObjectInspector) arguments[1];
StructObjectInspector toValid = null;
if (soi1 == null)
toValid = soi2;
else toValid = soi1;

//設置返回類型
allStructFieldRefs = toValid.getAllStructFieldRefs();
for (StructField structField : allStructFieldRefs) {
structFieldNames.add(structField.getFieldName());
structFieldObjectInspectors.add(structField.getFieldObjectInspector());
}
return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
}

//這個方法類似UDF的evaluate()方法。它處理真實的參數,並返回最終結果。
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
//將hive中的struct類型轉換成com.aliyun.odps.data.Struct, 如果有錯誤,請調試,查看deferredObjects的數據是什麼樣子的
//然後自己進行重新封裝 !!!

ArrayList list1 = (ArrayList) deferredObjects[0].get();
ArrayList list2 = (ArrayList) deferredObjects[1].get();
int len = list1.size();

ArrayList fieldNames = new ArrayList<>();
ArrayList fieldValues = new ArrayList<>();

for (int i = 0; i < len ; i++) {
if (!list1.get(i).equals(list2.get(i))) {
fieldNames.add(allStructFieldRefs.get(i).getFieldName());
fieldValues.add(list2.get(i));
}
}
if (fieldValues.size() == 0) return null;
return fieldValues;
}

//這個方法用於當實現的GenericUDF出錯的時候,打印出提示信息。而提示信息就是你實現該方法最後返回的字符串。
@Override
public String getDisplayString(String[] strings) {
return "Usage:" + this.getClass().getName() + "(" + strings[0] + ")";
}
}/<objectinspector>/<string>/<code>

(2)PubSimpleStruct類

<code>package com.aliyun.udf.struct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import java.util.List;

public class PubSimpleStruct implements Struct {

private StructTypeInfo typeInfo;
private List<object> fieldValues;

public StructTypeInfo getTypeInfo() {
return typeInfo;
}

public void setTypeInfo(StructTypeInfo typeInfo) {
this.typeInfo = typeInfo;
}

public void setFieldValues(List<object> fieldValues) {
this.fieldValues = fieldValues;
}

public int getFieldCount() {

return fieldValues.size();
}

public String getFieldName(int index) {
return typeInfo.getFieldNames().get(index);
}

public TypeInfo getFieldTypeInfo(int index) {
return typeInfo.getFieldTypeInfos().get(index);
}

public Object getFieldValue(int index) {
return fieldValues.get(index);
}

public TypeInfo getFieldTypeInfo(String fieldName) {
for (int i = 0; i < typeInfo.getFieldCount(); ++i) {
if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) {
return typeInfo.getFieldTypeInfos().get(i);
}
}
return null;
}

public Object getFieldValue(String fieldName) {
for (int i = 0; i < typeInfo.getFieldCount(); ++i) {
if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) {
return fieldValues.get(i);
}
}
return null;
}

public List<object> getFieldValues() {
return fieldValues;
}

@Override
public String toString() {
return "PubSimpleStruct{" +
"typeInfo=" + typeInfo +
", fieldValues=" + fieldValues +
'}';
}
}
/<object>/<object>/<object>/<code>

3、打jar包,添加資源

<code>add jar test.jar;
/<code>

4、創建函數

<code>CREATE FUNCTION UDF_DEMO as 'com.aliyun.udf.test.UDF_DEMOO' using 'test.jar';
/<code>

5、測試使用UDF函數

<code>set odps.sql.hive.compatible=true;
select UDF_DEMO(a1,b1) from tmp_ab_struct_type_1;/<code>

查詢結果如下所示:


Struct複雜數據類型的UDF編寫、兼容HIVE的GenericUDF編寫


注意:
(1)在使用兼容的Hive UDF的時候,需要在SQL前加set odps.sql.hive.compatible=true;語句,set語句和SQL語句一起提交執行。

(2)目前支持兼容的Hive版本為2.1.0,對應Hadoop版本為2.7.2。如果UDF是在其他版本的Hive/Hadoop開發的,則可能需要使用此Hive/Hadoop版本重新編譯。有疑問可以諮詢阿里雲MaxCompute技術支持:劉建偉

<code>    <dependency>
<groupid>org.apache.hadoop/<groupid>
<artifactid>hadoop-common/<artifactid>
<version>2.7.2/<version>
/<dependency>
<dependency>
<groupid>org.apache.hive/<groupid>
<artifactid>hive-exec/<artifactid>
<version>2.1.0/<version>
/<dependency>
/<code>

歡迎加入“MaxCompute開發者社區2群”,點擊鏈接申請加入或掃描二維碼https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745

Struct複雜數據類型的UDF編寫、兼容HIVE的GenericUDF編寫

更多行業上雲案例敬請關注【阿里云云棲號】


分享到:


相關文章: