Удаление дубликатов из строк на основе определенных столбцов в RDD/Spark DataFrame
Скажем, у меня довольно большой набор данных в следующем виде:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
Что я хотел бы сделать, это удалить повторяющиеся строки, основываясь только на значениях первого, третьего и четвертого столбцов.
Удаление полностью повторяющихся строк является простым:
data = data.distinct()
и либо строка 5, либо строка 6 будут удалены
Но как мне удалять только повторяющиеся строки только на столбцах 1, 3 и 4? то есть удалить один из них:
('Baz',22,'US',6)
('Baz',36,'US',6)
В Python это можно сделать, указав столбцы с .drop_duplicates()
. Как я могу добиться того же в Spark/Pyspark?
Ответы
Ответ 1
Pyspark включает метод dropDuplicates()
. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
Возможно, это было введено в более поздней версии, чем то, что использовал @Jason (OP)?
edit: да, это было введено в 1.4
Ответ 2
Из вашего вопроса неясно, какие столбцы вы хотите использовать для определения дубликатов. Общая идея решения заключается в создании ключа, основанного на значениях столбцов, которые идентифицируют дубликаты. Затем вы можете использовать операции reduceByKey или уменьшить операции для устранения дубликатов.
Вот вам какой код, чтобы вы начали:
def get_key(x):
return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))
Теперь у вас есть ключевое значение RDD
, которое задается столбцами 1,3 и 4.
Следующим шагом будет либо reduceByKey
, либо groupByKey
и filter
.
Это устранит дубликаты.
r = m.reduceByKey(lambda x,y: (x))
Ответ 3
Согласитесь с Дэвидом. Чтобы добавить, может быть не так, что мы хотим groupBy все столбцы, отличные от столбца (ов) в агрегированной функции, то есть, если мы хотим удалить дубликаты, основанные только на поднаборе столбцов и сохранить все столбцы в исходном фрейме. Таким образом, лучшим способом сделать это может быть использование dropDuplicates Dataframe api, доступный в Spark 1.4.0
Для справки см. https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
Ответ 4
Я знаю, что вы уже приняли другой ответ, но если вы хотите сделать это как
DataFrame, просто используйте groupBy и agg. Предполагая, что у вас уже создан DF (с столбцами с именем "col1", "col2" и т.д.), Вы можете сделать:
myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
Обратите внимание, что в этом случае я выбрал Max of col2, но вы можете сделать avg, min и т.д.
Ответ 5
Я использовал встроенную функцию dropDuplicates(). Scala приведенный ниже код
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
Выход:
+---+---+---+-----+
| x| y| z|count|
+---+---+---+-----+
|Baz| 22| US| 6|
|Foo| 39| UK| 1|
|Foo| 41| US| 3|
|Bar| 57| CA| 2|
+---+---+---+-----+
Ответ 6
Это мой Df, содержащий 4, повторяется дважды, поэтому здесь будут удалены повторяющиеся значения.
scala> df.show
+-----+
|value|
+-----+
| 1|
| 4|
| 3|
| 5|
| 4|
| 18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
| 1|
| 3|
| 5|
| 4|
| 18|
+-----+