2

Я читаю данные json из очереди Kafka с использованием искровой структурированной потоковой передачи, но мне нужно записать данные json в Elasticsearch.Spark Structured streaming ForeachWriter не может получить sparkContext

Однако я не могу получить sparkContext внутри ForeachWriter, чтобы преобразовать json в RDD. Это бросает NPE.

Как я могу получить SparkContext в Writer для преобразования json в RDD?

+0

измените свой вопрос и предоставить соответствующий код, и ошибка, которую вы got – Yaron

+0

Почему Kafka не может напрямую подключиться к Elastic? –

+0

@ cricket_007 без искры? Мне нужно использовать потоки Spark для обработки входящих данных с ML –

ответ

0

Вы не можете. Методы в ForeachWriter работают у исполнителей. Вы можете либо написать приемник Elasticsearch самостоятельно, либо обратиться к исходным API API Elasticsearch для записи данных.

+0

«С помощью elasticsearch-hadoop любое RDD может быть сохранено в Elasticsearch, если его содержимое может быть переведено в документы» https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html –

+0

Вы не можете получить RDD, если не будете писать раковину. – zsxwing

+0

OP должен иметь RDD от Kafka DStream. –

0

Я решил проблему, получив экземпляр SparkContext внутри ForeachWriter

val writer = new ForeachWriter[CustomerData] { 

    override def open(partitionId: Long, version: Long) = true 
    override def process(value: CustomerData) = { 
     val spark = SparkSession 
     .builder() 
     .getOrCreate() //this works 
     ... 
    } 
    override def close(errorOrNull: Throwable) = {} 
} 

PS: Это, вероятно, создает новый SparkSession

+2

Вы действительно проверяли, работает ли это или нет? Я могу представить, что это сломает много путей кода, потому что вы создаете SparkSession у исполнителей. – zsxwing

+0

@zsxwing да, это сработало, я предположил, что он не создал новую сессию Spark, но возвращает текущую SparkSession - 'getOrCreate()' –

+0

В исполнителях нет возможности использовать SparkSession, поэтому она создаст новую. И вы можете увидеть некоторые случайные ошибки, потому что это нарушает некоторые предположения. Например, 'SparkEnv.get' может возвращать неправильное значение в исполнителях. – zsxwing