Активные актеры Akka могут отправлять сообщения другим актерам Akka, работающим на удаленной JVM. Итак ... когда игроку-отправителю нужно знать адрес предполагаемого приемника-актера.
AkkaUtil (Bahir) позволяет создавать искровой поток из сообщений, получаемых ReceiverActor
. Но откуда будет получать сообщения? Ну ... какой-то удаленный актер. И для отправки сообщений этому удаленному актеру понадобится адрес вашего ReceiverActor
, который работает в вашем искрообразовании.
В общем, вы не можете быть уверены в том, что ip-устройство будет работать с вашим искровым приложением. Итак, мы сделаем так, чтобы актер, работающий с искровым светом, расскажет продюсерскому актеру свою ссылку и попросит его отправить свои вещи.
Просто убедитесь, что оба приложения написаны с использованием той же версии Scala и работают с той же JRE.
Теперь ... давайте сначала написать актера, который будет источником данных,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class MyActor extends Actor with ActorLogging {
val theListOfMyStrings = List("one", "two", "three")
override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="my-ip-address"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
Теперь ... позволяет написать простую свечу приложение,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}
override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)
myActorSelection ! SendMeYourStringsRequest(self)
}
}
object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)
stringStream.foreach(println)
}
}
Примечание :: Просто уход за my-ip-address
. Если есть другие проблемы, пожалуйста, дайте мне знать в комментариях.
@SaSarvesh Kumar Singh Рад, что я нашел это. Я пытался исправить ту же проблему. Здесь я попробовал ваш код. сталкиваясь с проблемой при использовании 'actorScelection'https: //stackoverflow.com/questions/46724732/actorsystem-actorselection-is-not-working-for-remote-actors-where-actorof-is-w-wor, пожалуйста, проверьте эту проблему? – Mahesh