2017-02-22 33 views
0

У нас есть искра потокового приложения, где мы получаем в dstream от Кафки и нужно хранить в dynamoDB .... я буду экспериментировать с двумя способами, чтобы сделать это, как описано в коде нижеСпарк карта против foreachRdd

requestsWithState является Dstream

Фрагмент кода 1 с foreachRDD:

requestsWithState.foreachRDD { rdd => 
    println("Data being populated to Pulsar") 
    rdd.foreach { case (id, eventStream) => 
    println("id is " + id + " Event is " + eventStream) 
    DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
    } 
} 

фрагмент кода 2 с картой:

requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

requestsWithState.print(1) 

Код Snippet1 работа в порядке и заполнит базу данных ... второй фрагмент кода не работает .... мы новичок, чтобы зажечь и хотел бы знать причину этого и путь к заставить его работать? ........ причина, по которой мы экспериментируем (мы знаем, что это преобразование, и foreachRdd - это действие) foreachRdd очень медленный для нашего случая использования с большой нагрузкой на кластер, и мы обнаружили, что карта намного быстрее, если мы можем заставить его работать ..... пожалуйста, помогите нам получить код карты, работающий

+0

Вы должны (почти) никогда не имеют побочных эффектов на карте или flatMap! – JiriS

ответ

0

Карта - это тип преобразования (ленивая трансформация) в Spark и не будет выполняться, если после этого вы не вызовете искрение. Для трансформации и действий Спарк, относится к ссылке ниже http://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

..... У меня есть действие после запроса картыWithState.print (1), но все же оно не работает. Я обновил вопрос, соответственно, пожалуйста, посмотрите. – user2359997

+0

RDD неизменяемы, поэтому карта вернет новый rdd. Итак, попробуйте с запросами 'code'WithState = requestWithState.map (rdd => {rdd match { case (id, eventStream) => { println (" id is "+ id +" Событие "+ eventStream) val dynamoConnection = setupDynamoClientConnection() DBUtils.putItem (dynamoConnection, идентификатор, eventStream.toString())} }} ) requestsWithState.print (1) 'code' – Neetika

0

Вариант с map не имеет каких-либо действий и .map не действие, но преобразование.

Преобразования не выполняются без каких-либо действий.

См., Например, http://training.databricks.com/visualapi.pdf или http://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

я есть действие после отображения requestsWithState.print (1) но все же он не работает, я обновил вопрос, соответственно, пожалуйста, посмотрите – user2359997

+0

Привет, вы находите решение своей проблемы, так как вы добавляете запрос на действияWithState.print (1) ?? Я знаю, что это старый вопрос, но я переживаю ваш случай использования. благодаря –

0

DStream.map возвращает другой поток. Вы должны вызывать печать в этом потоке, а не в оригинале.

Таким образом, в Scala:

val transformedStream = requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

transformedStream.print(1)