2016-12-09 14 views
0

Теперь я использую java для создания redis pub/sub system и возникла проблема. Я покажу вам детали:redis подписчик не может работать с издателем redis

Издатель здесь:

public class RedisMessagePublisher implements MessagePublisher { 

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic) 
{ 
    this.redisTemplate = redisTemplate; 
    this.topic = topic; 
} 

private StringRedisTemplate redisTemplate; 

private ChannelTopic topic; 

@Override 
public void publish(String message) { 
    redisTemplate.convertAndSend(topic.getTopic(), message); 
    } 
} 

Издатель правильно и может работать правильно.

Тогда давайте перейдем к абонентскому класса:

public class RedisMessageSubscriber implements MessageListener { 

//action inspect here 
private Action2<Message, byte[]> action; 

public void setAction(Action2<Message, byte[]> action) { 
    logger.info("action set"); 
    this.action = action; 
} 

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class); 

@Override 
public void onMessage(Message message, byte[] bytes) { 

    logger.info("===> redis subscribe message in <==="); 

    if (action != null) 
     action.call(message, bytes); 
    else 
     logger.info("===> action is null <==="); 
    } 
} 

В абонентском классе я использовал RxJava впрыснуть Action, так что я могу использовать его гораздо легче.

Но вопрос здесь, после того, как я опубликовал сообщение от издателя, я могу с, что сообщение может быть передано методу OnMessage, журнал печати не было, что я ожидал:

===> redis subscribe message in <=== 
===> action is null <=== 

Что я ожидал заключается в том, что, когда я опубликовал новое сообщение, абонент получил его и выполнил действие, которое я создал.

Службы Я использовал, чтобы вызвать издатель и подписчик ниже:

@RestController("redispubsubcontroller") 
@RequestMapping(value = "/redis") 
public class redispubsubcontroller { 

@Autowired 
private RedisMessagePublisher redisMessagePublisher; 

@Autowired 
private RedisMessageSubscriber redisMessageSubscriber; 

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class); 

@RequestMapping(value = "/publisher", method = {RequestMethod.GET}) 
public ApiResponse getConfig(String message,HttpServletRequest request, 
              HttpServletResponse response) { 

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       String result = objectMapper.readValue(message.getBody(), String.class); 
       logger.info("receive:"+result); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    redisMessagePublisher.publish(message); 

    return new ApiResponse("success","message sent"); 
    } 
} 

Из выше коды, вы можете с, что я подписался на тему и установить новое действие для абонента:

redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
    @Override 
    public void call(Message message, byte[] bytes) { 
     ObjectMapper objectMapper = new ObjectMapper(); 
     try { 
      String result = objectMapper.readValue(message.getBody(), String.class); 
      logger.info("receive:"+result); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
}); 

Но я не знаю, почему, после запуска издателя, абонент может получить сообщение, но удерживать NULL Действие, действие, которое я создал, не перешло к нему.

Любой может помочь? Есть ли какие-либо проблемы с этим механизмом?

==== EDIT =====

код RedisMessageConfig ниже:

@Configuration 
public class RedisMessageConfig { 

@Bean 
ChannelTopic topic() { 
    return new ChannelTopic("useraddresspubsub:queue"); 
} 

@Bean 
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber()); 
} 

@Autowired 
private RedisConnectionFactory JedisConnectionFactory; 

@Bean 
RedisMessageListenerContainer redisContainer() { 
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(JedisConnectionFactory); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
    } 
} 

==== ==== решаемые

Наконец я получил это решена за мп-идею, слегка изменил myredismessagesподписчик на myredismessageconfig, потому что поток от redismessageconfig до redismessagesubscriber, поэтому в redismessageconfig мне нужно сначала ввести действие t o это, то redismessageconfig создаст новый redismessagesubscriber и проведет новое созданное действие. Код ниже:

@Component 
public class MyRedisMessageConfig extends RedisMessageConfig { 

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class); 

public MyRedisMessageConfig() { 
    super.action = new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      String result = new String(message.getBody()); 
       logger.info("received:" + result); 
      } 
     }; 
    } 
} 

Скриншот ниже: enter image description here

ответ

1

Это не путь MessageListener предназначен для работы. Кроме того, вы создаете общее изменчивое состояние. Два одновременных вызова изменяют одновременно состояние RedisMessageSubscriber.

Я предполагаю, что вы сталкиваетесь с проблемами видимости, когда вы устанавливаете action в одном потоке, а получение сообщений происходит в другом потоке.

Если вам требуется разное поведение на MessageListener, то создайте несколько слушателей, которые реализуют это поведение.

+0

Значит, вы имеете в виду, что многопоточная проблема, вызвавшая эту проблему? – CharlieShi

+0

Нет. Несколько потоков делают его просто видимым для вас. Проблема вызвана совместным изменчивым состоянием. – mp911de

+0

Хорошо, не могли бы вы поделиться с нами некоторыми убедительными идеями, чтобы повлиять на этот сценарий? – CharlieShi