2017-01-21 5 views
1

У меня есть процессор, который преобразует byte[] полезных нагрузок в MyClass полезных нагрузок:Преобразование типа потока данных Spring Cloud не работает в компоненте процессора?

@Slf4j 
@EnableBinding(Processor.class) 
public class MyDecoder { 

    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) 
    public MyClass decode(final byte[] payload) { 
     MyClass decoded = doStuff(payload); 
     if (decoded != null) { 
      log.info("Successfully decoded!"); 
     } 

     return decoded; 
    } 
} 

Я пытался создать следующий DSL: some-source | my-decoder | some-sink и some-sink сообщения об ошибках, потому что он не имеет MyClass класс в ClassLoader. Это ожидаемое поведение.

Я пытался применять преобразование типа на my-decoder как: some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink и я получаю следующие ошибки в журнале my-decoder:

2017-01-20 21:45:17.278 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:18.441 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:20.512 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded! 
2017-01-20 21:45:20.515 ERROR 9408 --- [afka-listener-2] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = example.some-source, partition = 0, offset = 1, key = null, value = [[email protected]) 

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] 
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.5.RELEASE.jar!/:na] 
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.5.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778) [spring-kafka-1.0.4.RELEASE.jar!/:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111] 
Caused by: java.lang.IllegalArgumentException: payload must not be null 
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.3.5.RELEASE.jar!/:4.3.5.RELEASE] 
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194) ~[spring-cloud-stream-1.1.0.RELEASE.jar!/:1.1.0.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE] 
    ... 42 common frames omitted 

Я могу видеть, что сообщение было преобразовано из byte[] в MyClass и не является нулевым. Я не понимаю, почему я увидеть сообщение 3 раза, прежде чем он терпит неудачу, потому что Кафка свойство «повторит» является 0, как показано в журнале my-decoder на старте:

2017-01-20 21:44:32.080 INFO 9408 --- [   main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
    compression.type = none 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    metadata.fetch.timeout.ms = 60000 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    buffer.memory = 33554432 
    timeout.ms = 30000 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    block.on.buffer.full = false 
    ssl.key.password = null 
    max.block.ms = 60000 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    max.in.flight.requests.per.connection = 5 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    ssl.protocol = TLS 
    request.timeout.ms = 30000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    acks = 1 
    batch.size = 16384 
    ssl.keystore.location = null 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    retries = 0 
    max.request.size = 1048576 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    send.buffer.bytes = 131072 
    linger.ms = 0 

Я пытался писать тесты интеграции:

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) 
@DirtiesContext 
public abstract class MyDecoderTests { 

    @Autowired 
    protected Processor channels; 

    @Autowired 
    protected MessageCollector collector; 

    public static class UsingNothingIntegrationTests extends MyDecoderTest { 

     @Test 
     public void test() throws Exception { 
      channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray()))); 
      assertThat(collector.forChannel(channels.output()), receivesPayloadThat(instanceOf(MyClass.class))); 
     } 
    } 

    @SpringBootTest("spring.cloud.stream.bindings.output.contentType=application/json") 
    public static class UsingOutputConverterIntegrationTests extends MyDecoderTest { 

     @Test 
     public void test() throws Exception { 
      channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray()))); 
      assertThat(collector.forChannel(channels.output()), receivesPayloadThat(is("{\"example\": true\"}"))); 
     } 
    } 

    @Configuration 
    @EnableAutoConfiguration 
    @Import(MyDecoderConfiguration.class) 
    public static class MyDecoderTestApplication { 

    } 
} 

Тестирование выполняется успешно, происходит преобразование.

Тогда я думал, что мой DSL не правильно, так что я написал новый источник для тестирования с:

@Bean 
@InboundChannelAdapter(Source.OUTPUT) 
public MessageSource<MyClass> exampleSource() { 
    return() -> new GenericMessage<>(getMyClassObject()); 
} 

И следующий DSL преобразует MyClass в JSON, как и ожидалось: my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink

Почему я получить сообщение об декодировании, зарегистрированное 3 раза, и почему он не работает с сообщением «полезная нагрузка не должно быть пустым»? Это что-то с моим процессором?

ответ

1

Вы видите 3 попытки, потому что это конфигурация повторного запуска по умолчанию для входного канала в связующем.

В этом случае есть ошибка в том, что если конвертер не может преобразовать сообщение, он пытается создать сообщение с полезной нагрузкой null.

Причина, по которой он не может преобразовать полезную нагрузку, состоит в том, что он видит входящий контент-тип (предположительно application/octet-stream) и не может преобразовать его в JSON.

Обходной добавить файл к классам:

META-INF/spring.integration.properties 

и добавить

spring.integration.readOnly.headers=contentType 

к нему.

Это предотвращает распространение заголовка входящего содержимого в исходящее сообщение.

Для этого требуется интеграция весов 4.3.2 или выше.

В будущем выпуске SCSt это будет установлено по умолчанию.

+0

Где я могу увидеть и изменить конфигурацию связующего? Так как производитель kafka сообщил «retries = 0» при запуске? – aturkovic

+0

Другое дело, не могли бы вы указать, что я сделал неправильно в своем тесте? Как получилось, что UseOutputConverterIntegrationTests успешно прошло и преобразовал его в JSON? – aturkovic

+1

Это не имеет ничего общего с kafka, это встроенная функциональность после доставки сообщений, независимо от реализации связующего; см. [Потребительские свойства в весеннем облачном потоке] (http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR1/reference/htmlsingle/#_consumer_properties); в частности 'maxAttempts'. Причина, по которой ваш тест работает, заключается в том, что вы не задали заголовок 'contentType' в сообщении, отправленном вашим тестом. Как я уже сказал, это ошибка; исходящий конвертер использует входящий заголовок contentType; если вы установите этот заголовок в своем тесте, я ожидаю, что вы получите тот же самый отказ. –