Ответ 1
У вас явно есть проблема с огромным правильным перекосом данных. Давайте посмотрим статистику, которую вы предоставили:
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
Со средним значением около 5 и стандартным отклонением более 2000 вы получаете длинный хвост.
Поскольку некоторые ключи намного чаще, чем другие, после перераспределения некоторых исполнителей будет гораздо больше работы, чем остальных.
Кроме того, ваше описание подсказывает, что проблема может быть связана с одним или несколькими ключами, которые имеют хэш в том же разделе.
Итак, сначала определите выбросы (псевдокод):
val mean = 4.989209978967438
val sd = 2255.654165352454
val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache
val frequent = counts
.where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist.
.alias("frequent")
.join(df1, Seq("userId"))
а остальное:
val infrequent = counts
.where($"count" <= mean + 2 * sd)
.alias("infrequent")
.join(df1, Seq("userId"))
Можно ли ожидать чего-то? Если нет, попробуйте определить источник проблемы вверх по течению.
Если ожидается, вы можете попробовать:
-
передача меньшего стола:
val df2 = sqlContext.sql("Select * from Table2") df2.join(broadcast(df1), Seq("userId"), "rightouter")
-
расщепление, объединение (
union
) и широковещательная передача:df2.join(broadcast(frequent), Seq("userId"), "rightouter") .union(df2.join(infrequent, Seq("userId"), "rightouter"))
-
соление
userId
с некоторыми случайными данными
но вы не должны:
- перераспределить все данные и отсортировать их локально (хотя сортировка только локально не должна быть проблемой)
- выполнять стандартные хеш-соединения для полных данных.