Как отфильтровать нулевое значение из блока данных искры

Я создал блок данных в искровом режиме со следующей схемой:

root
 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)

И данные приведены ниже:

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null|
| 437050836|1238456614|      0|       2|         0|  991277599|     null|
| 447244169|2095085551|      0|      -1|         0| 1579858878|     null|
| 516353916|1076364848|      0|       3|         1| 3597645735|     null|
| 528218683|1151525474|      0|       1|         0| 3433080956|     null|
| 531967718|3632072502|      0|       1|         0| 3863085861|     null|
| 627948360|2823119321|      0|       0|         0| 4092665803|     null|
| 811791433|3513954032|      0|       2|         0|  415464198|     null|
| 830686203|  99027353|      0|       0|         0| 3549822604|     null|
|1008893291|1115453150|      0|       2|         0| 2245155244|     null|
|1239364869|2824096896|      0|       2|         1| 2579294650|     null|
|1287950172|1076364848|      0|       0|         0| 3597645735|     null|
|1345896548|2658555390|      0|       1|         0| 2025118823|     null|
|1354205322|2564682277|      0|       3|         0| 2563033185|     null|
|1408344828|1255629030|      0|      -1|         1|  804901063|     null|
|1452633375|1334001859|      0|       4|         0| 1488588320|     null|
|1625052108|3297535757|      0|       3|         0| 1972598895|     null|
+----------+----------+-------+--------+----------+-----------+---------+

Я хочу отфильтровать строки, имеющие нулевые значения в поле "friend_id".

scala> val aaa = test.filter("friend_id is null")

scala> aaa.count

Я получил: res52: Long = 0, что очевидно не правильно. Каков правильный способ его получить?

Еще один вопрос, я хочу заменить значения в поле friend_id. Я хочу заменить null на 0 и 1 для любого другого значения, кроме null. Код, который я могу выяснить, это:

val aaa = train_friend_join.select($"user_id", $"event_id", $"invited", $"day_diff", $"interested", $"event_owner", ($"friend_id" != null)?1:0)

Этот код также не работает. Может ли кто-нибудь сказать мне, как я могу это исправить? Благодаря

Ответы

Ответ 1

Скажем, у вас есть эта настройка данных (чтобы результаты были воспроизводимыми):

// declaring data types
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// setting up example data
val e1 = Employee("n1", null, "[email protected]", Company("c1", "1", "d1"))
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1"))
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1"))
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2"))
val e5 = Employee("n5", null, "[email protected]", Company("c2", "2", "d2"))
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2"))
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3"))
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3"))
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8)
val df = sc.parallelize(employees).toDF

Данные:

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n2|   2|[email protected]|[c1,1,d1]|
|  n3|   3|[email protected]|[c1,1,d1]|
|  n4|   4|[email protected]|[c2,2,d2]|
|  n5|null|[email protected]|[c2,2,d2]|
|  n6|   6|[email protected]|[c2,2,d2]|
|  n7|   7|[email protected]|[c3,3,d3]|
|  n8|   8|[email protected]|[c3,3,d3]|
+----+----+---------+---------+

Теперь, чтобы отфильтровать сотрудников с помощью null ids, вы сделаете -

df.filter("id is null").show

который будет правильно показывать вам следующее:

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n5|null|[email protected]|[c2,2,d2]|
+----+----+---------+---------+

Приступая ко второй части вашего вопроса, вы можете заменить идентификаторы null на 0 и другие значения на 1 с помощью этого -

df.withColumn("id", when($"id".isNull, 0).otherwise(1)).show

Это приводит к:

+----+---+---------+---------+
|name| id|    email|  company|
+----+---+---------+---------+
|  n1|  0|[email protected]|[c1,1,d1]|
|  n2|  1|[email protected]|[c1,1,d1]|
|  n3|  1|[email protected]|[c1,1,d1]|
|  n4|  1|[email protected]|[c2,2,d2]|
|  n5|  0|[email protected]|[c2,2,d2]|
|  n6|  1|[email protected]|[c2,2,d2]|
|  n7|  1|[email protected]|[c3,3,d3]|
|  n8|  1|[email protected]|[c3,3,d3]|
+----+---+---------+---------+

Ответ 2

Или как df.filter($"friend_id".isNotNull)

Ответ 3

df.where(df.col("friend_id").isNull)

Ответ 4

Хорошим решением для меня было удалить строки с любыми нулевыми значениями:

Dataset<Row> filtered = df.filter(row => !row.anyNull);

В случае, если один заинтересован в другом случае, просто позвоните row.anyNull. (Spark 2.1.0 с использованием Java API)

Ответ 5

Есть два способа сделать это: создать условие фильтра 1) Вручную 2) Динамически.

Пример DataFrame:

val df = spark.createDataFrame(Seq(
  (0, "a1", "b1", "c1", "d1"),
  (1, "a2", "b2", "c2", "d2"),
  (2, "a3", "b3", null, "d3"),
  (3, "a4", null, "c4", "d4"),
  (4, null, "b5", "c5", "d5")
)).toDF("id", "col1", "col2", "col3", "col4")

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|
|  3|  a4|null|  c4|  d4|
|  4|null|  b5|  c5|  d5|
+---+----+----+----+----+

1) Создание условия фильтра вручную, т.е. с помощью функции DataFrame where или filter

df.filter(col("col1").isNotNull && col("col2").isNotNull).show

или

df.where("col1 is not null and col2 is not null").show

Результат:

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|
+---+----+----+----+----+

2) Динамическое создание условия фильтра: это полезно, когда мы не хотим, чтобы какой-либо столбец имел нулевое значение, и имеется большое количество столбцов, что в большинстве случаев имеет место.

Создание условия фильтра вручную в этих случаях будет тратить много времени. В коде ниже мы включаем все столбцы динамически, используя функции map и reduce для столбцов DataFrame:

val filterCond = df.columns.map(x=>col(x).isNotNull).reduce(_ && _)

Как выглядит filterCond:

filterCond: org.apache.spark.sql.Column = (((((id IS NOT NULL) AND (col1 IS NOT NULL)) AND (col2 IS NOT NULL)) AND (col3 IS NOT NULL)) AND (col4 IS NOT NULL))

Фильтрация:

val filteredDf = df.filter(filterCond)

Результат:

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
+---+----+----+----+----+

Ответ 6

Из намека Михаила Копаниева ниже работ

df.where(df("id").isNotNull).show

Ответ 7

Вот решение для искры в Java. Чтобы выбрать строки данных , содержащие нули. Когда у вас есть данные Dataset, вы делаете:

Dataset<Row> containingNulls =  data.where(data.col("COLUMN_NAME").isNull())

Чтобы отфильтровать нулевые значения без:

Dataset<Row> withoutNulls = data.where(data.col("COLUMN_NAME").isNotNull())

Часто dataframes содержат столбцы типа String, где вместо нулей мы имеем пустые строки типа "". Чтобы отфильтровать такие данные, выполните следующие действия:

Dataset<Row> withoutNullsAndEmpty = data.where(data.col("COLUMN_NAME").isNotNull().and(data.col("COLUMN_NAME").notEqual("")))

Ответ 8

по первому вопросу верно, что вы отфильтровываете нули и, следовательно, количество равно нулю.

для второй замены: используйте как показано ниже:

val options = Map("path" -> "...\\ex.csv", "header" -> "true")
val dfNull = spark.sqlContext.load("com.databricks.spark.csv", options)

scala> dfNull.show

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null|
| 437050836|1238456614|      0|       2|         0|  991277599|     null|
| 447244169|2095085551|      0|      -1|         0| 1579858878|        a|
| 516353916|1076364848|      0|       3|         1| 3597645735|        b|
| 528218683|1151525474|      0|       1|         0| 3433080956|        c|
| 531967718|3632072502|      0|       1|         0| 3863085861|     null|
| 627948360|2823119321|      0|       0|         0| 4092665803|     null|
| 811791433|3513954032|      0|       2|         0|  415464198|     null|
| 830686203|  99027353|      0|       0|         0| 3549822604|     null|
|1008893291|1115453150|      0|       2|         0| 2245155244|     null|
|1239364869|2824096896|      0|       2|         1| 2579294650|        d|
|1287950172|1076364848|      0|       0|         0| 3597645735|     null|
|1345896548|2658555390|      0|       1|         0| 2025118823|     null|
|1354205322|2564682277|      0|       3|         0| 2563033185|     null|
|1408344828|1255629030|      0|      -1|         1|  804901063|     null|
|1452633375|1334001859|      0|       4|         0| 1488588320|     null|
|1625052108|3297535757|      0|       3|         0| 1972598895|     null|
+----------+----------+-------+--------+----------+-----------+---------+

dfNull.withColumn("friend_idTmp", when($"friend_id".isNull, "1").otherwise("0")).drop($"friend_id").withColumnRenamed("friend_idTmp", "friend_id").show

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|        1|
|  78065188| 498404626|      0|       0|         0| 2904922087|        1|
| 282487230|2520855981|      0|      28|         0| 3749735525|        1|
| 335269852|1641491432|      0|       2|         0| 1490350911|        1|
| 437050836|1238456614|      0|       2|         0|  991277599|        1|
| 447244169|2095085551|      0|      -1|         0| 1579858878|        0|
| 516353916|1076364848|      0|       3|         1| 3597645735|        0|
| 528218683|1151525474|      0|       1|         0| 3433080956|        0|
| 531967718|3632072502|      0|       1|         0| 3863085861|        1|
| 627948360|2823119321|      0|       0|         0| 4092665803|        1|
| 811791433|3513954032|      0|       2|         0|  415464198|        1|
| 830686203|  99027353|      0|       0|         0| 3549822604|        1|
|1008893291|1115453150|      0|       2|         0| 2245155244|        1|
|1239364869|2824096896|      0|       2|         1| 2579294650|        0|
|1287950172|1076364848|      0|       0|         0| 3597645735|        1|
|1345896548|2658555390|      0|       1|         0| 2025118823|        1|
|1354205322|2564682277|      0|       3|         0| 2563033185|        1|
|1408344828|1255629030|      0|      -1|         1|  804901063|        1|
|1452633375|1334001859|      0|       4|         0| 1488588320|        1|
|1625052108|3297535757|      0|       3|         0| 1972598895|        1|
+----------+----------+-------+--------+----------+-----------+---------+

Ответ 9

Я использую следующий код для решения моего вопроса. Оно работает. Но, как мы все знаем, я работаю вокруг одной мили, чтобы решить эту проблему. Итак, есть ли сокращение для этого? Благодаря

def filter_null(field : Any) : Int = field match {
    case null => 0
    case _    => 1
}

val test = train_event_join.join(
    user_friends_pair,
    train_event_join("user_id") === user_friends_pair("user_id") &&
    train_event_join("event_owner") === user_friends_pair("friend_id"),
    "left"
).select(
    train_event_join("user_id"),
    train_event_join("event_id"),
    train_event_join("invited"),
    train_event_join("day_diff"),
    train_event_join("interested"),
    train_event_join("event_owner"),
    user_friends_pair("friend_id")
).rdd.map{
    line => (
        line(0).toString.toLong,
        line(1).toString.toLong,
        line(2).toString.toLong,
        line(3).toString.toLong,
        line(4).toString.toLong,
        line(5).toString.toLong,
        filter_null(line(6))
        )
    }.toDF("user_id", "event_id", "invited", "day_diff", "interested", "event_owner", "creator_is_friend")