2015-06-02 1 views
4

Я спокоен с штормом Apache и пытался с топологией трезубцев для Kafka i.e TransactionalTridentKafkaSpout. Все работает отлично, кроме интерфейса Storm. Несмотря на то, что я не представил никаких данных в своей теме, пользовательский интерфейс Storm продолжает показывать недопустимые испускаемые/переданные значения. Значение счетчика продолжает увеличиваться, даже если в этой теме нет данных. Я попытался удалить данные/журналы, хранящиеся в Zookeeper, шторм, Кафка и воссоздать Кафка тему, а также установилНеправильные значения штормового UI и капиллярный инструмент

topology.stats.sample.rate: 1.0 

, но до сих пор проблема не решена.

А также я наткнулся на инструмент под названием Capillary для мониторинга кластера гроз. Я использую следующие свойства

capillary.zookeepers="192.168.125.20:2181" 
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0" 
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3" 

Я использую встроенный Кафки зоопарка здесь. Даже это не работает при получении исключения.

! @6mbg4bp7l - Internal server error, for (GET) [/] -> 

play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values 
at [Source: [email protected]; line: 1, column: 9]]] 
     at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4] 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values 
at [Source: [email protected]; line: 1, column: 9] 
     at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 
     at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2] 

Любая помощь на любом из них будет замечательной. Заранее спасибо.

Конфигурация и исходный фрагмент кода:

final Config config = new Config(); 
    config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000); 
    config.setNumWorkers(2); 
    config.put(Config.NIMBUS_HOST, "192.168.125.20"); 
    config.put(Config.NIMBUS_THRIFT_PORT, 6627); 
    config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
    config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20")); 
    config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); 
    config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1); 
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10); 
    config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20")); 
    config.put(Config.DRPC_PORT, 3772); 

final BrokerHosts zkHosts = new ZkHosts("192.168.125.20"); 
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "Test_Topic", ""); 
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4; 
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4; 
kafkaConfig.forceFromStart = false; 

final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig); 
final TridentTopology topology = new TridentTopology(); 
topology.newStream("spout", kafkaSpout) 
     .each(new Fields("str"), new TestFunction(), new Fields("test")) 
     .each(new Fields("str"), new PrintFilter()); 

топологии Краткое изображение: Topology Stats

+0

Не использовать капилляр, но это выглядит как-то не так с конфигурацией file.looking в их исходном коде, как это выглядит ваш конф файл/путь? – user2720864

+0

@ user2720864 Спасибо за ваш ответ. Я обновил настройки conf в своем вопросе. – DMA

+0

Я считаю, что это что-то wromng с Capillary. стоит посмотреть на https://github.com/keenlabs/capillary/issues/5 – user2720864

ответ

2

Вы, возможно, видя, что я назвал бы UI metric artifacts of Trident? Эти кортежи также отображаются в прилавках пользовательского интерфейса Storm:

Trident выполняет партию каждые 500 мс (по умолчанию). Партия включает в себя пучок координационных сообщений, идущих на все болты, для координации партии (даже если партия пуста). Вот что вы видите.

(источник: Trident Kafka Spout - Ack Count Increasing Even Though No Messages Are Processed)

+0

Большое спасибо за ваш ответ. Да, возможно, это то, что я вижу в пользовательском интерфейсе. Координационные сообщения не должны отображаться как испускаемые/обработанные значения, что приводит к путанице в знании фактических значений. Есть ли способ получить фактическое количество сообщений, испускаемых/обработанных, без учета координационного сообщения, как указано в указанной ссылке? – DMA

+0

@DMA Вы нашли способ обойти это? – Sach

+0

@miguno Ссылки, похоже, истекли. У вас есть альтернативные ссылки? – Sach