2016-06-22 3 views
0

Есть два набора данных А и В (имеющих один столбец - ID)Реализовать функциональность AB с помощью Каскадные с LEFT JOIN

Cat A 

    1 
    2 
    3 
    4 
    5 
    6 
    7 

cat B 
4 
5 
2 
8 
18 
19 
2197 

Cat A-B 
1 
3 
6 
7 

Это вычитание делается в 2 этапа шаг 1: присоединиться к BY ID, LEFT JOIN в пО ID) это даст набор данных, который имеет 2 колонки, где первый столбец будет иметь все записи для набора данных A и 2-го столбца будут иметь только записи соответствия из B

1 
2 2 
3 
4 4 
5 5 
6 
7 

Шаг 2: Фильтр набор данных от шага 1 по записям, где второе поле null Таким образом, мы реализовали A-B, используя LEFT JOIN.

Я могу выполнить шаг 1, но я не в состоянии осуществить шаг 2. Ниже приведен исходный код для шага 1

public class AMinusB { 

public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap, 
     Tap outputTap) { 
    Pipe bpipe = new Pipe("b_pipe"); 
    Pipe apipe = new Pipe("a_pipe"); 
    Fields b_user_id = new Fields("B_id"); 
    Fields a_user_id = new Fields("A_id"); 

    Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id, 
      new LeftJoin()); 
    Pipe retainPipe = new Pipe("retain", joinPipe); 
    retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id")); 

    Pipe cdistPipe = new Pipe("UniquePipe", retainPipe); 

    Fields selector = new Fields("A_id", "B_id"); 

    cdistPipe = new Unique(cdistPipe, selector); 

    FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap) 
      .addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap) 
      .setName("A-B using left outer join"); 
    return flowDef; 
} 

public static void main(String[] args) { 
    String Apath = "path to data set A"; 
    String Bpath = "path to data set B"; 
    String outputPath = "path to output"; 
    Properties properties = new Properties(); 
    AppProps.setApplicationJarClass(properties, 
      LocationsNumForAProduct.class); 
    FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties); 

    Fields A = new Fields("A_id"); 
    Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath); 

    Fields B = new Fields("B_id"); 
    Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath); 

    Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath); 

    FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap); 
    flowConnector.connect(flowDefLeftJoin).complete(); 

} 

}

ответ