Фильтрация DataFrame Pyspark с помощью SQL-подобной инструкции IN
Я хочу отфильтровать DataFrame Pyspark с SQL-подобным предложением IN
, как в
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
где a
- это набор (1, 2, 3)
. Я получаю эту ошибку:
java.lang.RuntimeException: [1.67] failure: `` ('' ожидаемый, но идентификатор найденный
который в основном говорит, что он ожидал чего-то вроде "(1, 2, 3)" вместо a.
Проблема в том, что я не могу вручную записывать значения в a, как извлеченные из другого задания.
Как я буду фильтровать в этом случае?
Ответы
Ответ 1
Строка, передаваемая в SQLContext
, оценивается в области среды SQL. Он не фиксирует закрытие. Если вы хотите передать переменную, вам придется делать это явно с использованием форматирования строк:
df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
## 2
Очевидно, что это не то, что вы использовали бы в "реальной" среде SQL из-за соображений безопасности, но здесь это не имеет значения.
На практике DataFrame
DSL - это большой выбор, если вы хотите создавать динамические запросы:
from pyspark.sql.functions import col
df.where(col("v").isin({"foo", "bar"})).count()
## 2
Легко создавать и компоновать и обрабатывать все детали HiveQL/Spark SQL для вас.
Ответ 2
повторение того, что упомянуто выше @zero323: мы можем сделать то же самое, используя список (не только set
), как показано ниже
from pyspark.sql.functions import col
df.where(col("v").isin(["foo", "bar"])).count()
Ответ 3
Просто небольшое дополнение/обновление:
choice_list = ["foo", "bar", "jack", "joan"]
Если вы хотите отфильтровать ваш фрейм данных "df" так, чтобы сохранить строки на основе столбца "v", принимая только значения из choice_list, то
df_filtered = df.where( ( col("v").isin (choice_list) ) )
Ответ 4
Немного другой подход, который работал для меня, состоит в том, чтобы фильтровать с пользовательской функцией фильтра.
def filter_func(a):
"""wrapper function to pass a in udf"""
def filter_func_(col):
"""filtering function"""
if col in a.value:
return True
return False
return udf(filter_func_, BooleanType())
# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \
Ответ 5
Вы также можете сделать это для целочисленных столбцов:
df_filtered = df.filter("field1 in (1,2,3)")
или это для строковых столбцов:
df_filtered = df.filter("field1 in ('a','b','c')")