Какой наиболее эффективный способ фильтрации DataFrame
..., проверив, находится ли значение столбца в seq
.
Возможно, я не очень хорошо объясняю это, я в основном хочу этого (чтобы выразить это с помощью обычного SQL): DF_Column IN seq
?
Сначала я сделал это с помощью broadcast var
(где я поместил seq), UDF
(это проверили) и registerTempTable
.
Проблема в том, что я не смог ее протестировать, так как я столкнулся с известной ошибкой, которая, по-видимому, появляется только при использовании registerTempTable
с ScalaIDE.
Я закончил создание нового DataFrame
из seq
и выполнял внутреннее соединение с ним (пересечение), но я сомневаюсь, что это самый эффективный способ выполнения задачи.
Спасибо
РЕДАКТИРОВАТЬ: (в ответ на @YijieShen):
Как сделать filter
на основе того, находятся ли элементы одного столбца DataFrame
в другом столбце DF (например, SQL select * from A where login in (select username from B)
)?
например:
Первый DF:
login count
login1 192
login2 146
login3 72
Второй DF:
username
login2
login3
login4
Результат:
login count
login2 146
login3 72
Попытки:
EDIT-2: Я думаю, теперь, когда ошибка исправлена, они должны работать. END EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
и
ordered.select("login").filter($"login" in empLogins("username"))
которые оба бросают Exception in thread "main" org.apache.spark.sql.AnalysisException
, соответственно:
resolved attribute(s) username#10 missing from login#8 in operator
!Filter Contains(login#8, username#10);
и
resolved attribute(s) username#10 missing from login#8 in operator
!Filter login#8 IN (username#10);
Ответы
Ответ 1
-
Вы должны транслировать Set
вместо Array
, намного быстрее, чем линейный.
-
Вы можете заставить Eclipse запустить приложение Spark. Вот как:
Как указано в списке рассылки, spark-sql предполагает, что его классы загружаются изначальным загрузчиком классов. Это не тот случай, когда в Eclipse были библиотеки Java и Scala, загружались как часть пути класса загрузки, тогда как код пользователя и его зависимости находятся в другом. Вы можете легко исправить это в диалоговом окне конфигурации запуска:
- удалить Scala Библиотека и Scala Компилятор из записей "Bootstrap"
- добавить (как внешние банки)
scala-reflect
, scala-library
и scala-compiler
к пользовательской записи.
Диалог должен выглядеть так:
![enter image description here]()
Изменить: Исправлена ошибка Исправлена ошибка, и это обходное решение больше не нужно (начиная с версии 1.4.0)
Ответ 2
Мой код (после описания вашего первого метода) обычно работает в Spark 1.4.0-SNAPSHOT
для этих двух конфигураций:
-
Intellij IDEA test
-
Spark Standalone cluster
с 8 узлами (1 мастер, 7 человек)
Пожалуйста, проверьте, существуют ли какие-либо различия.
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))
xdf.show()
filtered.show()
Выход
имя cnt
login1 192
login2 146
login3 72
имя cnt
login3 72