Ответ 1
Ваш (очень интересный) случай сводится к следующей строке (которую вы можете выполнить в spark-shell
):
scala> :type spark
org.apache.spark.sql.SparkSession
scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
... 48 elided
Причиной этого оказалось довольно простое объяснение (без каламбура Spark SQL explain
).
spark.readStream.text("files")
создает так называемый потоковый набор данных.
scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]
scala> files.isStreaming
res2: Boolean = true
Потоковые наборы данных являются основой Spark SQL Structured Streaming.
Как вы могли прочитать в Structured Streaming Быстрый пример:
И затем начните вычисление потоковой передачи с помощью
start()
.
Цитирование скалядока DataStreamWriter start:
start(): StreamingQuery Запускает выполнение потокового запроса, который будет постоянно выводить результаты на указанный путь по мере поступления новых данных.
Итак, вы должны использовать start
(или foreach
) для запуска выполнения потокового запроса. Вы уже это знали.
Но... есть Неподдерживаемые операции в Structured Streaming:
Кроме того, существуют некоторые методы набора данных, которые не будут работать с потоковыми наборами данных. Это действия, которые будут немедленно запускать запросы и возвращать результаты, что не имеет смысла в потоковом наборе данных.
Если вы попытаетесь выполнить любую из этих операций, вы увидите исключение AnalysisException, такое как "операция XYZ не поддерживается потоковыми DataFrames/Datasets".
Это выглядит знакомым, не так ли?
cache
не в списке неподдерживаемых операций, но это потому, что он просто был пропущен (я сообщил SPARK-20927, чтобы исправить его).
cache
должен был быть в списке, так как выполняет выполнение запроса до того, как запрос будет зарегистрирован в Spark SQL CacheManager.
Отпустите глубже в глубины Spark SQL... задерживайте дыхание...
cache
persist
, а persist
запрашивает текущий CacheManager для кэширования запроса:
sparkSession.sharedState.cacheManager.cacheQuery(this)
При кешировании запроса CacheManager
делает выполнить его:
sparkSession.sessionState.executePlan(planToCache).executedPlan
которому мы знаем, не разрешено, так как это start
(или foreach
) для этого.
Проблема решена!