2016-05-08 2 views
2

Я разрабатываю собственный интерпретатор для определенного для домена языка. Основываясь на примере, приведенном в документации Apache Zeppelin (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), интерпретатор работает очень хорошо. Теперь я хочу сохранить некоторые результаты в новом DataFrame.Zeppelin: Как создать DataFrame из пользовательского интерпретатора?

Я нашел код для создания DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html), но я не могу использовать его в своем интерпретаторе, потому что я вообще не нахожу способ получить доступ к действительной среде SparkContext (часто называемой «sc») из моего пользовательского интерпретатора.

Я попытался (статический) SparkContext.getOrCreate(), но это даже привело к исключению ClassNotFoundException. Затем я добавил все zeppelin-spark-dependencies ... jar в мою папку интерпретатора, которая решила проблему загрузки класса, но теперь я получаю SparkException («главный url должен быть установлен ...»).

Любая идея, как я могу получить доступ к SparkContext для ноутбука из пользовательского интерпретатора? Большое спасибо!

UPDATE

Благодаря комментарий Kangrok Ли ниже, мой код выглядит следующим образом: смотрите ниже. Он работает и, кажется, создает DataFrame (по крайней мере, он больше не бросает никаких исключений). Но я не могу потреблять созданный DataFrame в последующем SQL пункта (первый абзац использует мой «% OPL» переводчика, как указано ниже, что должно создать «результат» DataFrame):

%opl 
1 2 3 
> 1 
> 2 
> 3 

%sql 
select * from result 
> Table not found: result; line 1 pos 14 

Так, вероятно, есть все еще что-то не так с моим способом справиться с SparkContext. Есть идеи? Большое спасибо!

package opl; 

import java.io.ByteArrayOutputStream; 
import java.io.PrintStream; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

import org.apache.spark.SparkContext; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructType; 
import org.apache.zeppelin.interpreter.Interpreter; 
import org.apache.zeppelin.interpreter.InterpreterContext; 
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; 
import org.apache.zeppelin.interpreter.InterpreterResult; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class OplInterpreter2 extends Interpreter { 

static { 
    Interpreter.register("opl","opl",OplInterpreter2.class.getName(), 
     new InterpreterPropertyBuilder() 
     .add("spark.master", "local[4]", "spark.master") 
     .add("spark.app.name", "Opl Interpreter", "spark.app.name") 
     .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer") 
     .build()); 
} 

private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class); 

private void log(Object o) { 
    if (logger != null) 
     logger.warn("OplInterpreter2 "+o); 
} 

public OplInterpreter2(Properties properties) { 
    super(properties); 
    log("CONSTRUCTOR"); 
} 

@Override 
public void open() { 
    log("open()"); 
} 

@Override 
public void cancel(InterpreterContext arg0) { 
    log("cancel()"); 
} 

@Override 
public void close() { 
    log("close()"); 
} 

@Override 
public List<String> completion(String arg0, int arg1) { 
    log("completion()"); 
    return new ArrayList<String>(); 
} 

@Override 
public FormType getFormType() { 
    log("getFormType()"); 
    return FormType.SIMPLE; 
} 

@Override 
public int getProgress(InterpreterContext arg0) { 
    log("getProgress()"); 
    return 100; 
} 

@Override 
public InterpreterResult interpret(String string, InterpreterContext context) { 
    log("interpret() "+string); 
    PrintStream oldSys = System.out; 
    try { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     PrintStream ps = new PrintStream(baos); 
     System.setOut(ps); 
     execute(string); 
     System.out.flush(); 
     System.setOut(oldSys); 
     return new InterpreterResult(
       InterpreterResult.Code.SUCCESS, 
       InterpreterResult.Type.TEXT, 
       baos.toString()); 
    } catch (Exception ex) { 
     System.out.flush(); 
     System.setOut(oldSys); 
     return new InterpreterResult(
       InterpreterResult.Code.ERROR, 
       InterpreterResult.Type.TEXT, 
       ex.toString()); 
    } 
} 

private void execute(String code) throws Exception { 
    SparkContext sc = SparkContext.getOrCreate(); 
    SQLContext sqlc = SQLContext.getOrCreate(sc); 
    StructType structType = new StructType().add("value",DataTypes.IntegerType); 
    ArrayList<Row> list = new ArrayList<Row>(); 
    for (String s : code.trim().split("\\s+")) { 
     int value = Integer.parseInt(s); 
     System.out.println(value); 
     list.add(RowFactory.create(value)); 
    } 
    DataFrame df = sqlc.createDataFrame(list,structType); 
    df.registerTempTable("result"); 
} 
} 

ответ

0

Я думаю, что вы должны сконфигурировать искровой кластер, такой как инструкция ниже.

spark.master = "local[4]"

spark.app.name = "My Spark App"

spark.serializer = "org.apache.spark.serializer.KryoSerializer"

Использование SparkContext.getOrCreate() выглядит хорошо для меня.

Спасибо, Kangrok Ли

0

Наконец я нашел решение, хотя я не думаю, что это очень хороший. В приведенном ниже коде я использую функцию getSparkInterpreter(), которую я нашел в org.apache.zeppelin.spark.PySparkInterpreter.java.

Это требует, чтобы я поместил свой упакованный код (jar) в папку интерпретатора Spark вместо своей собственной папки интерпретатора, которая, по моему мнению, должна быть предпочтительной (согласно https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html). Кроме того, мой интерпретатор не отображается на странице конфигурации переводчика Цеппелина как собственный интерпретатор. Но он может быть использован в пункте Цеппелина, тем не менее.

И: В коде я могу создать DataFrame, и это также можно использовать вне моего абзаца - чего я хотел достичь.

package opl; 

import java.io.ByteArrayOutputStream; 
import java.io.PrintStream; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructType; 
import org.apache.zeppelin.interpreter.Interpreter; 
import org.apache.zeppelin.interpreter.InterpreterContext; 
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; 
import org.apache.zeppelin.interpreter.InterpreterResult; 
import org.apache.zeppelin.interpreter.LazyOpenInterpreter; 
import org.apache.zeppelin.interpreter.WrappedInterpreter; 
import org.apache.zeppelin.spark.SparkInterpreter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class OplInterpreter2 extends Interpreter { 

    static { 
     Interpreter.register(
       "opl", 
       "spark",//"opl", 
       OplInterpreter2.class.getName(), 
       new InterpreterPropertyBuilder() 
        .add("sth", "defaultSth", "some thing") 
        .build()); 
    } 

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class); 

    private void log(Object o) { 
     if (logger != null) 
      logger.warn("OplInterpreter2 "+o); 
    } 

    public OplInterpreter2(Properties properties) { 
     super(properties); 
     log("CONSTRUCTOR"); 
    } 

    @Override 
    public void open() { 
     log("open()"); 
    } 

    @Override 
    public void cancel(InterpreterContext arg0) { 
     log("cancel()"); 
    } 

    @Override 
    public void close() { 
     log("close()"); 
    } 

    @Override 
    public List<String> completion(String arg0, int arg1) { 
     log("completion()"); 
     return new ArrayList<String>(); 
    } 

    @Override 
    public FormType getFormType() { 
     log("getFormType()"); 
     return FormType.SIMPLE; 
    } 

    @Override 
    public int getProgress(InterpreterContext arg0) { 
     log("getProgress()"); 
     return 100; 
    } 

    @Override 
    public InterpreterResult interpret(String string, InterpreterContext context) { 
     log("interpret() "+string); 
     PrintStream oldSys = System.out; 
     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      PrintStream ps = new PrintStream(baos); 
      System.setOut(ps); 
      execute(string); 
      System.out.flush(); 
      System.setOut(oldSys); 
      return new InterpreterResult(
        InterpreterResult.Code.SUCCESS, 
        InterpreterResult.Type.TEXT, 
        baos.toString()); 
     } catch (Exception ex) { 
      System.out.flush(); 
      System.setOut(oldSys); 
      return new InterpreterResult(
        InterpreterResult.Code.ERROR, 
        InterpreterResult.Type.TEXT, 
        ex.toString()); 
     } 
    } 

    private void execute(String code) throws Exception { 
     SparkInterpreter sintp = getSparkInterpreter(); 
     SQLContext sqlc = sintp.getSQLContext(); 
     StructType structType = new StructType().add("value",DataTypes.IntegerType); 
     ArrayList<Row> list = new ArrayList<Row>(); 
     for (String s : code.trim().split("\\s+")) { 
      int value = Integer.parseInt(s); 
      System.out.println(value); 
      list.add(RowFactory.create(value)); 
     } 
     DataFrame df = sqlc.createDataFrame(list,structType); 
     df.registerTempTable("result"); 
    } 

    private SparkInterpreter getSparkInterpreter() { 
     LazyOpenInterpreter lazy = null; 
     SparkInterpreter spark = null; 
     Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); 
     while (p instanceof WrappedInterpreter) { 
      if (p instanceof LazyOpenInterpreter) { 
       lazy = (LazyOpenInterpreter) p; 
      } 
      p = ((WrappedInterpreter) p).getInnerInterpreter(); 
     } 
     spark = (SparkInterpreter) p; 
     if (lazy != null) { 
      lazy.open(); 
     } 
     return spark; 
    } 
}