Итерации по строкам в файле параллельно (Scala)?
Я знаю о параллельных коллекциях в Scala. Они удобны! Тем не менее, я хотел бы перебирать строки файла, который слишком велик для памяти параллельно. Например, я мог бы создавать потоки и настраивать блокировку над сканером, но было бы здорово, если бы я мог запускать код, например:
Source.fromFile(path).getLines.par foreach { line =>
К сожалению, однако
error: value par is not a member of Iterator[String]
Каков самый простой способ выполнить некоторые parallelism здесь? Пока что я буду читать в некоторых строках и обрабатывать их параллельно.
Ответы
Ответ 1
Вы можете использовать группировку, чтобы легко нарезать итератор на куски, которые вы можете загрузить в память, а затем обрабатывать параллельно.
val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines =>
lines.par.foreach { line => process(line) }
}
По-моему, это самый простой способ сделать это.
Ответ 2
Я положу это как отдельный ответ, поскольку он принципиально отличается от моего последнего (и он действительно работает)
Здесь описывается решение с участием актеров, которое в основном описывает комментарий Ким Стебеля. Есть два класса актеров, один актер FileReader, который читает отдельные строки из файла по требованию и несколько действующих лиц. Рабочие все отправляют запросы на строки читателю и обрабатывают строки параллельно, поскольку они считываются из файла.
Я использую актеров Akka здесь, но использование другой реализации в основном та же идея.
case object LineRequest
case object BeginProcessing
class FileReader extends Actor {
//reads a single line from the file or returns None if EOF
def getLine:Option[String] = ...
def receive = {
case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
}
}
class Worker(reader: ActorRef) extends Actor {
def process(line:String) ...
def receive = {
case BeginProcessing => reader ! LineRequest
case Some(line) => {
process(line)
reader ! LineRequest
}
case None => self.stop
}
}
val reader = actorOf[FileReader].start
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...
Таким образом, не более 4 (или хотя бы у многих работающих у вас) необработанных строк в памяти одновременно.
Ответ 3
Комментарии Дэна Саймона заставили меня задуматься. Почему мы не пытаемся обернуть источник в потоке:
def src(source: Source) = Stream[String] = {
if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString)
else Stream.empty
}
Затем вы можете использовать его параллельно:
src(Source.fromFile(path)).par foreach process
Я пробовал это, и он компилируется и запускается в любом случае. Я не уверен, что если он загрузит весь файл в память или нет, но я не думаю, что это так.
Ответ 4
Я понимаю, что это старый вопрос, но вы можете найти реализацию ParIterator
в библиотеке iterata, чтобы быть полезной без сборки - потребовалось выполнить следующее:
scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
Ответ 5
Ниже я помог достичь
source.getLines.toStream.par.foreach( line => println(line))
Ответ 6
В итоге мы создали пользовательское решение в нашей компании, чтобы мы точно поняли parallelism.