0

У меня есть приложение Spark Streaming, которое считывает данные из SINGLE TOPIC в Кафке, обрабатывает их и вставляет в 2 разных пространства ключей в Кассандре, основываясь на содержании элемента. Некоторые данные могут перейти в пространстве ключей А, другой к B. пространства ключейЕсли еще в Spark Streaming

Я делаю это в настоящее время с помощью операции фильтра:

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName") 
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName") 

Так фильтр применяется на каждом РДУ, те элементы, которые имеют арендатор поле А пойти keypace A и те, в которых поле арендатора B переходит в пространство ключей B.

Есть ли более эффективный способ сделать это, вместо того, чтобы применять операцию фильтрации 2 раза (особенно потому, что позже может быть более 2-х ключей)? Будет ли кэширование rdd до того, как операции фильтра увеличат производительность?

Повторяю, у меня есть DStream, который приходит из Kafka, я обрабатываю его, а затем в операции «foreachRDD» у меня есть фрагмент кода сверху, который вставляет данные в Cassandra.

Спасибо

ответ

0

Перед тем, как сделать

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName") 
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName") 

Убедитесь, что делать rdd.cache()

Когда вы делаете, как и выше, ваша искра пытается читать РДД данные дважды. Spark никогда не сохраняет никаких rdd в памяти, если вы не кэшируете или не транслируете его.

Другим способом может быть чтение всех данных сразу, кеширование, если набор данных не огромен. Затем используйте groupByKey, где ключ будет вашим ключевым словом (элементом) в этом случае.

+0

Спасибо за ваш ответ. Должен ли я добавить «rdd.unpersist (true)» после трансфектации фильтра, чтобы освободить его из памяти? –

+0

Вы можете, но если у вас есть этот кусок кода внутри метода. Затем, как только вы выйдете за пределы этого метода, он автоматически удалит его из памяти. –

+0

Также, если я правильно помню, не поддерживается, сохраняет результат в памяти драйвера и удаляет его из рабочей памяти. уничтожить, с другой стороны, удаляет его повсюду. –

 Смежные вопросы

  • Нет связанных вопросов^_^