Left Anti присоединяется к Spark?
Я определил две таблицы:
val tableName = "table1"
val tableName2 = "table2"
val format = new SimpleDateFormat("yyyy-MM-dd")
val data = List(
List("mike", 26, true),
List("susan", 26, false),
List("john", 33, true)
)
val data2 = List(
List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("isBoy", BooleanType, false)
))
val schema2 = StructType(Array(
StructField("name", StringType, true),
StructField("grade", StringType, true),
StructField("howold", IntegerType, true),
StructField("hobby", StringType, true),
StructField("birthday", DateType, false)
))
val df = sqlContext.createDataFrame(rdd, schema)
val df2 = sqlContext.createDataFrame(rdd2, schema2)
df.createOrReplaceTempView(tableName)
df2.createOrReplaceTempView(tableName2)
Я пытаюсь построить запрос для возврата строк из таблицы1, у которой нет соответствующей строки в таблице2.
Я попытался сделать это, используя этот запрос:
Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL
но это просто дает мне все строки из таблицы1:
Список ({ "имя": "Джон", "возраст" : 33, "isBoy" : правда}, { "Имя": "Susan", "возраст" : 26, "isBoy" ложь}, { "Имя": "Майк", "возраст" : 26, "isBoy" : истинно})
Как сделать этот тип соединения в Spark эффективно?
Я ищу SQL-запрос, потому что мне нужно указать столбцы, которые можно сравнить между двумя таблицами, а не просто сравнивать строки за строкой, как это делается в других рекомендуемых вопросах. Подобно использованию вычитания, кроме и т.д.
Ответы
Ответ 1
Вы можете использовать тип объединения "left anti" - либо с помощью API DataFrame, либо с помощью SQL (API DataFrame поддерживает все, что поддерживает SQL, включая любое условие соединения):
API DataFrame:
df.as("table1").join(
df2.as("table2"),
$"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
"leftanti"
)
SQL:
sqlContext.sql(
"""SELECT table1.* FROM table1
| LEFT ANTI JOIN table2
| ON table1.name = table2.name AND table1.age = table2.howold
""".stripMargin)
ПРИМЕЧАНИЕ: также стоит отметить, что существует более короткий и более сжатый способ создания выборочных данных без указания схемы отдельно, с использованием кортежей и неявного метода toDF
, а затем "исправление", автоматически-выводимую схему, где необходимо:
val df = List(
("mike", 26, true),
("susan", 26, false),
("john", 33, true)
).toDF("name", "age", "isBoy")
val df2 = List(
("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))
Ответ 2
Вы можете сделать это со встроенной функцией except
(Я бы использовал предоставленный вами код, но вы не включили импорт, поэтому я не мог просто c/p его:()
val a = sc.parallelize(Seq((1,"a",123),(2,"b",456))).toDF("col1","col2","col3")
val b= sc.parallelize(Seq((4,"a",432),(2,"t",431),(2,"b",456))).toDF("col1","col2","col3")
scala> a.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| 123|
| 2| b| 456|
+----+----+----+
scala> b.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 4| a| 432|
| 2| t| 431|
| 2| b| 456|
+----+----+----+
scala> a.except(b).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| 123|
+----+----+----+
Ответ 3
В SQL вы можете просто выполнить свой запрос ниже (не уверен, что он работает в SPARK)
Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold where table2.name IS NULL
Это вернет все строки таблицы1, для которых не удалось выполнить соединение
Ответ 4
Вы можете использовать левый анти.
dfRcc20.as("a").join(dfClientesDuplicados.as("b")
,col("a.eteerccdiid")===col("b.eteerccdiid")&&
col("a.eteerccdinr")===col("b.eteerccdinr")
,"left_anti")