2017-01-18 7 views
3

у нас возникла проблема, связанная с переносом кода нашего приложения с версии 0.8.2.1 до 0.9.0.0 от Apache Kafka.Портирование приложения от Kafka 0.8.2.1 до Kafka 0.9.0. Чтение смещений выпуск

Мы имеем в виду, в данном случае, до версии Кафки выпущен Cloudera:

kafka_2.10-0.8.2.0-Кафка-1.3.2

kafka_2.11-0.9.0- kafka-2.0.2

Мы обнаружили проблему при чтении и записи смещений в теме метаданных __consumer_offsets. В частности, мы используем BlockingChannel для подключения к Kafka Broker, и во время вызова метода receive() мы получаем исключение EOFException.

В частности:

java.io.EOFException 
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83) 
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129) 
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120) 

Одной из возможных причин может быть различия между двумя версиями Кафки API.

Кафка 0.8.2

в нашем приложении, мы называем

ConsumerMetadataResponse.readFrom(channel.receive().buffer()) 

приема метод заключается в следующем

def receive(): Receive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = new BoundedByteBufferReceive() 
    response.readCompletely(readChannel) 

    response 
} 

, как мы можем видеть, что это возвращает kafka.network . Получите, что является чертой, которая расширяет черту kafka.network.Transmission. В этом Receive, метод буфера существует и переписан в kafka.network.BoundedByteBufferReceive

def buffer: ByteBuffer = { 
    expectComplete() 
    contentBuffer 
    } 

Кафки 0.9.0

Мы изменили предыдущую строку

GroupCoordinatorResponse.readFrom(channel.receive().payload()) 

приема метод в этой версии API выглядит следующим образом:

def receive(): NetworkReceive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = readCompletely(readChannel) 
    response.payload().rewind() 

    response 
    } 

    private def readCompletely(channel: ReadableByteChannel): NetworkReceive =  { 
    val response = new NetworkReceive 
    while (!response.complete()) 
     response.readFromReadableChannel(channel) 
    response 
    } 

Как мы видим, это возвращает вместо этого kafka.network.NetworkReceive, который является классом, который реализует интерфейс kafka.network.Receive, то теперь написанный на Java и совершенно отличную от предыдущей. Здесь нет никакого метода буфера, но только метод полезной нагрузки, которая возвращает содержимое

    private ByteBuffer buffer; 

Как мы могли бы решить? Заранее спасибо

ответ

0

Kafka 0.9 поддерживает старого потребителя Kafka, чтобы добиться обратной совместимости с брокерами Kafka 0.8.2. Вы используете старого потребителя, который все еще присутствует в Kafka 0.9, чтобы читать сообщения от Kafka 0.9. Вы должны начать использовать новый потребительский API Kafka 0.9 для чтения данных от брокеров Kafka 0.9.

Надеюсь, это поможет.