Генератор/блок для преобразования итератора/потока
В основном я хочу преобразовать это:
def data(block: T => Unit)
для потока (dataToStream - это гипотетическая функция, которая выполняет это преобразование):
val dataStream: Stream[T] = dataToStream(data)
Я полагаю, что эта проблема может быть решена продолжениями:
// let assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }
// here we can print all data integers
data { i => println(i) }
// >> but what we really want is to convert data to the stream <<
// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream
// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }
// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?
Спасибо, Давид
Ответы
Ответ 1
EDITED: Изменены примеры, показывающие лень traversable.view
scala> def data(f : Int => Unit) = for(i <- 1 to 10) {
| println("Generating " + i)
| f(i)
| }
data: (f: (Int) => Unit)Unit
scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
| def foreach[X]( f : T => X) = func(f(_) : Unit)
| }
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]
Метод toTraversable преобразует вашу функцию данных в коллекцию Traversable. Сам по себе он ничего не громадный, но вы можете преобразовать его в TraversableView, который ленив. Вот пример:
scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6
Несчастный характер метода принятия заключается в том, что он должен идти мимо последнего значения, сгенерированного для правильной работы, но оно будет прекращено раньше. Вышеупомянутый код будет выглядеть одинаково без вызова ".view". Однако здесь более убедительный пример:
scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3
Итак, в заключение, я считаю, что коллекция, которую вы ищете, - TraversableView, которая проще всего создать представление, создающее Traversable, а затем называя его "view". Если вам действительно нужен тип Stream, вот метод, который работает в 2.8.0.final и сделает "Stream" без потоков:
scala> def dataToStream( data : (Int => Unit) => Unit) = {
| val x = new Traversable[Int] {
| def foreach[U](f : Int => U) = {
| data( f(_) : Unit)
| }
| }
| x.view.toList.toStream
| }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]
scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)
Несчастный характер этого метода заключается в том, что он будет перебирать всю проходимую перед созданием потока. Это также означает, что все значения должны буферизироваться в памяти. Единственная альтернатива - прибегать к потокам.
В стороне: это была мотивационная причина предпочитать Traversables как прямую отдачу от методов scalax.io.File: "lines" "chars" и "bytes".
Ответ 2
Здесь простое решение, которое порождает поток, который потребляет данные. Он отправляет данные в SynchronousQueue. Создается и возвращается поток, который извлекает данные из очереди:
def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
val callbackthread = new Runnable {
def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
}
new Thread(callbackthread).start()
Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}
Ответ 3
Мне еще нужно выяснить, как это сделать. Я подозреваю, что ответ лежит где-то здесь:
Изменить: удаленный код, который показал, как решить другую проблему.
Edit2: Используя код http://gist.github.com/580157, который был первоначально отправлен http://gist.github.com/574873, вы можете сделать это:
object Main {
import Generator._
def data = generator[Int] { yld =>
for (i <- suspendable(List.range(0, 11))) yld(i)
}
def main(args: Array[String]) {
for( i <- data.toStream ) println(i)
}
}
data
не принимает блок-код, но я думаю, что это нормально, потому что с продолжением блок может обрабатываться вызывающим. Код для генератора можно увидеть в gistub на gistub.
Ответ 4
Здесь реализована ограниченная реализация на основе продолжения, адаптированная из @Geoff Reedy, предлагающая:
import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue
def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
val queue = new SynchronousQueue[Option[A]]
queue.put(Some(shift { k: (A=>Unit) =>
new Thread() {
override def run() {
data(k)
// when (if) the data source stops pumping, add None
// to signal that the stream is dead
queue.put(None)
}
}.start()
continually(queue.take).takeWhile(_.isDefined).map(_.get)
})
}