у нас возникла проблема, связанная с переносом кода нашего приложения с версии 0.8.2.1 до 0.9.0.0 от Apache Kafka.Портирование приложения от Kafka 0.8.2.1 до Kafka 0.9.0. Чтение смещений выпуск
Мы имеем в виду, в данном случае, до версии Кафки выпущен Cloudera:
kafka_2.10-0.8.2.0-Кафка-1.3.2
kafka_2.11-0.9.0- kafka-2.0.2
Мы обнаружили проблему при чтении и записи смещений в теме метаданных __consumer_offsets. В частности, мы используем BlockingChannel для подключения к Kafka Broker, и во время вызова метода receive() мы получаем исключение EOFException.
В частности:
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)
Одной из возможных причин может быть различия между двумя версиями Кафки API.
Кафка 0.8.2
в нашем приложении, мы называем
ConsumerMetadataResponse.readFrom(channel.receive().buffer())
приема метод заключается в следующем
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
response
}
, как мы можем видеть, что это возвращает kafka.network . Получите, что является чертой, которая расширяет черту kafka.network.Transmission. В этом Receive, метод буфера существует и переписан в kafka.network.BoundedByteBufferReceive
def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
Кафки 0.9.0
Мы изменили предыдущую строку
GroupCoordinatorResponse.readFrom(channel.receive().payload())
приема метод в этой версии API выглядит следующим образом:
def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
val response = readCompletely(readChannel)
response.payload().rewind()
response
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}
Как мы видим, это возвращает вместо этого kafka.network.NetworkReceive, который является классом, который реализует интерфейс kafka.network.Receive, то теперь написанный на Java и совершенно отличную от предыдущей. Здесь нет никакого метода буфера, но только метод полезной нагрузки, которая возвращает содержимое
private ByteBuffer buffer;
Как мы могли бы решить? Заранее спасибо