У меня есть приложение Spark Streaming, которое считывает данные из SINGLE TOPIC в Кафке, обрабатывает их и вставляет в 2 разных пространства ключей в Кассандре, основываясь на содержании элемента. Некоторые данные могут перейти в пространстве ключей А, другой к B. пространства ключейЕсли еще в Spark Streaming
Я делаю это в настоящее время с помощью операции фильтра:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
Так фильтр применяется на каждом РДУ, те элементы, которые имеют арендатор поле А пойти keypace A и те, в которых поле арендатора B переходит в пространство ключей B.
Есть ли более эффективный способ сделать это, вместо того, чтобы применять операцию фильтрации 2 раза (особенно потому, что позже может быть более 2-х ключей)? Будет ли кэширование rdd до того, как операции фильтра увеличат производительность?
Повторяю, у меня есть DStream, который приходит из Kafka, я обрабатываю его, а затем в операции «foreachRDD» у меня есть фрагмент кода сверху, который вставляет данные в Cassandra.
Спасибо
Спасибо за ваш ответ. Должен ли я добавить «rdd.unpersist (true)» после трансфектации фильтра, чтобы освободить его из памяти? –
Вы можете, но если у вас есть этот кусок кода внутри метода. Затем, как только вы выйдете за пределы этого метода, он автоматически удалит его из памяти. –
Также, если я правильно помню, не поддерживается, сохраняет результат в памяти драйвера и удаляет его из рабочей памяти. уничтожить, с другой стороны, удаляет его повсюду. –