0

У меня есть приложение Spring Boot (app0), которое использует Spring Cloud Stream Kafka для чтения из темы.Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: индекс строки из диапазона в

Есть два других приложений (app1, app2), которые производят сообщения в эту тему.

app1 публикует сообщения, используя интерфейс OrderSource:

public interface OrderSource{ 

    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS; 

    @Output(OrderSource.OUTPUT_PAYMENT) 
    MessageChannel output(); 

Например:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500); 

В этом случае app0 считывает сообщения из App1 без каких-либо проблем.

app2 публикует свои сообщения, используя KafkaTemplate:

ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }"); 
try { 
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS); 

В этом случае я наблюдаю следующее исключение из EmbeddedHeadersMessageConverter:

java.lang.StringIndexOutOfBoundsException: String index out of range: 152 
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91] 
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91] 
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 

Видимо он пытается извлечь заголовки из полезной нагрузки сообщение. Как я могу предотвратить это исключение, поддерживая оба источника сообщений (KafkaTemplate и OrderSource).

ответ

1

Для связи с приложениями, отличными от Spring-Cloud-Stream, вам необходимо настроить headerMode на потребителя до raw.

Вам также необходимо будет сделать то же самое на продюсере для app1, чтобы он не вставлял заголовки.

См. consumer properties и producer properties.

+0

Спасибо Гэри. Несколько вопросов: каковы предпосылки не встраивания заголовков? Будут ли они еще отправлены? – codependent

+0

У Kafka нет концепции заголовков; поэтому мы должны внедрить их в полезную нагрузку. Единственным заголовком, переданным (по умолчанию), является тип содержимого; это необходимо только в том случае, если вы полагаетесь на связующее, чтобы выполнить любое сложное преобразование сообщений в приложении-потребителе. Если вам это нужно или настроили связующее (app1) для вставки любых пользовательских заголовков, вам не повезло. Но вокруг этого будет использовать встроенный конвертер в вашем приложении2 перед отправкой. Или используйте разные темы для app1/app2 и соответственно настройте app0. –

+0

Теперь все ясно, спасибо! Не могли бы вы указать на какие-либо ссылки для этого: «Обход должен был бы использовать встроенный конвертер в вашем приложении2 перед отправкой». – codependent