Теперь я использую java для создания redis pub/sub system и возникла проблема. Я покажу вам детали:redis подписчик не может работать с издателем redis
Издатель здесь:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
Издатель правильно и может работать правильно.
Тогда давайте перейдем к абонентскому класса:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
В абонентском классе я использовал RxJava впрыснуть Action, так что я могу использовать его гораздо легче.
Но вопрос здесь, после того, как я опубликовал сообщение от издателя, я могу с, что сообщение может быть передано методу OnMessage, журнал печати не было, что я ожидал:
===> redis subscribe message in <===
===> action is null <===
Что я ожидал заключается в том, что, когда я опубликовал новое сообщение, абонент получил его и выполнил действие, которое я создал.
Службы Я использовал, чтобы вызвать издатель и подписчик ниже:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
Из выше коды, вы можете с, что я подписался на тему и установить новое действие для абонента:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
Но я не знаю, почему, после запуска издателя, абонент может получить сообщение, но удерживать NULL Действие, действие, которое я создал, не перешло к нему.
Любой может помочь? Есть ли какие-либо проблемы с этим механизмом?
==== EDIT =====
код RedisMessageConfig ниже:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
==== ==== решаемые
Наконец я получил это решена за мп-идею, слегка изменил myredismessagesподписчик на myredismessageconfig, потому что поток от redismessageconfig до redismessagesubscriber, поэтому в redismessageconfig мне нужно сначала ввести действие t o это, то redismessageconfig создаст новый redismessagesubscriber и проведет новое созданное действие. Код ниже:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
Значит, вы имеете в виду, что многопоточная проблема, вызвавшая эту проблему? – CharlieShi
Нет. Несколько потоков делают его просто видимым для вас. Проблема вызвана совместным изменчивым состоянием. – mp911de
Хорошо, не могли бы вы поделиться с нами некоторыми убедительными идеями, чтобы повлиять на этот сценарий? – CharlieShi