摘要:?本文介紹了如何使用MaxCompute UDF對JSON格式的日志進行信息提取和轉(zhuǎn)換。
一、業(yè)務場景分析:
由于業(yè)務的復雜性,數(shù)據(jù)開發(fā)者需要面對不同來源的不同類型數(shù)據(jù),需要把這些數(shù)據(jù)抽取到數(shù)據(jù)平臺,按照設計好的數(shù)據(jù)模型對關鍵業(yè)務字段進行抽取,形成一張二維表,以便后續(xù)在大數(shù)據(jù)平臺/數(shù)據(jù)倉庫中進行統(tǒng)計分析、關聯(lián)計算。
本文結(jié)合一個具體的案例來介紹如何使用MaxCompute對json格式的日志數(shù)據(jù)進行轉(zhuǎn)換處理。
1.數(shù)據(jù)來源:應用實時寫入ECS主機的指定目錄下的日志文件中;
2.數(shù)據(jù)格式:日志文件中,每條日志的格式如下圖所示(示例中對數(shù)據(jù)進行了簡化和脫敏),每一條日志中包含了設備信息,以及1個或多個Session信息,且每條日志中的Session數(shù)量是動態(tài)的:1個或多個Session。每條日志的內(nèi)容示例如下:
3.數(shù)據(jù)處理需求:采集日志數(shù)據(jù),對日志數(shù)據(jù)進行解析、轉(zhuǎn)換,對轉(zhuǎn)換后的日志數(shù)據(jù)在MaxCompute進行統(tǒng)計分析。由于日志數(shù)據(jù)是json格式的,其中包含了多個業(yè)務字段信息,需要將業(yè)務字段提前出來,才能在MaxCompute進行后續(xù)的業(yè)務統(tǒng)計(如進行按照時段進行PV/UV統(tǒng)計、按照設備類型進行統(tǒng)計、關聯(lián)設備ID與會員信息進行統(tǒng)計等),所以本文的關鍵需求就是如何把json格式數(shù)據(jù)的關鍵信息解析為一張包含業(yè)務字段的二維表。
二、解決方案:
本文的解決方案中,選擇使用阿里云的日志服務+MaxCompute產(chǎn)品組合來滿足以上業(yè)務需求,其中日志服務僅僅完成日志采集和投遞的職能,不做數(shù)據(jù)解析和轉(zhuǎn)化工作。
1.日志采集:通過日志服務獲取日志數(shù)據(jù)到logstore(這部分內(nèi)容可參考日志服務幫助文檔)
2.通過日志服務的投遞功能(幫助文檔)將日志定時投遞歸檔到MaxCompute的1張原始日志表,其中每條日志所有信息都寫入到原始日志表的1個字段content中。
3.利用MaxCompute對原始數(shù)據(jù)進行字段解析和提取。
1)利用內(nèi)建函數(shù)get_json_object進行數(shù)據(jù)提取
selectget_json_object(content,'$.DeviceID')?as?DeviceID,get_json_object(content,'$.UniqueIdentifier')?as?UniqueIdentifier,get_json_object(content,'$.GameID')?as?GameID,get_json_object(content,'$.Device')?as?Device,get_json_object(content,'$.Sessions\[0].SessionID')?as?Session1_ID,get_json_object(content,'$.Sessions\[0].Events\[0].Name')?as?Session1_EventName,get_json_object(content,'$.Sessions\[1].SessionID')?as?Session2_ID,get_json_object(content,'$.Sessions\[1].Events\[0].Name')?as?Session2_EventNamefrom?log_target_json?where?pt='20180725'?limit?10
提取的結(jié)果如下:
方案總結(jié):以上處理邏輯,是把一條日志的業(yè)務字段分別提取成為行字段,適合每個json記錄中的信息固定且可以映射為表字段,例如上面的例子,把session1和session2的信息提取出來后,分別看做不同的列字段來處理。但如果每條日志記錄包含的session數(shù)量是動態(tài)不固定的時候,這種處理邏輯就難以滿足需要,例如下一條日志就包含了3個session,如果要提取每個session的信息,就要求解析的SQL增加Session3_ID, Session3_EventName邏輯,如果再下一條日志包含100個session呢?這種提取方式就很難處理了。
這種情況,可以使用UDTF自定義函數(shù)來實現(xiàn)。
2)開發(fā)MaxCompute UDTF函數(shù),對日志進行處理
根據(jù)數(shù)據(jù)特點,1條日志包含了多個session信息,屬于1:N的關系,轉(zhuǎn)換到數(shù)據(jù)倉庫的二維表時,需要解析到最小粒度的session信息,把1行轉(zhuǎn)成N行,提取所有session信息。業(yè)務目標如下所示:
在MaxCompute中,對1行記錄處理轉(zhuǎn)換為多行記錄需要使用UDTF來實現(xiàn)。
我們這里以JAVA UDTF為例,對content字段中的每條json記錄進行解析,獲取并返回需要提取的業(yè)務字段。這里的UDTF的處理邏輯會深入到json的第3級,循環(huán)解析出最小粒度的數(shù)據(jù)并返回多條記錄。
package?com.aliyun.odps;import?com.aliyun.odps.udf.UDFException;import?com.aliyun.odps.udf.UDTF;import?com.aliyun.odps.udf.annotation.Resolve;import?com.google.gson.Gson;import?java.io.IOException;import?java.util.List;import?java.util.Map;@Resolve("string->string,string,string,string,string,string,string,string")public?class?get_json_udtf?extends?UDTF?{
????@Override
????public?void?process(Object[]?objects)?throws?UDFException,?IOException?{
????????String?input?=?(String)?objects[0];
????????Map?map?=?new?Gson().fromJson(input,?Map.class);
????????Object?deviceID?=?map.get("DeviceID");
????????Object?uniqueIdentifier?=?map.get("UniqueIdentifier");
????????Object?gameID?=?map.get("GameID");
????????Object?device?=?map.get("Device");
????????List?sessions?=?(List)?map.get("Sessions");
????????for?(Object?session?:?sessions)?{
????????????Map?sMap?=?(Map)?session;
????????????Object?sessionID?=?sMap.get("SessionID");
????????????List?events?=?(List)?sMap.get("Events");
????????????for?(Object?event?:?events)?{
????????????????String?name?=?(String)?((Map)?event).get("Name");
????????????????String?timestamp?=?(String)?((Map)?event).get("Timestamp");
????????????????String?networkStatus?=?(String)?((Map)?event).get("NetworkStatus");
????????????????forward(deviceID,?uniqueIdentifier,gameID,device,
????????????????????????sessionID,name,timestamp,networkStatus);
????????????}
????????}
????}}注:關于UDF本身編寫、打包上傳、創(chuàng)建Function等知識請參閱官方文檔https://help.aliyun.com/document_detail/27867.html。
程序編寫完畢后,需要打包、上傳UDTF并創(chuàng)建UDF函數(shù):
對編譯好的程序進行打包處理,生成jar包,在MaxCompute客戶端(odpscmd)中,上傳這個資源:
add jar maxcompute_demo-1.0-SNAPSHOT.jar -f;
然后通過命令行創(chuàng)建function:
create function get_json_udtf as com.aliyun.odps.get_json_udtf using maxcompute_demo-1.0-SNAPSHOT.jar';
創(chuàng)建后查看函數(shù):
測試驗證:
對包含原始日志的表進行查詢,使用創(chuàng)建的get_json_udtf對content字段進行查詢:
查詢結(jié)果如下,UDFT函數(shù)對每條json記錄進行處理,生成了多條記錄,符合預期:?
同時,如需要固化處理邏輯,還可以使用insert into語法,將解析后的結(jié)果查詢到一張新表,通過作業(yè)調(diào)度來實現(xiàn)周期性的數(shù)據(jù)轉(zhuǎn)換。
三、總結(jié):
本文通過一個日志分析的大數(shù)據(jù)分析場景,通過一個常見的json日志處理的需求為例,介紹了通過日志服務采集日志到MaxCompute,同時使用MaxCompute的內(nèi)建函數(shù)/UDF等方式,對json格式的日志數(shù)據(jù)進行解析和轉(zhuǎn)換,提取關鍵業(yè)務字段、生成了可用于后續(xù)分析的日志表。
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
電子發(fā)燒友App



























評論