Есть два набора данных А и В (имеющих один столбец - ID)Реализовать функциональность AB с помощью Каскадные с LEFT JOIN
Cat A
1
2
3
4
5
6
7
cat B
4
5
2
8
18
19
2197
Cat A-B
1
3
6
7
Это вычитание делается в 2 этапа шаг 1: присоединиться к BY ID, LEFT JOIN в пО ID) это даст набор данных, который имеет 2 колонки, где первый столбец будет иметь все записи для набора данных A и 2-го столбца будут иметь только записи соответствия из B
1
2 2
3
4 4
5 5
6
7
Шаг 2: Фильтр набор данных от шага 1 по записям, где второе поле null Таким образом, мы реализовали A-B, используя LEFT JOIN.
Я могу выполнить шаг 1, но я не в состоянии осуществить шаг 2. Ниже приведен исходный код для шага 1
public class AMinusB {
public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap,
Tap outputTap) {
Pipe bpipe = new Pipe("b_pipe");
Pipe apipe = new Pipe("a_pipe");
Fields b_user_id = new Fields("B_id");
Fields a_user_id = new Fields("A_id");
Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id,
new LeftJoin());
Pipe retainPipe = new Pipe("retain", joinPipe);
retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id"));
Pipe cdistPipe = new Pipe("UniquePipe", retainPipe);
Fields selector = new Fields("A_id", "B_id");
cdistPipe = new Unique(cdistPipe, selector);
FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap)
.addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap)
.setName("A-B using left outer join");
return flowDef;
}
public static void main(String[] args) {
String Apath = "path to data set A";
String Bpath = "path to data set B";
String outputPath = "path to output";
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties,
LocationsNumForAProduct.class);
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);
Fields A = new Fields("A_id");
Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath);
Fields B = new Fields("B_id");
Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath);
Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath);
FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap);
flowConnector.connect(flowDefLeftJoin).complete();
}
}