2017-01-28 17 views
1

Я пытаюсь настроить простой процесс с помощью Spark Streaming, используя Apache Bahir для подключения к Akka. Я попытался следовать their example вместе с этим older one. У меня есть простой форвардер актерApache Bahir, отправить материал ActorReceiver

class ForwarderActor extends ActorReceiver { 
    def receive = { 
    case data: MyData => store(data) 
    } 
} 

и создать поток с

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName) 

конфигурация выглядит следующим образом:

akka { 
    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 
    } 
    remote { 
    enabled-transports = ["akka.remote.netty.tcp"] 
    netty.tcp { 
     hostname = "localhost" 
     port = 7777 
    } 
    } 
} 

и моя проблема: как я могу отправлять сообщения Экспедитор? Возможно, я не понимаю, как Akka Remote используется в этом случае. При запуске приложения, я вижу бревно

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:7777] 

и позже я вижу

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://[email protected]:52369] 

который, кажется, напомнить описанию в ScalaDoc:

/** 
    * A default ActorSystem creator. It will use a unique system name 
    * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote 
    * communication. 
    */ 

В целом я я не уверен, как я должен отправлять сообщения Экспедитору. Спасибо за любую помощь!

ответ

0

Активные актеры Akka могут отправлять сообщения другим актерам Akka, работающим на удаленной JVM. Итак ... когда игроку-отправителю нужно знать адрес предполагаемого приемника-актера.

AkkaUtil (Bahir) позволяет создавать искровой поток из сообщений, получаемых ReceiverActor. Но откуда будет получать сообщения? Ну ... какой-то удаленный актер. И для отправки сообщений этому удаленному актеру понадобится адрес вашего ReceiverActor, который работает в вашем искрообразовании.

В общем, вы не можете быть уверены в том, что ip-устройство будет работать с вашим искровым приложением. Итак, мы сделаем так, чтобы актер, работающий с искровым светом, расскажет продюсерскому актеру свою ссылку и попросит его отправить свои вещи.

Просто убедитесь, что оба приложения написаны с использованием той же версии Scala и работают с той же JRE.

Теперь ... давайте сначала написать актера, который будет источником данных,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class MyActor extends Actor with ActorLogging { 

    val theListOfMyStrings = List("one", "two", "three") 

    override def receive: Receive = { 
    case SendMeYourStringsRequest(requesterRef) => { 
     theListOfMyStrings.foreach(s => { 
     requesterRef ! RequestedString(s) 
     }) 
    } 
    } 
} 

object MyApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="my-ip-address" 
     |  port=18000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("my-actor-system", config) 

    var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 

} 

Теперь ... позволяет написать простую свечу приложение,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} 

case class SendMeYourStringsRequest(requesterRef: ActorRef) 
case class RequestedString(s: String) 

class YourStringRequesterActor extends ActorReceiver { 
    def receive = { 
    case RequestedString(s) => store(s) 
    } 

    override def preStart(): Unit = { 
    val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 
    val myActorSelection = context.actorSelection(myActorPath) 

    myActorSelection ! SendMeYourStringsRequest(self) 
    } 
} 

object YourSparkApp { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("ActorWordCount") 

    if (!sparkConf.contains("spark.master")) { 
     sparkConf.setMaster("local[2]") 
    } 

    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    val stringStream = AkkaUtils.createStream[String](
     ssc, 
     Props(classOf[YourStringRequesterActor]), 
     "your-string-requester-actor" 
    ) 

    stringStream.foreach(println) 

    } 
} 

Примечание :: Просто уход за my-ip-address. Если есть другие проблемы, пожалуйста, дайте мне знать в комментариях.

+0

@SaSarvesh Kumar Singh Рад, что я нашел это. Я пытался исправить ту же проблему. Здесь я попробовал ваш код. сталкиваясь с проблемой при использовании 'actorScelection'https: //stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-w-wor, пожалуйста, проверьте эту проблему? – Mahesh