2015-12-12 1 views
2

Я хотел бы знать, как я могу заставить RedisQueueMessageDrivenEndpoint работать с IntegrationFlow, поэтому я могу получать сообщения, удаленные из списка, который указан в следующем коде? «RedisRpopChannel()», похоже, вообще не получает сообщений. Пожалуйста помоги.Как сделать RedisQueueMessageDrivenEndpoint работать с IntegrationFlow?

@Bean 
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) { 
    RedisOutboundGateway gateway = new RedisOutboundGateway(connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    gateway.setArgumentsSerializer(serializer); 
    return gateway; 
} 

@Bean 
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) { 
    ExpressionArgumentsStrategy strategy = new ExpressionArgumentsStrategy(new String[]{"headers.queue", "#cmd == 'LPUSH' ? payload : null"}, true); 
    strategy.setBeanFactory(beanFactory); 
    gateway.setArgumentsStrategy(strategy); 
    return flow -> flow.publishSubscribeChannel(s->s.subscribe(f -> f 
      .enrich(e -> e.<ObjectNode>requestPayload(m -> { 
       String partition = m.getHeaders().get("correlationId").toString(); 
       ObjectNode objectNode = m.getPayload(); 
       objectNode.put(PayLoadKeys.PARTITION, partition); 
       objectNode.put(PayLoadKeys.SEQ, m.getHeaders().get("sequenceNumber").toString()); 
       return objectNode; 
      }).shouldClonePayload(false) 
        .header(RedisHeaders.COMMAND, "LPUSH").header("queue", files)) 
      .handle(gateway).channel("redisLpushResponseFlow.input"))); 
} 

@Bean 
public IntegrationFlow redisLpushResponseFlow() { 
    return flow -> flow.resequence().aggregate().<List<Long>>handle((p,h)-> { 
       ObjectNode objectNode = mapper.createObjectNode(); 
       objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString()); 
       if(h.get("mode").equals("debug")) { 
        objectNode.set(PayLoadKeys.DEBUG, 
          mapper.valueToTree(p.stream().collect(Collectors.toList()))); 
       } 
       return objectNode; 
      }).channel(httpInboundReplyChannel()); 
@Bean 
public MessageChannel redisRpopChannel() { 
    return MessageChannels.queue().get(); 
} 

@Bean(name = PollerMetadata.DEFAULT_POLLER) 
public PollerMetadata poller() { 
    return Pollers.fixedRate(500).get(); 
} 

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setBeanFactory(beanFactory); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(redisRpopChannel()); 
    endpoint.afterPropertiesSet(); 
    endpoint.start(); 
    return endpoint; 
} 

@Bean 
public IntegrationFlow redisQueuePollingFlow() { 

    class ThrottledTaskExecutor implements TaskExecutor { 
     final Semaphore semaphore; 
     final TaskExecutor taskExecutor; 

     ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) { 
      this.taskExecutor = taskExecutor; 
      this.semaphore = new Semaphore(taskExecutor.getCorePoolSize()); 
     } 

     @Override 
     public void execute(Runnable task) { 
      if (task == null) { 
       throw new NullPointerException("Task is null in ThrottledTaskExecutor."); 
      } 
      doSubmit(task); 
     } 

     void doSubmit(final Runnable task) { 
      try { 
       semaphore.acquire(); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new TaskRejectedException("Task could not be submitted because of a thread interruption."); 
      } 
      try { 
       taskExecutor.execute(new FutureTask<Void>(task, null) { 

        @Override 
        protected void done() { 
         semaphore.release(); 
        } 
       }); 
      } catch (TaskRejectedException e) { 
       semaphore.release(); 
       throw e; 
      } 
     } 
    } 

    return IntegrationFlows 
      .from(redisRpopChannel()) 
      .transform(Transformers.fromJson(ObjectNode.class)) 
      .handle(message -> { 
       ObjectNode p = (ObjectNode) message.getPayload(); 
       ThreadPoolTaskExecutor taskExecutor = taskExecutor(); 
       ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor); 
       if(p.hasNonNull(PayLoadKeys.ID_ARRAY)) { 
        String array = p.remove(PayLoadKeys.ID_ARRAY).asText(); 
        if (p.hasNonNull(array)) { 
         p.remove(array).forEach(id -> { 
          ObjectNode param = p.deepCopy(); 
          final Long finalId = id.asLong(); 
          param.put("id", finalId); 
          throttledTaskExecutor.execute(new JobLaunchTask(param)); 
         }); 
        } 
       } else { 
        throttledTaskExecutor.execute(new JobLaunchTask(p)); 
       } 
       taskExecutor.shutdown(); 
      }).get(); 
} 

ответ

1

Существует в настоящее время a problem при использовании управляемых сообщений конечных точек (которые определяются как @Bean с) в DSL.

Проблема заключается в том, что выходной канал требуется во время инициализации. Однако, когда конечная точка позже подключается к потоку, этот канал будет заменен.

Вы не должны называть методы, подобные afterPropertiesSet(), и start() в определении @Bean.

Это работало для меня ...

@Bean 
public RedisConnectionFactory connectionFactory() { 
    JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); 
    jedisConnectionFactory.setPort(6379); 
    return jedisConnectionFactory; 
} 

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(new DirectChannel()); // will be replaced 
    return endpoint; 
} 

@Bean 
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) { 
    return IntegrationFlows.from(redisQueueMessageDrivenEndpoint(connectionFactory)) 
      .handle(System.out::println) 
      .get(); 
} 

Я тестировал его с > lpush foo '{"foo":"bar"}' в Redis-Cli.

EDIT

Однако ваша техника тоже работает (для меня) ...

@Bean 
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) { 
    RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory); 
    Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class); 
    endpoint.setSerializer(serializer); 
    endpoint.setAutoStartup(true); 
    endpoint.setOutputChannel(rpopChannel()); 
    return endpoint; 
} 

@Bean 
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) { 
    return IntegrationFlows.from(rpopChannel()) 
      .handle(System.out::println) 
      .get(); 
} 

@Bean 
public MessageChannel rpopChannel() { 
    return new DirectChannel(); 
} 

Опять же, я удалил все управляемые контейнером свойства от конечной точки; Весна устанавливает все это.

+0

Благодарим за отзыв. Я очень это оценил. Я действительно восхищаюсь твоей работой. Интересно, почему нет адаптеров протокола, которые поддерживают итерации с помощью redis и добавляются в группу, поэтому мы все могли бы писать как IntegrationFlows.from (Redis.inboundGateway (connectionFactory, queue))? – hanishi

+0

У нас ограниченные ресурсы. Мы просим сообщество помочь нам расставить приоритеты по добавлению поддержки первого класса для Redis и т. Д. В DSL через [JIRA] (https://jira.spring.io/browse/INTEXT). Не стесняйтесь открывать проблему улучшения там, и мы увидим, что мы можем сделать. Мы также приветствуем [вклады] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md). –