Есть ли поток FIFO в Scala?
Я ищу поток FIFO в Scala, то есть что-то, что обеспечивает функциональность
- immutable.Stream (поток, который может быть конечным и запоминает элементы, которые уже были прочитаны)
- mutable.Queue (который позволяет добавлять элементы в FIFO)
Поток должен быть закрыт и должен блокировать доступ к следующему элементу до тех пор, пока элемент не будет добавлен или поток не будет закрыт.
На самом деле я немного удивлен тем, что библиотека коллекции не (кажется) не включает такую структуру данных, так как это ИМО довольно классическая.
Мои вопросы:
-
1) Я что-то упустил? Есть ли класс, предоставляющий эту функциональность?
-
2) Хорошо, если он не включен в библиотеку коллекции, то он может быть просто тривиальной комбинацией существующих классов коллекции. Тем не менее, я попытался найти этот тривиальный код, но моя реализация выглядит довольно сложной для такой простой проблемы. Есть ли более простое решение для такого FifoStream?
class FifoStream[T] extends Closeable {
val queue = new Queue[Option[T]]
lazy val stream = nextStreamElem
private def nextStreamElem: Stream[T] = next() match {
case Some(elem) => Stream.cons(elem, nextStreamElem)
case None => Stream.empty
}
/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = {
queue.synchronized {
if (queue.isEmpty) queue.wait()
queue.dequeue()
}
}
/** Adds new elements to this stream. */
def enqueue(elems: T*) {
queue.synchronized {
queue.enqueue(elems.map{Some(_)}: _*)
queue.notify()
}
}
/** Closes this stream. */
def close() {
queue.synchronized {
queue.enqueue(None)
queue.notify()
}
}
}
Парадигматическое решение (видимо модифицированное)
Спасибо за ваши предложения. Я слегка модифицировал парадигматическое решение, так что toStream возвращает неизменный поток (допускает повторяемые чтения), чтобы он соответствовал моим потребностям. Для полноты, вот код:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
Ответы
Ответ 1
В Scala потоки являются "функциональными итераторами". Люди ожидают, что они будут чистыми (без побочных эффектов) и непреложными. В вашем случае, каждый раз, когда вы выполняете итерацию в потоке, вы изменяете очередь (поэтому она не чиста). Это может создать много недоразумений, поскольку повторение в два раза того же потока будет иметь два разных результата.
При этом вам следует скорее использовать Java BlockingQueues, а не выполнять собственную реализацию. Они считаются хорошо реализованными с точки зрения безопасности и производительности. Вот самый чистый код, который я могу придумать (используя ваш подход):
import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
def toStream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, toStream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
object FIFOStream {
def apply[A]() = new LinkedBlockingQueue
}
Ответ 2
Я предполагаю, что вы ищете что-то вроде java.util.concurrent.BlockingQueue?
Akka имеет BoundedBlockingQueue реализацию этого интерфейса. Конечно, существуют версии, доступные в java.util.concurrent.
Вы также можете использовать Akka actors за все, что вы делаете. Использовать Actors для уведомления или нажатия нового события или сообщения вместо того, чтобы тянуть.
Ответ 3
1) Кажется, вы ищете поток потока данных, который отображается на таких языках, как Oz, который поддерживает шаблон производителя-потребителя. Такая коллекция недоступна в API коллекций, но вы всегда можете создать ее самостоятельно.
2) Поток потока данных основан на концепции переменных с одним присваиванием (чтобы их не нужно было инициализировать после объявления точка и чтение их до инициализации вызывает блокировку):
val x: Int
startThread {
println(x)
}
println("The other thread waits for the x to be assigned")
x = 1
Было бы просто реализовать такой поток, если бы в нем поддерживались переменные с одним присваиванием (или потоком данных) (см. ссылку ), Поскольку они не являются частью Scala, вы должны использовать шаблон wait
- synchronized
- notify
так же, как и вы.
Параллельные очереди из Java могут быть использованы для достижения этого, как и предложил другой пользователь.