2

Есть ли способ зарегистрировать пользовательские кодеки при создании экземпляра CassandraConnector?Добавление пользовательского кодека в CassandraConnector

Сейчас я зарегистрировать мои кодеки каждый раз, когда я называю cassandraConnector.withSessionDo

val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf) 
... 
... 
.mapPartitions(partition => { 
    cassandraConnector.withSessionDo(session => { 
    // register custom codecs once for each partition so it isn't loaded as often for each data point 
    if (partition.nonEmpty) { 
     session.getCluster.getConfiguration.getCodecRegistry 
     .register(new TimestampLongCodec) 
     .register(new SummaryStatsBlobCodec) 
     .register(new JavaHistogramBlobCodec) 
    } 

Это кажется немного, как анти шаблон, чтобы сделать это таким образом. Это также действительно засоряет наши журналы, потому что у нас есть службы потоковой передачи искры, которая работает через каждые 30 секунд, и это заполнять наши журналы с:

16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats] 
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it collides with previously registered codec JavaHistogramBlobCodec [blob <-> Histogram] 
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec TimestampLongCodec [timestamp <-> java.lang.Long] because it collides with previously registered codec TimestampLongCodec [timestamp <-> java.lang.Long] 

Edit:

Я попытался их регистрации сразу, как это:

val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf) 
cassandraConnector.withClusterDo(cluster => { 
    cluster.getConfiguration.getCodecRegistry 
    .register(new TimestampLongCodec) 
    .register(new SummaryStatsBlobCodec) 
    .register(new JavaHistogramBlobCodec) 
}) 

Это^работает локально, но при развертывании в нашем кластере mesos он не может найти кодеки. Я предполагаю, что это потому, что он регистрирует только локально в драйвере и никогда не добавляет их в версию исполнителей.

+0

почему бы не зарегистрировать их только один раз, например, после создания экземпляра коннектора? они регистрируются в сеансе, поэтому, если вы его повторно используете, все будет в порядке. – xmas79

+0

Мне показалось, что я добавил что-то об этом в мой вопрос, извините. Я отредактировал свой вопрос. Так вы хотели зарегистрировать их? – nickn

ответ

3

Лучший способ заключается в переопределении соединения Cassandra завод, что-то вроде этого

import com.datastax.driver.core.Cluster 
import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory} 
object MyConnectionFactory extends CassandraConnectionFactory { 
    override def createCluster(conf: CassandraConnectorConf): Cluster = { 
    val cluster = DefaultConnectionFactory.createCluster(conf) 
    cluster.getConfiguration.getCodecRegistry 
     .register(new TimestampLongCodec) 
     .register(new SummaryStatsBlobCodec) 
     .register(new JavaHistogramBlobCodec) 
    cluster 
    } 
} 

и набор spark.cassandra.connection.factory параметр, чтобы указать на класс

+0

Мне очень нравится этот ответ. Спасибо за помощь. – nickn