Как загрузить Spark Cassandra Connector в оболочку?
Я пытаюсь использовать Spark Cassandra Connector в Spark 1.1.0.
Я успешно создал файл jar из ведущей ветки на GitHub и получил включенные демоверсии. Однако, когда я пытаюсь загрузить файлы jar в spark-shell
, я не могу импортировать ни один из классов из пакета com.datastax.spark.connector
.
Я попытался использовать параметр --jars
на spark-shell
и добавил каталог с файлом jar в Java CLASSPATH. Ни один из этих вариантов не работает. Фактически, когда я использую параметр --jars
, вывод журнала показывает, что барабан Datastax загружается, но я все еще ничего не могу импортировать из com.datastax
.
Мне удалось загрузить соединитель Tuplejump Calliope Cassandra в spark-shell
с помощью --jars
, поэтому я знаю, что работаю. Это просто разъем Datastax, который не работает для меня.
Ответы
Ответ 1
Я понял. Вот что я сделал:
$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
$ sbt/sbt assembly
$ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar
В приглашении scala
scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host")
scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)
Ответ 2
Изменить: теперь немного легче
Для углубленных инструкций проверьте веб-сайт проекта
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md
Или не используйте Spark-Packages для загрузки библиотеки (не все версии опубликованы)
http://spark-packages.org/package/datastax/spark-cassandra-connector
> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10
Предполагается, что вы работаете с OSS Apache C *
Вам нужно запустить класс с набором -driver-class-path, чтобы включить все ваши соединительные библиотеки
Я приведу сообщение в блоге из знаменитого Аль Тоби
Самый простой способ найти Ive - это установить путь к классу с помощью перезапустите контекст в REPL с необходимыми классами, импортированными в сделать sc.cassandraTable() видимым. Новые загруженные методы не будут отображаться при завершении табуляции. Я не знаю, почему.
/opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed 's/ /:/g')
Он выведет пустую информацию журнала, а затем представит приглашение scala > .
scala> sc.stop
Теперь, когда контекст остановлен, его время для импорта соединителя.
scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count
Если вы работаете с DSE < 4.5.1
Существует небольшая проблема с DSE Classloader и предыдущими соглашениями об именах пакетов, которые не позволят вам найти новые библиотеки искрового коннектора. Вы можете обойти это, удалив строку, указывающую загрузчик класса DSE, в скриптах, запускающих искровую оболочку.
Ответ 3
Если вы хотите избежать остановки/начала контекста в оболочке, вы также можете добавить его в свои свойства искры в:
{spark_install}/conf/spark-defaults.conf
spark.cassandra.connection.host=192.168.10.10
Ответ 4
Чтобы получить доступ к Cassandra из искровой оболочки, я построил сборку из искрового драйвера cassandra со всеми зависимостями ( "uberjar" ). Предоставляя его искровой оболочке с помощью опции -jars, например:
spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar
Я столкнулся с той же проблемой, описанной здесь, и этот метод является простым и удобным (вместо загрузки длинного списка зависимостей)
Я создал сущность с файлом POM, который вы можете скачать. Используя pom для создания uberjar, вы должны сделать:
mvn package
Если вы используете sbt, загляните в плагин sbt-assembly.
Ответ 5
Следующие шаги описывают, как настроить сервер как с помощью Spark Node, так и с Cassandra Node.
Настройка Искры с открытым исходным кодом
Это предполагает, что у вас уже есть настройка Cassandra.
Шаг 1: Загрузка и настройка Spark
Go to http://spark.apache.org/downloads.html.
a) Чтобы сделать вещи простыми, мы будем использовать один из готовых пакетов Spark.
Выберите Spark версии 2.0.0 и предварительно построенный для Hadoop 2.7, затем Direct Download. Это загрузит архив со встроенными двоичными файлами для Spark.
b) Извлеките это в каталог по вашему выбору. Я поставлю свое в ~/apps/spark-1.2
c) Test Spark работает, открыв оболочку
Шаг 2: Проверьте, что Spark Works
a) cd в каталог Spark
Запустите "./bin/spark-shell". Это откроет интерактивную программу оболочки Spark
b) Если все сработало, оно должно отобразить это приглашение: "scala > "
Запустите простой расчет:
sc.parallelize(от 1 до 50).sum(+)
который должен выводить 1250.
c) Поздравляем Спарк работает!
Выйдите из оболочки Spark с командой "exit"
Разъем Spark Cassandra
Чтобы подключить Spark к кластеру Cassandra, соединитель Cassandra нужно будет добавить в проект Spark. DataStax предоставляет собственный коннектор Cassandra на GitHub, и мы будем использовать его.
Это должно выводить скомпилированные файлы jar в каталог с именем "target". Будут два файла jar, один для Scala и один для Java.
Мы заинтересованы в следующем: "spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar" для Scala.
Переместите файл jar в удобный каталог: я поместил свой файл в ~/apps/spark-1.2/jars
Чтобы загрузить соединитель в оболочку искры:
запустите оболочку с помощью этой команды:
../bin/spark-shell -jars ~/Приложения/искровой 1.2/банки/искровая Cassandra-разъем сборка-1.1.1-SNAPSHOT.jar
Соедините контекст искры с кластером Cassandra и остановите контекст по умолчанию:
sc.stop
Импортируйте необходимые файлы jar:
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
Создайте новый SparkConf с деталями соединения Cassandra:
val conf = новый SparkConf (true).set( "spark.cassandra.connection.host", "Локальный" )
Создайте новый контекст искры:
val sc = новый SparkContext (conf)
Теперь у вас есть новый SparkContext, который связан с вашим кластером Cassandra.
Ответ 6
Spark-Cassandra-Connector Полный код в JAVA с окном-7,8,10 Полезно.
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import spark_conn.Spark_connection;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class App implements Serializable
{
private transient SparkConf conf;
private App(SparkConf conf) {
this.conf = conf;
}
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
generateData(sc);
compute(sc);
showResults(sc);
sc.stop();
}
private void generateData(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
// Prepare the schema
try{
Session session=connector.openSession();
session.execute("DROP KEYSPACE IF EXISTS java_api");
session.execute("CREATE KEYSPACE java_api WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
session.execute("CREATE TABLE java_api.products
(id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
session.execute("CREATE TABLE java_api.sales
(id UUID PRIMARY KEY, product INT, price DECIMAL)");
session.execute("CREATE TABLE java_api.summaries
(product INT PRIMARY KEY, summary DECIMAL)");
}catch(Exception e){System.out.println(e);}
// Prepare the products hierarchy
List<Product> products = Arrays.asList(
new Product(0, "All products", Collections.<Integer>emptyList()),
new Product(1, "Product A", Arrays.asList(0)),
new Product(4, "Product A1", Arrays.asList(0, 1)),
new Product(5, "Product A2", Arrays.asList(0, 1)),
new Product(2, "Product B", Arrays.asList(0)),
new Product(6, "Product B1", Arrays.asList(0, 2)),
new Product(7, "Product B2", Arrays.asList(0, 2)),
new Product(3, "Product C", Arrays.asList(0)),
new Product(8, "Product C1", Arrays.asList(0, 3)),
new Product(9, "Product C2", Arrays.asList(0, 3))
);
JavaRDD<Product> productsRDD = sc.parallelize(products);
javaFunctions(productsRDD, Product.class).
saveToCassandra("java_api", "products");
JavaRDD<Sale> salesRDD = productsRDD.filter
(new Function<Product, Boolean>() {
@Override
public Boolean call(Product product) throws Exception {
return product.getParents().size() == 2;
}
}).flatMap(new FlatMapFunction<Product, Sale>() {
@Override
public Iterable<Sale> call(Product product) throws Exception {
Random random = new Random();
List<Sale> sales = new ArrayList<>(1000);
for (int i = 0; i < 1000; i++) {
sales.add(new Sale(UUID.randomUUID(),
product.getId(), BigDecimal.valueOf(random.nextDouble())));
}
return sales;
}
});
javaFunctions(salesRDD, Sale.class).saveToCassandra
("java_api", "sales");
}
private void compute(JavaSparkContext sc) {
JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
.cassandraTable("java_api", "products", Product.class)
.keyBy(new Function<Product, Integer>() {
@Override
public Integer call(Product product) throws Exception {
return product.getId();
}
});
JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
.cassandraTable("java_api", "sales", Sale.class)
.keyBy(new Function<Sale, Integer>() {
@Override
public Integer call(Sale sale) throws Exception {
return sale.getProduct();
}
});
JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);
JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
@Override
public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
Tuple2<Sale, Product> saleWithProduct = input._2();
List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
for (Integer parentProduct : saleWithProduct._2().getParents()) {
allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
}
return allSales;
}
});
JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
@Override
public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
return v1.add(v2);
}
}).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
@Override
public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
return new Summary(input._1(), input._2());
}
});
javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
}
private void showResults(JavaSparkContext sc) {
JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
.cassandraTable("java_api", "summaries", Summary.class)
.keyBy(new Function<Summary, Integer>() {
@Override
public Integer call(Summary summary) throws Exception {
return summary.getProduct();
}
});
JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
.cassandraTable("java_api", "products", Product.class)
.keyBy(new Function<Product, Integer>() {
@Override
public Integer call(Product product) throws Exception {
return product.getId();
}
});
List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();
for (Tuple2<Product, Optional<Summary>> result : results) {
System.out.println(result);
}
}
public static void main(String[] args) {
// if (args.length != 2) {
// System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>");
// System.exit(1);
// }
// SparkConf conf = new SparkConf(true)
// .set("spark.cassandra.connection.host", "127.0.1.1")
// .set("spark.cassandra.auth.username", "cassandra")
// .set("spark.cassandra.auth.password", "cassandra");
//SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf);
//return ;
/* try{
SparkConf conf = new SparkConf(true);
conf.setAppName("Spark-Cassandra Integration");
conf.setMaster("yarn-cluster");
conf.set("spark.cassandra.connection.host", "192.168.1.200");
conf.set("spark.cassandra.connection.rpc.port", "9042");
conf.set("spark.cassandra.connection.timeout_ms", "40000");
conf.set("spark.cassandra.read.timeout_ms", "200000");
System.out.println("Hi.......Main Method1111...");
conf.set("spark.cassandra.auth.username","cassandra");
conf.set("spark.cassandra.auth.password","cassandra");
System.out.println("Connected Successful...!\n");
App app = new App(conf);
app.run();
}catch(Exception e){System.out.println(e);}*/
SparkConf conf = new SparkConf();
conf.setAppName("Java API demo");
// conf.setMaster(args[0]);
// conf.set("spark.cassandra.connection.host", args[1]);
conf.setMaster("spark://192.168.1.117:7077");
conf.set("spark.cassandra.connection.host", "192.168.1.200");
conf.set("spark.cassandra.connection.port", "9042");
conf.set("spark.ui.port","4040");
conf.set("spark.cassandra.auth.username","cassandra");
conf.set("spark.cassandra.auth.password","cassandra");
App app = new App(conf);
app.run();
}
public static class Product implements Serializable {
private Integer id;
private String name;
private List<Integer> parents;
public Product() { }
public Product(Integer id, String name, List<Integer> parents) {
this.id = id;
this.name = name;
this.parents = parents;
}
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public List<Integer> getParents() { return parents; }
public void setParents(List<Integer> parents) { this.parents = parents; }
@Override
public String toString() {
return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
}
}
public static class Sale implements Serializable {
private UUID id;
private Integer product;
private BigDecimal price;
public Sale() { }
public Sale(UUID id, Integer product, BigDecimal price) {
this.id = id;
this.product = product;
this.price = price;
}
public UUID getId() { return id; }
public void setId(UUID id) { this.id = id; }
public Integer getProduct() { return product; }
public void setProduct(Integer product) { this.product = product; }
public BigDecimal getPrice() { return price; }
public void setPrice(BigDecimal price) { this.price = price; }
@Override
public String toString() {
return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
}
}
public static class Summary implements Serializable {
private Integer product;
private BigDecimal summary;
public Summary() { }
public Summary(Integer product, BigDecimal summary) {
this.product = product;
this.summary = summary;
}
public Integer getProduct() { return product; }
public void setProduct(Integer product) { this.product = product; }
public BigDecimal getSummary() { return summary; }
public void setSummary(BigDecimal summary) { this.summary = summary; }
@Override
public String toString() {
return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
}
}
}