2016-06-24 2 views
1

Ниже приведена ошибка после запуска потока в течение определенного времени? Я не могу найти, кто несет ответственность за создание файла .sst?Ошибка ошибки состояния Kafka-streams

Env:

Kafka version 0.10.0-cp1

scala 2.11.8

org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg 
     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424) 
     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414) 
     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330) 
     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) 
     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) 
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory 
     at org.rocksdb.RocksDB.flush(Native Method) 
     at org.rocksdb.RocksDB.flush(RocksDB.java:1329) 
     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422) 
     ... 9 more 
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:452) 
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg 
     at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324) 
     at org.apache.kafka.streams.state.internals.RocksDBStore.flushCache(RocksDBStore.java:379) 
     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:411) 
     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165) 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330) 
     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) 
     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) 
     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:248) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:228) 
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory 
     at org.rocksdb.RocksDB.write0(Native Method) 
     at org.rocksdb.RocksDB.write(RocksDB.java:546) 
     at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:322) 
     ... 9 more 

ответ

4

RocksDB используется внутри Кафки Streams для обработки состояния оператора - и RocksDB записать некоторые файлы на диск.

Возможно ли, что кто-то удалил материал в папке /tmp и, таким образом, удалил состояние вашего приложения Kafka Streams? Если да, то настроить другое состояние магазина местоположение с помощью параметра state.dir (см http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters)

+0

Да это, кажется, говорят все: 'Вызванный: org.rocksdb.RocksDBException: Ошибка ввода-вывода:/TMP/Кафка-потоки/поз/0_0 /rocksdb/agg/000008.sst: Нет такого файла или каталога ' –

+0

@ rahul-shukla Это ответ на ваш вопрос? Если да, не стесняйтесь принимать и/или повышать. –

+0

@ matthias-j-sax Не могли бы вы прояснить, что происходит в случае, когда государственный диск очищается при перезагрузке? Это приводит к непоследовательному поведению приложения kafka streams для следующего запуска? Должно ли государство быть тщательно сохранено разработчиком? Я не мог найти упоминания об этом в документах Confluent. – Oleg