Spark структурированная потоковая передача - объединение статического набора данных с потоковым набором данных
Я использую Spark structured streaming
для обработки записей, считанных с Kafka
. Вот чего я пытаюсь достичь:
(a) Каждая запись является Tuple2
типа (Timestamp, DeviceId)
.
(b) Я создал статический Dataset[DeviceId]
, который содержит набор всех допустимых идентификаторов устройств (типа DeviceId
), которые, как ожидается, будут отображаться в потоке Kafka
.
(c) Мне нужно написать запрос Spark structured streaming
, который
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
Например, допустим, что список всех допустимых идентификаторов устройства [A,B,C,D,E]
, а записи кафки в определенном 5-минутном окне содержат идентификаторы устройств [A,B,E]
. Затем для этого окна список невидимых идентификаторов устройств, которые я ищу, это [C,D]
.
Вопрос
- Как этот запрос может быть написан в Spark-структурированной потоковой передаче? Я попытался использовать методы
except()
и join()
, которые предоставляет Dataset
. Тем не менее, они оба бросили исключение во время выполнения, жалуясь, что ни одна из этих операций не поддерживается на streaming Dataset
.
Вот фрагмент моего кода:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L)))
case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
.withWatermark("timestamp", "5 minutes")
.groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
.count()
.map(row => (row.getLong(0), 1L))
.as[(Long, Long)]
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
.filter(row => row.isNullAt(1))
.map(row => row.getLong(0))
Последний оператор выдает следующее исключение:
Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
Спасибо заранее.
Ответы
Ответ 1
Ситуация с join operations
в искровой структурированной потоковой передаче выглядит следующим образом: потоковое DataFrames
можно объединить с static DataFrames
, чтобы в дальнейшем создать новый streaming DataFrames
. Но outer joins
между a streaming
и a static Datasets
поддерживается условно, а right/left joins
с streaming Dataset
не поддерживается вообще структурированной потоковой передачей. В результате вы столкнулись с AnalysisException, который был брошен, когда вы пытались создать статический набор данных соединения с потоковым набором данных. В качестве доказательства моих слов вы можете посмотреть исходный код искры, на этом line исключение бросает, что означает, что операция, которую вы пробовали, не поддерживается.
Я попытался выполнить операцию соединения на stream of DataFrames
со статическим DataFrames
.
val streamingDf = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "structured_topic")
.load()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val staticDf = Seq((1507831462 , 100)).toDF("Timestamp", "DeviceId")
//Inner Join
streamingDf.join(staticDf, "Timestamp")
line.join(staticDf, "Timestamp")
//Left Join
streamingDf.join(staticDf, "Timestamp", "left_join")
line.join(staticDf, "Timestamp", "left_join")
Как вы видите, в дополнение к потреблению данных из Kafka
, я читаю данные из сокета, запущенного через nc
(netcat), это значительно упрощает жизнь, пока вы выполняете тестирование потокового приложения.
Этот подход отлично подходит для меня как с Kafka
, так и socket
в качестве источника данных.
Надеюсь, что это поможет.
Ответ 2
Внешние соединения с потоковым набором данных на противоположной стороне просто не поддерживаются:
- Внешние соединения между потоковым и статическим наборами данных условно поддерживаются.
- Полное внешнее соединение с потоковым набором данных не поддерживается
- Левое внешнее соединение с потоковым набором данных справа не поддерживается.
- Правое внешнее соединение с потоковым набором данных слева не поддерживается
Если другой Dataset
мал, вы можете использовать Map
или аналогичную структуру, broadcast
и ссылаться на нее внутри UserDefinedFunction
.
val map: Broadcast[Map[T, U]] = ???
val lookup = udf((x: T) => map.value.get(x))
df.withColumn("foo", lookup($"_1"))