Я разрабатываю собственный интерпретатор для определенного для домена языка. Основываясь на примере, приведенном в документации Apache Zeppelin (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), интерпретатор работает очень хорошо. Теперь я хочу сохранить некоторые результаты в новом DataFrame.Zeppelin: Как создать DataFrame из пользовательского интерпретатора?
Я нашел код для создания DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html), но я не могу использовать его в своем интерпретаторе, потому что я вообще не нахожу способ получить доступ к действительной среде SparkContext (часто называемой «sc») из моего пользовательского интерпретатора.
Я попытался (статический) SparkContext.getOrCreate(), но это даже привело к исключению ClassNotFoundException. Затем я добавил все zeppelin-spark-dependencies ... jar в мою папку интерпретатора, которая решила проблему загрузки класса, но теперь я получаю SparkException («главный url должен быть установлен ...»).
Любая идея, как я могу получить доступ к SparkContext для ноутбука из пользовательского интерпретатора? Большое спасибо!
UPDATE
Благодаря комментарий Kangrok Ли ниже, мой код выглядит следующим образом: смотрите ниже. Он работает и, кажется, создает DataFrame (по крайней мере, он больше не бросает никаких исключений). Но я не могу потреблять созданный DataFrame в последующем SQL пункта (первый абзац использует мой «% OPL» переводчика, как указано ниже, что должно создать «результат» DataFrame):
%opl
1 2 3
> 1
> 2
> 3
%sql
select * from result
> Table not found: result; line 1 pos 14
Так, вероятно, есть все еще что-то не так с моим способом справиться с SparkContext. Есть идеи? Большое спасибо!
package opl;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OplInterpreter2 extends Interpreter {
static {
Interpreter.register("opl","opl",OplInterpreter2.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.master", "local[4]", "spark.master")
.add("spark.app.name", "Opl Interpreter", "spark.app.name")
.add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
.build());
}
private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);
private void log(Object o) {
if (logger != null)
logger.warn("OplInterpreter2 "+o);
}
public OplInterpreter2(Properties properties) {
super(properties);
log("CONSTRUCTOR");
}
@Override
public void open() {
log("open()");
}
@Override
public void cancel(InterpreterContext arg0) {
log("cancel()");
}
@Override
public void close() {
log("close()");
}
@Override
public List<String> completion(String arg0, int arg1) {
log("completion()");
return new ArrayList<String>();
}
@Override
public FormType getFormType() {
log("getFormType()");
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext arg0) {
log("getProgress()");
return 100;
}
@Override
public InterpreterResult interpret(String string, InterpreterContext context) {
log("interpret() "+string);
PrintStream oldSys = System.out;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
System.setOut(ps);
execute(string);
System.out.flush();
System.setOut(oldSys);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
baos.toString());
} catch (Exception ex) {
System.out.flush();
System.setOut(oldSys);
return new InterpreterResult(
InterpreterResult.Code.ERROR,
InterpreterResult.Type.TEXT,
ex.toString());
}
}
private void execute(String code) throws Exception {
SparkContext sc = SparkContext.getOrCreate();
SQLContext sqlc = SQLContext.getOrCreate(sc);
StructType structType = new StructType().add("value",DataTypes.IntegerType);
ArrayList<Row> list = new ArrayList<Row>();
for (String s : code.trim().split("\\s+")) {
int value = Integer.parseInt(s);
System.out.println(value);
list.add(RowFactory.create(value));
}
DataFrame df = sqlc.createDataFrame(list,structType);
df.registerTempTable("result");
}
}