10
SparkSession 
    .builder 
    .master("local[*]") 
    .config("spark.sql.warehouse.dir", "C:/tmp/spark") 
    .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint") 
    .appName("my-test") 
    .getOrCreate 
    .readStream 
    .schema(schema) 
    .json("src/test/data") 
    .cache 
    .writeStream 
    .start 
    .awaitTermination 

Выполняя этот образец в искры 2.1.0, я получил ошибку. Без опции .cache он работал, как предполагалось, но с .cache вариант я получил:Почему использование кеша при потоковой передаче данных не выполняется с помощью «AnalysisException: запросы с потоковыми источниками должны выполняться с помощью writeStream.start()»?

Исключение в потоке «основные» org.apache.spark.sql.AnalysisException: Запросы с источниками потоковой передачи должны быть выполнены с writeStream.start () ;; FileSource [SRC/тест/данные] на org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ апаша $ искровым $ $ SQL катализатора $ анализа $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala: 196) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 35) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $$ anonfun $ checkForBatch $ 1 .apply (UnsupportedOperationChecker.scala: 33) по адресу org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 128) at org.apache.spark.sql.catalyst.analysis.UsupportedOperationChecker $. checkForBatch (UnsupportedOperationChecker.scala: 33) по адресу org.apache.spark.sql.execution.QueryExecution.assertSupported (QueryExecut ion.scala: 58) at org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute (QueryExecution.scala: 69) at org.apache.spark.sql.execution.QueryExecution.withCachedData (QueryExecution.scala: 67) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (QueryExecution.scala: 73) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) at org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution.scala: 79) at org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) at org.apache. spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) at org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryEx ecution.scala: 84) at org.apache.spark.sql.execution.CacheManager $$ anonfun $ cacheQuery $ 1.apply (CacheManager.scala: 102) at org.apache.spark.sql.execution.CacheManager.writeLock (CacheManager.scala: 65) at org.apache.spark.sql.execution.CacheManager.cacheQuery (CacheManager.scala: 89) at org.apache.spark.sql.Dataset.persist (Dataset.scala: 2479) at org.apache.spark.sql.Dataset.cache (Dataset.scala: 2489) at org.me.App $ .main (App.scala: 23) at org.me.App.main (App.scala)

Есть идеи?

+1

Извините, но я не думаю, что просто не использовать кеш является решением. –

+1

Мартин, не стесняйтесь участвовать в комментариях к [SPARK-20927] (https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels % 3Acomment-tabpanel # comment-16334363) о необходимости кэширования при потоковых вычислениях – mathieu

ответ

10

Ваш (очень интересно) дело сводится к следующей строке (которые можно выполнить в spark-shell):

scala> :type spark 
org.apache.spark.sql.SparkSession 

scala> spark.readStream.text("files").cache 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[files] 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92) 
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603) 
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613) 
    ... 48 elided 

Причина этого оказалось достаточно просто, чтобы объяснить (не каламбур Спарк в SQL explain предназначен).

spark.readStream.text("files") создает так называемый потоковый набор .

scala> val files = spark.readStream.text("files") 
files: org.apache.spark.sql.DataFrame = [value: string] 

scala> files.isStreaming 
res2: Boolean = true 

Streaming Datasets являются основой Спарк SQL-х Structured Streaming.

Как вы можете прочитать в Структурированная Streaming-х Quick Example:

А затем начать вычисление потокового с помощью start().

Цитируя scaladoc из DataStreamWriter-х start:

старта(): StreamingQuery начинает выполнение потокового запроса, который будет постоянно выходные результаты данного пути при поступлении новых данных.

Таким образом, вы должны использовать start (или foreach), чтобы начать выполнение потоковой передачи запроса. Вы уже это знали.

Но ... есть Unsupported Operations в Structured Streaming:

Кроме того, есть некоторые методы набора данных, которые не будут работать на потоковых данных. Это действия, которые будут немедленно запускать запросы и возвращать результаты, что не имеет смысла в потоковом наборе данных.

Если вы попытаетесь выполнить любую из этих операций, вы увидите исключение AnalysisException, такое как «операция XYZ не поддерживается потоковыми DataFrames/Datasets».

Это выглядит знакомым, не так ли?

cache является не в списке неподдерживаемых операций, но это потому, что он просто-напросто забывают (я докладывал SPARK-20927, чтобы исправить это).

cache должно было быть в списке, так как он выполняет запрос, прежде чем запрос будет зарегистрирован в CacheManager Spark SQL.

Давайте углубиться в глубь Спарка SQL ... задержать дыхания ...

iscachepersist в то время как persistrequests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this) 

Хотя кэширование запросов CacheManagerделаютexecute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan 

, которого мы знаем не допускается, так как это start (или foreach).

Проблема решена!

+1

Я думал, что это ошибка, поэтому я сообщил об этом даже раньше https://issues.apache.org/jira/browse/SPARK-20865, i просто нужно было подтвердить мои жесты. Благодарю. –

+0

Ссылка на мастер не очень актуальна, потому что целевой код может измениться. И я думаю, что это то, что добавляется в ваших ссылках. – crak

+0

@crak Правильно. Я не должен был использовать мастер для ссылок. Как вы думаете, что было бы лучше? Видели ссылки на конкретные версии в прошлом, но не могут понять, как это сделать на github сегодня. Не возражаете, чтобы помочь? Я ценю. –