Я пытаюсь использовать JMX BulkLoader для данных ETL в Кассандре с удаленного узла на кластерDatstax Cassandra bulkloader
Однако после успешного установления соединения JMX, это, кажется, не объемной нагрузки.
Обратите внимание, что массовая загрузка выдается с удаленного узла в кластер cassandra.
Это почти как если кажется, что он ожидает, что будет работать в местности Кассандры кластера (то есть локальная Кассандра кластер)
Я пропускаю что-нибудь здесь. Может кто-нибудь совет
Исключение ниже
java.lang.IllegalArgumentException: Invalid каталог/XXXXXXXXX в org.apache.cassandra.service.StorageService.bulkLoadInternal (StorageService.java:3970) в org.apache. cassandra.service.StorageService.bulkLoadAsync (StorageService.java:3962) в sun.reflect.GeneratedMethodAccessor21.invoke (Unknown Source) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect . Метод.invoke (Method.java:606) at sun.reflect.misc.Trampoline.invoke (MethodUtil.java:75) на sun.reflect.GeneratedMethodAccessor2.invoke (Unknown Source) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:606) на солнце .reflect.misc.MethodUtil.invoke (MethodUtil.java:279) на com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2 (StandardMBeanIntrospector.java:112) в com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2 (StandardMBeanIntrospector .java: 46) at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM (MBeanIntrospector.java:237) at com.sun.jmx.mbeanserver.PerInterface.invoke (PerInterface.java:138) at com.sun .jmx.mbeanserver.MBeanSupport.invoke (MBeanSupport.java:252) at co m.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke (DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke (JmxMBeanServer.
class JmxBulkLoader(host: String, port: Int) {
private var connector: JMXConnector = _
private var storageBean: StorageServiceMBean = _
private var timer: Timer = new Timer()
connect("http://hostip , 7199)
private def connect(host: String, port: Int) {
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,
port))
Logger.info(" Connected to JMX Entity " + jmxUrl)
val env = new HashMap[String, Any]()
connector = JMXConnectorFactory.connect(jmxUrl, env)
val mbeanServerConn = connector.getMBeanServerConnection
val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
}
def close() {
connector.close()
}
def bulkLoad(path: String): Boolean = {
try {
val timer = new Stopwatch().start
val result = storageBean.bulkLoadAsync(path)
timer.stop
Logger.info("Async Result of Bulk Load " + result)
Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")
true
} catch {
case e: Exception =>
Logger.error("Error in Bulk Loading " + e.printStackTrace())
false
}
}
}