SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
Выполняя этот образец в искры 2.1.0, я получил ошибку. Без опции .cache
он работал, как предполагалось, но с .cache
вариант я получил:Почему использование кеша при потоковой передаче данных не выполняется с помощью «AnalysisException: запросы с потоковыми источниками должны выполняться с помощью writeStream.start()»?
Исключение в потоке «основные» org.apache.spark.sql.AnalysisException: Запросы с источниками потоковой передачи должны быть выполнены с writeStream.start () ;; FileSource [SRC/тест/данные] на org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ апаша $ искровым $ $ SQL катализатора $ анализа $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala: 196) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 35) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $$ anonfun $ checkForBatch $ 1 .apply (UnsupportedOperationChecker.scala: 33) по адресу org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 128) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $. checkForBatch (UnsupportedOperationChecker.scala: 33) по адресу org.apache.spark.sql.execution.QueryExecution.assertSupported (QueryExecut ion.scala: 58) at org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute (QueryExecution.scala: 69) at org.apache.spark.sql.execution.QueryExecution.withCachedData (QueryExecution.scala: 67) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (QueryExecution.scala: 73) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) at org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution.scala: 79) at org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) at org.apache. spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) at org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryEx ecution.scala: 84) at org.apache.spark.sql.execution.CacheManager $$ anonfun $ cacheQuery $ 1.apply (CacheManager.scala: 102) at org.apache.spark.sql.execution.CacheManager.writeLock (CacheManager.scala: 65) at org.apache.spark.sql.execution.CacheManager.cacheQuery (CacheManager.scala: 89) at org.apache.spark.sql.Dataset.persist (Dataset.scala: 2479) at org.apache.spark.sql.Dataset.cache (Dataset.scala: 2489) at org.me.App $ .main (App.scala: 23) at org.me.App.main (App.scala)
Есть идеи?
Извините, но я не думаю, что просто не использовать кеш является решением. –
Мартин, не стесняйтесь участвовать в комментариях к [SPARK-20927] (https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels % 3Acomment-tabpanel # comment-16334363) о необходимости кэширования при потоковых вычислениях – mathieu