Как разрешить атрибут AnalysisException: разрешенные атрибуты в Spark
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")
Операция соединения работает нормально, но когда я повторно использую df2, я столкнулся с неразрешенными атрибутами.
val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")
ERROR: org.apache.spark.sql.AnalysisException: разрешенные атрибуты id # 426
Ответы
Ответ 1
Как упоминалось в моем комментарии, это связано с https://issues.apache.org/jira/browse/SPARK-10925 и, более конкретно, https://issues.apache.org/jira/browse/SPARK-14948. Повторное использование ссылки создаст неоднозначность при именовании, поэтому вам нужно будет клонировать df - см. Последний комментарий в https://issues.apache.org/jira/browse/SPARK-14948 для примера.
Ответ 2
Если у вас есть df1 и df2, полученные из df1, попробуйте переименовать все столбцы в df2, чтобы после объединения не было двух одинаковых имен. Итак, прежде чем присоединиться:
так что вместо df1.join(df2...
делать
# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')
# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
Ответ 3
Я получил ту же проблему при попытке использовать один DataFrame в двух последовательных объединениях.
Вот проблема: DataFrame A имеет 2 столбца (пусть они называются x и y), а DataFrame B также имеет 2 столбца (пусть они называются w и z). Мне нужно соединить A с B на x = z, а затем соединить их вместе на y = z.
(A join B on A.x=B.z) as C join B on C.y=B.z
Я получил точную ошибку, что во втором соединении он жаловался на " разрешенный атрибут Bz # 1234... ".
Следуя ссылкам, предоставленным @Erik, и другим блогам и вопросам, я понял, что мне нужен клон B.
Вот что я сделал:
val aDF = ...
val bDF = ...
val bCloned = spark.createDataFrame(bDF.rdd, bDF.schema)
aDF.join(bDF, aDF("x") === bDF("z")).join(bCloned, aDF("y") === bCloned("z"))
Ответ 4
В моем случае эта ошибка возникла при самостоятельном соединении одной и той же таблицы.
Я столкнулся с приведенной ниже проблемой с Spark SQL, а не с API-интерфейсом для данных:
org.apache.spark.sql.AnalysisException: Resolved attribute(s) originator#3084,program_duration#3086,originator_locale#3085 missing from program_duration#1525,guid#400,originator_locale#1524,EFFECTIVE_DATETIME_UTC#3157L,device_timezone#2366,content_rpd_id#734L,originator_sublocale#2355,program_air_datetime_utc#3155L,originator#1523,master_campaign#735,device_provider_id#2352 in operator !Deduplicate [guid#400, program_duration#3086, device_timezone#2366, originator_locale#3085, originator_sublocale#2355, master_campaign#735, EFFECTIVE_DATETIME_UTC#3157L, device_provider_id#2352, originator#3084, program_air_datetime_utc#3155L, content_rpd_id#734L]. Attribute(s) with the same name appear in the operation: originator,program_duration,originator_locale. Please check if the right attribute(s) are used.;;
Ранее я использовал запрос ниже,
SELECT * FROM DataTable as aext
INNER JOIN AnotherDataTable LAO
ON aext.device_provider_id = LAO.device_provider_id
Выбор только столбцов перед присоединением решил эту проблему для меня.
SELECT * FROM (
select distinct EFFECTIVE_DATE,system,mso_Name,EFFECTIVE_DATETIME_UTC,content_rpd_id,device_provider_id
from DataTable
) as aext
INNER JOIN AnotherDataTable LAO ON aext.device_provider_id = LAO.device_provider_id
Ответ 5
Для разработчиков Java попробуйте вызвать этот метод:
private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
List<Column> filterColumns = new ArrayList<>();
List<String> filterColumnsNames = new ArrayList<>();
scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
while (it.hasNext()) {
String columnName = it.next().name();
filterColumns.add(ds.col(columnName));
filterColumnsNames.add(columnName);
}
ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
return ds;
}
на обоих наборах данных непосредственно перед присоединением, он клонирует наборы данных в новые:
df1 = cloneDataset(df1);
df2 = cloneDataset(df2);
Dataset<Row> join = df1.join(df2, col("column_name"));
// if it didn't work try this
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq));
Ответ 6
Это будет работать, если вы сделаете следующее.
Предположим, у вас есть датафрейм. df1 и если вы хотите соединить один и тот же кадр данных, вы можете использовать ниже
df1.toDF("ColA","ColB").as("f_df").join(df1.toDF("ColA","ColB").as("t_df"),
$"f_df.pcmdty_id" ===
$"t_df.assctd_pcmdty_id").select($"f_df.pcmdty_id",$"f_df.assctd_pcmdty_id")
Ответ 7
Из моего опыта у нас есть 2 решения
1) клон DF
2) переименовывать столбцы, которые имеют неоднозначность, до объединения таблиц. (не забудьте удалить дублированный ключ соединения)
Лично я предпочитаю второй метод, потому что клонирование DF в первом методе требует времени, особенно если размер данных большой.