2017-02-16 14 views
1

Я пытаюсь передать файл данных (final) в качестве ввода.Передача мешка в качестве входного сигнала для UDF в PIG

dump final; 

дает: -

(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination) 
(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination) 
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,source) 
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,source) 

Я собирался написать UDF для обработки выше databag и находить несоответствия между источником и местом назначения для того, чтобы это сделать, я должен проверить, является ли мой UDF принимает бит данных или нет. поэтому я написал один образец ОДС ниже:

package PigUDFpck; 

import java.io.IOException; 
import java.util.Iterator; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 


public class databag extends EvalFunc<DataBag> { 
TupleFactory mTupleFactory = TupleFactory.getInstance(); 
BagFactory mBagFactory = BagFactory.getInstance(); 

public DataBag exec(Tuple input) throws IOException { // different return type 

    DataBag result = mBagFactory.newDefaultBag(); // change here 
    DataBag values = (DataBag)input.get(0); 
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) { 
     Tuple tuple = iterator.next(); 

     //logic 
     Tuple t = mTupleFactory.getInstance().newTuple(); 


     t.append(tuple); 

     result.add(t); 
    } 
    return result; // change here 
} 

} 

После этого я зарегистрировал путь, используя

REGISTER /usr/local/pig/UDF/UDFBAG.jar; 
DEFINE Databag Databag(); // not sure how to define it 

2017-02-16 19: 07: 05875 [Основной] WARN org.apache.pig.newplan .BaseOperatorPlan - обнаружено предупреждение IMPLICIT_CAST_TO_INT 2 раза. // получил это предупреждение после определения.

final1 = FOREACH final GENERATE(Databag(final)); 

ОШИБКА 1200: Pig сценарий не удалось разобрать: Invalid проекционный скаляр: финал: Колонка должна быть спроецирована из соотношения для его использования в качестве скалярного

Пожалуйста, помогите мне по определению UDF и как передать DataBag в UDF

Благодаря

+0

Ваш код выглядит хорошо, не зная, почему вы получаете предупреждение и ошибку. Можете ли вы попытаться сделать одно изменение в цикле for, вместо использования итератора, используйте 'for (Tuple tuple: values) {// code}'. Также почему вы создаете новый кортеж для каждой строки? –

+0

Hi Rajen, Не могли бы вы рассказать мне, как определить UDF. – Vickyster

ответ

1

Try

final1 = FOREACH final GENERATE(Databag(*)); 

Хотя, насколько я вижу, ваш финал содержит кортежи, а не мешки с кортежами, поэтому вам, вероятно, придется сначала сгруппировать его одним ключом. в этом случае это будет как-то вроде

final1 = FOREACH (group final [by key or all]) GENERATE(Databag(final));