Как выполнить повторение RDD в искры apache (scala)
Я использую следующую команду, чтобы заполнить RDD множеством массивов, содержащих 2 строки [ "filename", "content" ].
Теперь я хочу перебирать все эти вхождения, чтобы что-то делать с каждым именем и содержимым.
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
Кажется, я не могу найти документацию о том, как это сделать.
Итак, я хочу:
foreach occurrence-in-the-rdd{
//do stuff with the array found on loccation n of the RDD
}
Ответы
Ответ 1
Основные операции в Spark: map
и filter
.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
txtRDD
теперь будет содержать только файлы с расширением ".txt"
И если вы хотите, чтобы слова подсчитывали эти файлы, вы можете сказать
//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)
Вы хотите использовать mapPartitions
, когда у вас есть некоторая дорогостоящая инициализация, которую вы должны выполнить, например, если вы хотите сделать Именованное распознавание объектов с помощью библиотеки, такой как инструменты coreNLP в Stanford.
Мастер map
, filter
, flatMap
и reduce
, и вы хорошо на пути к освоению Искры.
Ответ 2
Вы вызываете различные методы на RDD, которые принимают функции как параметры.
// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)
// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))
// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)
// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))
// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)
// Print each remaining array.
bigRDD.collect().foreach(a => {
a.foreach(e => print(e + " "))
println()
})
}
Обратите внимание, что функции, которые вы пишете, принимают один элемент RDD в качестве входных данных и возвращают данные некоторого однородного типа, поэтому вы создаете RDD последнего типа. Например, countRDD
является RDD[Int]
, а bigRDD
по-прежнему является RDD[Array[Int]]
.
В какой-то момент, вероятно, возникнет соблазн написать foreach
, который изменяет некоторые другие данные, но вы должны сопротивляться по причинам, описанным в этом вопросе и ответе.
Изменить: не пытайтесь печатать большие RDD
s
Несколько читателей попросили об использовании collect()
и println()
, чтобы увидеть их результаты, как в приведенном выше примере. Конечно, это работает только в том случае, если вы работаете в интерактивном режиме, таком как Spark REPL (read-eval-print-loop.) Лучше всего называть collect()
на RDD, чтобы получить последовательный массив для упорядоченной печати. Но collect()
может вернуть слишком много данных, и в любом случае слишком много может быть напечатано. Вот несколько альтернативных способов получить представление о вашем RDD
, если они большие:
-
RDD.take()
: Это дает вам прекрасный контроль над количеством элементов, которые вы получаете, но не там, откуда они появились, - определяется как "первый", который представляет собой концепцию, рассматриваемую различными другими вопросами и ответами здесь,
// take() returns an Array so no need to collect()
myHugeRDD.take(20).foreach(a => println(a))
-
RDD.sample()
: Это позволяет вам (грубо) контролировать долю полученных результатов, независимо от того, использует ли выборка замещение и даже произвольно случайное число.
// sample() does return an RDD so you may still want to collect()
myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
-
RDD.takeSample()
: Это гибрид: использование случайной выборки, которую вы можете контролировать, но позволяющее указать точное количество результатов и вернуть Array
.
// takeSample() returns an Array so no need to collect()
myHugeRDD.takeSample(true, 20).foreach(a => println(a))
-
RDD.count()
: Иногда лучшая проницательность исходит из того, сколько элементов вы получили - я часто делаю это сначала.
println(myHugeRDD.count())
Ответ 3
Я бы попытался использовать функцию отображения разделов. В приведенном ниже коде показано, как весь набор данных RDD можно обрабатывать в цикле, чтобы каждый вход проходил через ту же самую функцию. Я боюсь, что у меня нет знаний о Scala, поэтому все, что я могу предложить, это java-код. Однако перевести его в scala не очень сложно.
JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){
@Override
public Iterable<String> call(Iterator <String> t) throws Exception {
ArrayList<String[]> tmpRes = new ArrayList <>();
String[] fillData = new String[2];
fillData[0] = "filename";
fillData[1] = "content";
while(t.hasNext()){
tmpRes.add(fillData);
}
return Arrays.asList(tmpRes);
}
}).cache();
Ответ 4
что возвращение wholeTextFiles
- это пара RDD:
def wholeTextFiles (путь: String, minPartitions: Int): RDD [(String, String)]
Прочитайте каталог текстовых файлов из HDFS, локальной файловой системы (доступной на всех узлах) или любого поддерживаемого Hadoop URI файловой системы. Каждый файл считывается как одна запись и возвращается в пару ключ-значение, где ключ - это путь к каждому файлу, значение - это содержимое каждого файла.
Вот пример чтения файлов по локальному пути, затем печать каждого имени файла и содержимого.
val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.collect
.foreach(t => println(t._1 + ":" + t._2));
результат:
file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}
file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}
file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}
или преобразование пары RDD в RDD сначала
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.map(t => t._2)
.collect
.foreach { x => println(x)}
результат:
{"name":"tom","age":12}
{"name":"john","age":22}
{"name":"leon","age":18}
И я думаю, что wholeTextFiles
более подходит для небольших файлов.