У нас есть искра потокового приложения, где мы получаем в dstream от Кафки и нужно хранить в dynamoDB .... я буду экспериментировать с двумя способами, чтобы сделать это, как описано в коде нижеСпарк карта против foreachRdd
requestsWithState является Dstream
Фрагмент кода 1 с foreachRDD:
requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
фрагмент кода 2 с картой:
requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
Код Snippet1 работа в порядке и заполнит базу данных ... второй фрагмент кода не работает .... мы новичок, чтобы зажечь и хотел бы знать причину этого и путь к заставить его работать? ........ причина, по которой мы экспериментируем (мы знаем, что это преобразование, и foreachRdd - это действие) foreachRdd очень медленный для нашего случая использования с большой нагрузкой на кластер, и мы обнаружили, что карта намного быстрее, если мы можем заставить его работать ..... пожалуйста, помогите нам получить код карты, работающий
Вы должны (почти) никогда не имеют побочных эффектов на карте или flatMap! – JiriS