2

Я пытаюсь подключиться к Redis для публикации-Subscribe с помощью Typesafe Redis Play plugin.Блокировка плагина Redis в публикации/Подписка

У меня есть следующий сценарий тест, состоящий из актера, который генерирует сообщения каждый второй:

// Initialization happens in Application.scala, 
    private lazy val fakeStreamActor = Akka.system.actorOf(Props[FakeStreamActor]) 

    val actorPut = Akka.system.scheduler.schedule(
    Duration(1000, MILLISECONDS), 
    Duration(1000, MILLISECONDS), 
    fakeStreamActor, 
    Put("This is a sample message")) 

Источник актер:

class FakeStreamActor extends Actor { 
    implicit val timeout = Timeout(1, SECONDS) 

    val CHANNEL = "channel1" 
    val plugin = Play.application.plugin(classOf[RedisPlugin]).get 
    val listener = new MyListener() 

    val pool = plugin.sedisPool 

    pool.withJedisClient{ client => 
    client.subscribe(listener, CHANNEL) 
    } 

    def receive = { 

    case Put(msg: String) => { 
     //send data to Redis 
     Logger.info("Push %s".format(msg)) 
     pool.withJedisClient { client => 
     client.publish(CHANNEL, msg) 
     } 

    } 
    } 
} 

/** Messages */ 
case class Put(msg: String) 

И подписываться слушатель:

case class MyListener() extends JedisPubSub { 
    def onMessage(channel: String, message: String): Unit = { 
    Logger.info("onMessage[%s, %s]".format(channel, message)) 
    } 

    def onSubscribe(channel: String, subscribedChannels: Int): Unit = { 
    Logger.info("onSubscribe[%s, %d]".format(channel, subscribedChannels)) 
    } 

    def onUnsubscribe(channel: String, subscribedChannels: Int): Unit = { 
    Logger.info("onUnsubscribe[%s, %d]".format(channel, subscribedChannels)) 
    } 

    def onPSubscribe(pattern: String, subscribedChannels: Int): Unit = { 
    Logger.info("onPSubscribe[%s, %d]".format(pattern, subscribedChannels)) 
    } 

    def onPUnsubscribe(pattern: String, subscribedChannels: Int): Unit = { 
    Logger.info("onPUnsubscribe[%s, %d]".format(pattern, subscribedChannels)) 
    } 

    def onPMessage(pattern: String, channel: String, message: String): Unit = { 
    Logger.info("onPMessage[%s, %s, %s]".format(pattern, channel, message)) 
    } 
} 

Теперь, в идеале, я должен подписываться на определенный канал и видеть в журналах, как Слушатель обрабатывает принятые сообщения каждую секунду. Но этого не происходит, поскольку акт подписки блокирует поток.

Мой вопрос:

Существует ли способ воспользоваться Play асинхронной природы, чтобы иметь нелипкая подписку?

ответ

4

Yup. Вот как я это делаю в Global.scala:

Akka.future { 
    val j = new RedisPlugin(app).jedisPool.getResource 
    j.subscribe(PubSub, "*") 
} 

У меня была проблема создания экземпляра плагина, но вы по существу положить немного withJedisClient внутри будущего блока.

Спасибо, что показали мне, как создать экземпляр плагина в scala!

+0

нет, спасибо вам за то, что вы показали мне будущую часть, я совсем забыл об этом: D –

 Смежные вопросы

  • Нет связанных вопросов^_^