Как получить доступ к элементу столбца VectorUDT в Spark DataFrame?
У меня есть dataframe df
с столбцом VectorUDT
с именем features
. Как получить элемент столбца, например, первый элемент?
Я пробовал сделать следующее
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
но я получаю ошибку net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)
. Такая же ошибка, если я делаю first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])
.
Я также пробовал explode()
, но получаю ошибку, потому что для этого требуется массив или тип карты.
Это, по-моему, общая операция.
Ответы
Ответ 1
Преобразовать вывод в float
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
def ith_(v, i):
try:
return float(v[i])
except ValueError:
return None
ith = udf(ith_, DoubleType())
Пример использования:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.select(ith("features", lit(1))).show()
## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## | 2.0|
## | 9.0|
## +-----------------+
Пояснение:
Выходные значения должны быть повторно инициализированы для эквивалентных объектов Java. Если вы хотите получить доступ к values
(остерегайтесь SparseVectors
), вы должны использовать метод item
:
v.values.item(0)
которые возвращают стандартные скаляры Python. Аналогично, если вы хотите получить доступ ко всем значениям в виде плотной структуры:
v.toArray().tolist()
Ответ 2
Если вы предпочитаете использовать spark.sql, вы можете использовать следующую пользовательскую функцию to_array, чтобы преобразовать вектор в произвольный. Затем вы можете манипулировать им как массивом.
from pyspark.sql.types import ArrayType, DoubleType
def to_array_(v):
return v.toArray().tolist()
from pyspark.sql import SQLContext
sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None)
sqlContext.udf.register("to_array",to_array_, ArrayType(DoubleType()))
пример
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.createOrReplaceTempView("tb")
spark.sql("""select * , to_array(features)[1] Second from tb """).toPandas()
выход
id features Second
0 1 [1.0, 2.0, 3.0] 2.0
1 2 (0.0, 9.0, 0.0) 9.0