0

Я пытаюсь использовать JMX BulkLoader для данных ETL в Кассандре с удаленного узла на кластерDatstax Cassandra bulkloader

https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java

Однако после успешного установления соединения JMX, это, кажется, не объемной нагрузки.

Обратите внимание, что массовая загрузка выдается с удаленного узла в кластер cassandra.

Это почти как если кажется, что он ожидает, что будет работать в местности Кассандры кластера (то есть локальная Кассандра кластер)

Я пропускаю что-нибудь здесь. Может кто-нибудь совет

Исключение ниже

java.lang.IllegalArgumentException: Invalid каталог/XXXXXXXXX в org.apache.cassandra.service.StorageService.bulkLoadInternal (StorageService.java:3970) в org.apache. cassandra.service.StorageService.bulkLoadAsync (StorageService.java:3962) в sun.reflect.GeneratedMethodAccessor21.invoke (Unknown Source) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect . Метод.invoke (Method.java:606) at sun.reflect.misc.Trampoline.invoke (MethodUtil.java:75) на sun.reflect.GeneratedMethodAccessor2.invoke (Unknown Source) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:606) на солнце .reflect.misc.MethodUtil.invoke (MethodUtil.java:279) на com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2 (StandardMBeanIntrospector.java:112) в com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2 (StandardMBeanIntrospector .java: 46) at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM (MBeanIntrospector.java:237) at com.sun.jmx.mbeanserver.PerInterface.invoke (PerInterface.java:138) at com.sun .jmx.mbeanserver.MBeanSupport.invoke (MBeanSupport.java:252) at co m.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke (DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke (JmxMBeanServer.

class JmxBulkLoader(host: String, port: Int) { 

    private var connector: JMXConnector = _ 

    private var storageBean: StorageServiceMBean = _ 

    private var timer: Timer = new Timer() 

    connect("http://hostip , 7199) 

private def connect(host: String, port: Int) { 

    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, 

     port)) 

    Logger.info(" Connected to JMX Entity " + jmxUrl) 

    val env = new HashMap[String, Any]() 

    connector = JMXConnectorFactory.connect(jmxUrl, env) 

    val mbeanServerConn = connector.getMBeanServerConnection 

    val name = new ObjectName("org.apache.cassandra.db:type=StorageService") 

    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean]) 

    } 

    def close() { 

    connector.close() 

    } 

    def bulkLoad(path: String): Boolean = { 

    try { 

     val timer = new Stopwatch().start 

     val result = storageBean.bulkLoadAsync(path) 

     timer.stop 

     Logger.info("Async Result of Bulk Load " + result) 

     Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.") 

     true 

    } catch { 

     case e: Exception => 

     Logger.error("Error in Bulk Loading " + e.printStackTrace()) 

     false 

    } 

    } 

} 

ответ

2

Это почти как если кажется, что он ожидает, что будет работать в местности Кассандры кластера (то есть локальный Кассандре кластера)

Не совсем. Но подумайте об этом: вы вызываете функцию mbean узла Cassandra со строковым параметром. Этот вызов выполняется по процессу Кассандры, который вы вызываете (т. Е. Подключаетесь к). Параметр указывает путь на стороне узла, к которому вы подключаетесь.

Вы должны убедиться, что путь существует на цели и содержит данные, которые вы ожидаете (например, через совместное хранилище или копирование файлов заранее).

1
  1. Таблица должна существовать в Кассандре
  2. Каталог должен быть доступен (локальный) для Кассандры узла.
  3. Каталог должен заканчиваться и целевой пространство ключей таблицы имя:
    /some_path/$KeySpaceName/$TableName

 Смежные вопросы

  • Нет связанных вопросов^_^