Как обновить столбец на основе условия (значение в группе)?
У меня есть один df
+---+----+-----+
|sno|dept|color|
+---+----+-----+
| 1| fn| red|
| 2| fn| blue|
| 3| fn|green|
+---+----+-----+
Если любое значение столбца цвета красное, тогда я должен обновить все значения столбцов как красные
как показано ниже
+---+----+-----+
|sno|dept|color|
+---+----+-----+
| 1| fn| red|
| 2| fn| red|
| 3| fn| red|
+---+----+-----+
Я не мог понять. Любая помощь пожалуйста
Я устал от кода
val gp=jdbcDF.filter($"dept".contains("fn"))
//.withColumn("newone",when($"dept"==="fn","RED").otherwise("NULL"))
gp.show()
gp.map(
row=>{
val row1=row.getAs[String](1)
var row2=row.getAs[String](2)
val make=if(row1 =="fn") row2="red"
Row(row(0),row(1),make)
}
).collect().foreach(println)
Ответы
Ответ 1
Дано:
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "fn", "color")
Сделайте расчет:
val redOrNot = df.groupBy("fn")
.agg(collect_set('color) as "values")
.withColumn("hasRed", array_contains('values, "red"))
// gives null for no option
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
.withColumn("resultColor", colorPicker)
.withColumn("color", coalesce('resultColor, 'color)) // skips nulls that leads to the answer
.select('id, 'fn, 'color)
result
выглядит следующим образом (это кажется ответом):
scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
| 1| fn| red|
| 2| fn| red|
| 3| fn| red|
| 4| aa| blue|
| 5| aa|green|
| 6| bb| red|
| 7| bb| red|
| 8| aa| blue|
+---+---+-----+
Вы можете привязать операторы when
и иметь значение по умолчанию с otherwise
. Обратитесь к scaladoc оператора when
.
Я думаю, вы могли бы сделать что-то очень похожее (и, возможно, более эффективное), используя оконные операторы или пользовательские агрегированные функции (UDAF), но... ну... в настоящее время не знаю, как это сделать. Оставляя комментарий здесь, чтобы вдохновить других; -)
p.s. Узнал много! Спасибо за идею!
Ответ 2
Эффективное решение, которое не требует дорогостоящей группировки:
// All groups with `red`
df.where($"color" === "red").select($"fn".alias("fn_")).distinct
// Join with input
.join(df.as("df"), $"fn" === $"fn_", "rightouter")
// Replace `color`
.withColumn("color", when($"fn_"isNull, $"color").otherwise(lit("red")))
.drop("fn_")
Ответ 3
Вы условно обновляете DataFrame, если он удовлетворяет определенному свойству. В этом случае свойство "цветной столбец содержит" красный ". Идиоматическим способом выразить это является фильтрация с искомым предикатом, а затем определение того, удовлетворяют ли какие-либо строки. Нет необходимости в соединении.
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame
def makeAllRedIfAnyAreRed(df: DataFrame) = {
val containsRed = df.filter(df("color") === "red").count() > 0
if (containsRed) df.withColumn("color", lit("red")) else df
}
Ответ 4
Поскольку в отфильтрованном фрейме данных может быть несколько строк, я добавляю решение с комбинацией isin()
и .withColumn()
.
Пример DataFrame
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
Теперь выберите только dept
, у которых есть хотя бы одна строка red color
и поместите ее в переменную broadcast
, как показано ниже.
val depts = sc.broadcast(df.filter($"color" === "red").select(collect_set("dept")).first.getSeq[String](0)))
Обновить красный цвет для отфильтрованных записей depts
.
isin()
принимает vararg, поэтому конвертирует список в vararg (depts.value:_*
)
//creating new column by giving diff name (clr) to see the diff
val result = df.withColumn("clr", when($"dept".isin(depts.value:_*),lit("red"))
.otherwise($"color"))
result.show()
+---+----+-----+-----+
| id|dept|color| clr|
+---+----+-----+-----+
| 1| fn| red| red|
| 2| fn| blue| red|
| 3| fn|green| red|
| 4| aa| blue| blue|
| 5| aa|green|green|
| 6| bb| red| red|
| 7| bb| red| red|
| 8| aa| blue| blue|
+---+----+-----+-----+
Ответ 5
Искра 2.2.0:
Пример Dataframe (взятый из приведенных выше примеров)
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
создал UDF для выполнения обновления, проверив условие.
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()
выход:
![введите описание изображения здесь]()
искра 1.6:
val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// For implicit conversions like converting RDDs to DataFrames
val df = sc.parallelize(Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
) ).toDF("id","dept","color")
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()