На каждом подчиненном узле через марафон мы запускаем Mesos External Shuffle Service. Когда мы отправляем искр через dcos CLI в грубом зернистом режиме без динамического распределения, все работает как ожидалось. Но когда мы отправляем ту же работу с динамическим распределением, она терпит неудачу.Как запустить искру + кассандра + мезо (dcos) с динамическим распределением ресурсов?
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file:/tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)
Полное описание:
- Мы установили Mesos (ИДК) с Marathon использованием Azure Portal.
- Пакеты Via Universe, которые мы установили: Cassandra, Spark и Marathon-lb
- Мы сгенерировали тестовые данные в Кассандре.
- На ноутбуке я установил DCOS CLI
Когда я отправить задание, как показано ниже, все работает, как ожидалось:
./dcos spark run --submit-args="--properties-file coarse-grained.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208185927-0043
cqlsh:sp> select count(*) from product_model_per_alerts_by_date ;
count
-------
476
крупнозернистого grained.conf:
spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.executor.instances 2
spark.submit.deployMode cluster
spark.cores.max 4
portal.spark.cassandra.app.ProductModelPerNrOfAlerts:
package portal.spark.cassandra.app
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
object ProductModelPerNrOfAlerts {
def main(args: Array[String]): Unit = {
val conf = new SparkConf(true)
.setAppName("cassandraSpark-ProductModelPerNrOfAlerts")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "asset_history", "keyspace" -> "sp"))
.load()
.select("datestamp","product_model","nr_of_alerts")
val dr = df
.groupBy("datestamp","product_model")
.avg("nr_of_alerts")
.toDF("datestamp","product_model","nr_of_alerts")
dr.write
.mode(SaveMode.Overwrite)
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "product_model_per_alerts_by_date", "keyspace" -> "sp"))
.save()
sc.stop()
}
}
Динамическое распределение
Через марафоне мы бежим Mesos Внешняя Перемешать служба:
{
"id": "spark-mesos-external-shuffle-service-tt",
"container": {
"type": "DOCKER",
"docker": {
"image": "jpavt/mesos-spark-hadoop:mesos-external-shuffle-service-1.0.4-2.0.1",
"network": "BRIDGE",
"portMappings": [
{ "hostPort": 7337, "containerPort": 7337, "servicePort": 7337 }
],
"forcePullImage":true,
"volumes": [
{
"containerPath": "/tmp",
"hostPath": "/tmp",
"mode": "RW"
}
]
}
},
"instances": 9,
"cpus": 0.2,
"mem": 512,
"constraints": [["hostname", "UNIQUE"]]
}
Dockerfile для jpavt/Mesos-искровой Hadoop: Mesos-внешняя перетасовка-сервис -1.0.4-2.0.1:
FROM mesosphere/spark:1.0.4-2.0.1
WORKDIR /opt/spark/dist
ENTRYPOINT ["./bin/spark-class", "org.apache.spark.deploy.mesos.MesosExternalShuffleService"]
Теперь, когда я представляю работу с динамическим распределением он не:
./dcos spark run --submit-args="--properties-file dynamic-allocation.conf --class portal.spark.cassandra.app.ProductModelPerNrOfAlerts http://marathon-lb-default.marathon.mesos:10018/jars/spark-cassandra-assembly-1.0.jar"
Run job succeeded. Submission id: driver-20161208191958-0047
select count(*) from product_model_per_alerts_by_date ;
count
-------
5
динамическое перераспределение.конф:
spark.cassandra.connection.host 10.32.0.17
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.cores 1
spark.executor.memory 1g
spark.submit.deployMode cluster
spark.cores.max 4
spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 5
spark.dynamicAllocation.cachedExecutorIdleTimeout 120s
spark.dynamicAllocation.schedulerBacklogTimeout 10s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 20s
spark.mesos.executor.docker.volumes /tmp:/tmp:rw
spark.local.dir /tmp
журналы из Mesos:
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.0 KB, free 366.0 MB)
16/12/08 19:20:42 INFO TorrentBroadcast: Reading broadcast variable 7 took 21 ms
16/12/08 19:20:42 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 38.6 KB, free 366.0 MB)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:45422)
16/12/08 19:20:42 INFO MapOutputTrackerWorker: Got the output locations
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 58 blocks
16/12/08 19:20:42 INFO TransportClientFactory: Successfully created connection to /10.32.0.11:7337 after 2 ms (0 ms spent in bootstraps)
16/12/08 19:20:42 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 13 ms
16/12/08 19:20:42 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
...
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-d4df5df4-24c9-41a3-9f26-4c1aba096814/30/shuffle_0_0_0.index (No such file or directory)
бревна от марафонских свечей Mesos-внешний перетасовка-сервис-TT:
...
16/12/08 19:20:29 INFO MesosExternalShuffleBlockHandler: Received registration request from app 704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047 (remote address /10.32.0.4:49710, heartbeat timeout 120000 ms).
16/12/08 19:20:31 INFO ExternalShuffleBlockResolver: Registered executor AppExecId{appId=704aec43-1aa3-4971-bb98-e892beeb2c45-0008-driver-20161208191958-0047, execId=2} with ExecutorShuffleInfo{localDirs=[/tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2], subDirsPerLocalDir=64, shuffleManager=org.apache.spark.shuffle.sort.SortShuffleManager}
16/12/08 19:20:38 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8157825166903585542
java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:234)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-14525ef0-22e9-49fb-8e81-dc84e5fba8b2/16/shuffle_0_55_0.index (No such file or directory)
...
но файл существует на данном подчиненном поле: