Ответ 1
Кассандра не имеет понятия null. Столбец пуст или заполнен. Я решил эту проблему в scala следующим образом: я использовал метод карты и проверил нулевые значения. Я обнуляю null с пустой строкой. Это. Работает очень хорошо.
Я пытаюсь сохранить данные потока в Cassandra, используя Spark и Cassandra Spark Connector.
Я сделал что-то вроде следующего:
Создать класс модели:
public class ContentModel {
String id;
String available_at; //may be null
public ContentModel(String id, String available_at){
this.id=id;
this.available_at=available_at,
}
}
Сопоставление потокового контента с моделью:
JavaDStream<ContentModel> contentsToModel = myStream.map(new Function<String, ContentModel>() {
@Override
public ContentModel call(String content) throws Exception {
String[] parts = content.split(",");
return new ContentModel(parts[0], parts[1]);
}
});
Сохранить
CassandraStreamingJavaUtil.javaFunctions(contentsToModel).writerBuilder("data", "contents", CassandraJavaUtil.mapToRow(ContentModel.class)).saveToCassandra();
Если некоторые значения null
, я получаю следующую ошибку:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to struct.ValueRepr.
Есть ли способ сохранить нулевые значения, используя Spark Cassandra Connector?
Кассандра не имеет понятия null. Столбец пуст или заполнен. Я решил эту проблему в scala следующим образом: я использовал метод карты и проверил нулевые значения. Я обнуляю null с пустой строкой. Это. Работает очень хорошо.
В scala вы также можете использовать параметры для этого.
Можем ли мы узнать версию ваших зависимостей (Spark, Connector, Cassandra и т.д.)
Да, есть способ хранить нули с помощью Cassandra Connector. Я получил ваш пример для правильной работы с Простым приложением и несколькими изменениями (Добавление Serializabe + преобразование свойств вашей модели в Camel Case + Относительные методы получения и установки). Я менее знаком с Java API (вам действительно следует использовать Scala при работе с Spark, это значительно упрощает работу), но у меня сложилось впечатление, что размышления о классах Model были сделаны на уровне получателя/установщика... Может быть неправильно.
Модель
public class ModelClass implements Serializable {
String id;
String availableAt; //may be null
public ModelClass(String id, String availableAt){
this.id=id;
this.availableAt=availableAt;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAvailableAt() {
return availableAt;
}
public void setAvailableAt(String availableAt) {
this.availableAt = availableAt;
}
}
Водитель
public static void main(String ... args) {
SparkConf conf = new SparkConf();
conf.setAppName("Local App");
conf.setMaster("local[*]");
JavaSparkContext context = new JavaSparkContext(conf);
List<ModelClass> modelList = new ArrayList<>();
modelList.add(new ModelClass("Test", null));
modelList.add(new ModelClass("Test2", "test"));
context.parallelize(modelList);
JavaRDD<ModelClass> modelRDD = context.parallelize(modelList);
javaFunctions(modelRDD).writerBuilder("test", "model", mapToRow(ModelClass.class))
.saveToCassandra();
}
Производит
cqlsh:test> select * from model;
id | available_at
-------+--------------
Test | null
Test2 | test
Однако важно знать, как вы "пишете" нули. Вообще говоря, мы хотим избежать выписывания нулей из-за того, как Кассандра генерирует надгробия. Если это начальные записи, вы можете рассматривать их как "Unset".
Глобально рассматривает все нули как Unset
Глобально обрабатывая все нули как Unset WriteConf теперь также содержит параметр ignoreNulls, который можно установить с помощью ключа SparkConf spark.cassandra.output.ignoreNulls. По умолчанию установлено значение false, которое будет заставить нули обрабатываться как в предыдущих версиях (вставляется в Кассандра как есть). При значении true все нули будут рассматриваться как неустановленные. Это можно использовать с DataFrames, чтобы пропустить пустые записи и избежать надгробия.
ОБНОВЛЕНИЕ: Я должен уточнить, внутренне Кассандра не хранит фактическое нулевое значение - оно просто сбрасывается. Но мы можем рассуждать, что Cassandra использует нули на уровне приложения.