Есть ли в потоке памяти, который блокируется как поток файлов
Я использую библиотеку, для которой требуется предоставить объект, реализующий этот интерфейс:
public interface IConsole {
TextWriter StandardInput { get; }
TextReader StandardOutput { get; }
TextReader StandardError { get; }
}
Затем читатели объектов будут использоваться библиотекой с помощью:
IConsole console = new MyConsole();
int readBytes = console.StandardOutput.Read(buffer, 0, buffer.Length);
Обычно класс, реализующий IConsole, имеет поток StandardOutput, исходящий из внешнего процесса. В этом случае console.StandardOutput.Read вызывает работу, блокируя до тех пор, пока не будут записаны некоторые данные в поток StandardOutput.
То, что я пытаюсь сделать, это создать тестовую реализацию IConsole, которая использует MemoryStreams и повторить все, что появляется в StandardInput, на StandardInput. Я пробовал:
MemoryStream echoOutStream = new MemoryStream();
StandardOutput = new StreamReader(echoOutStream);
Но проблема в том, что это console.StandardOutput.Read вернет 0, а не заблокирует, пока не появится некоторая информация. Есть ли в любом случае, я могу заставить MemoryStream блокировать, если нет данных или есть другой в потоке памяти, который я мог бы использовать?
Ответы
Ответ 1
В конце концов, я нашел простой способ сделать это, наследуя от MemoryStream и перехватывая методы чтения и записи.
public class EchoStream : MemoryStream {
private ManualResetEvent m_dataReady = new ManualResetEvent(false);
private byte[] m_buffer;
private int m_offset;
private int m_count;
public override void Write(byte[] buffer, int offset, int count) {
m_buffer = buffer;
m_offset = offset;
m_count = count;
m_dataReady.Set();
}
public override int Read(byte[] buffer, int offset, int count) {
if (m_buffer == null) {
// Block until the stream has some more data.
m_dataReady.Reset();
m_dataReady.WaitOne();
}
Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, (count < m_count) ? count : m_count);
m_buffer = null;
return (count < m_count) ? count : m_count;
}
}
Ответ 2
Вдохновленный вашим ответом, здесь моя многопоточная, многозадачная версия:
public class EchoStream : MemoryStream
{
private readonly ManualResetEvent _DataReady = new ManualResetEvent(false);
private readonly ConcurrentQueue<byte[]> _Buffers = new ConcurrentQueue<byte[]>();
public bool DataAvailable{get { return !_Buffers.IsEmpty; }}
public override void Write(byte[] buffer, int offset, int count)
{
_Buffers.Enqueue(buffer);
_DataReady.Set();
}
public override int Read(byte[] buffer, int offset, int count)
{
_DataReady.WaitOne();
byte[] lBuffer;
if (!_Buffers.TryDequeue(out lBuffer))
{
_DataReady.Reset();
return -1;
}
if (!DataAvailable)
_DataReady.Reset();
Array.Copy(lBuffer, buffer, lBuffer.Length);
return lBuffer.Length;
}
}
С вашей версией вы должны прочитать Stream on Write, без какой-либо последующей записи. Моя версия буферизует любой письменный буфер в ConcurrentQueue (довольно просто изменить его на простое Queue и заблокировать его)
Ответ 3
Я собираюсь добавить еще одну улучшенную версию EchoStream. Это сочетание двух других версий, а также некоторые предложения из комментариев.
ОБНОВЛЕНИЕ - Я проверил этот EchoStream с более чем 50 терабайтами данных, проходящими через него в течение нескольких дней подряд. Тест проводился между сетевым потоком и потоком сжатия ZStandard. Асинхронность также была проверена, что привело к появлению редких условий зависания на поверхности. Похоже, что встроенный System.IO.Stream не ожидает, что он будет одновременно вызывать ReadAsync и WriteAsync в одном и том же потоке, что может привести к зависанию, если нет доступных данных, поскольку оба вызова используют один и тот же внутренний переменные. Поэтому мне пришлось переопределить те функции, которые решили проблему зависания.
В этой версии следующие функции.
1) Это было написано с нуля с использованием базового класса System.IO.Stream вместо MemoryStream.
2) Конструктор может установить максимальную глубину очереди, и если этот уровень достигнут, тогда запись потока будет блокироваться до тех пор, пока не будет выполнено чтение, которое опустит глубину очереди ниже максимального уровня (без ограничения = 0, по умолчанию = 10).
3) При чтении/записи данных смещение буфера и число теперь учитываются. Кроме того, вы можете вызвать Read с меньшим буфером, чем Write, не выбрасывая исключение и не теряя данные. BlockCopy используется в цикле для заполнения байтов до тех пор, пока счетчик не будет удовлетворен.
4) Существует открытое свойство AlwaysCopyBuffer, которое создает копию буфера в функции Write. Установка этого в true безопасно позволит байтовому буферу быть повторно использован после вызова Write.
5) Существует открытое свойство ReadTimeout/WriteTimeout, которое контролирует, как долго функция чтения/записи будет блокироваться, прежде чем она вернет 0 (по умолчанию = бесконечность, -1).
6) Используется класс BlockingQueue, который объединяет классы ConcurrentQueue и AutoResetEvent. В классе ConcurrentQueue существует редкое условие, когда вы обнаружите, что после того, как данные были помещены в очередь, они не доступны сразу, когда AutoResetEvent пропускает поток в Read(). Это происходит примерно один раз каждые 500 ГБ данных, которые проходят через него. Лекарство заключается в том, чтобы выспаться и снова проверить данные. Иногда Sleep (0) работает, но в крайних случаях, когда загрузка процессора была высокой, он может достигать Sleep (1000) до того, как данные появятся. ConcurrentQueue обрабатывает все это без каких-либо проблем.
7) Это было проверено, чтобы быть потокобезопасным для одновременного чтения и записи асинхронного.
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
public class EchoStream : Stream
{
public override bool CanTimeout { get; } = true;
public override int ReadTimeout { get; set; } = Timeout.Infinite;
public override int WriteTimeout { get; set; } = Timeout.Infinite;
public override bool CanRead { get; } = true;
public override bool CanSeek { get; } = false;
public override bool CanWrite { get; } = true;
public bool CopyBufferOnWrite { get; set; } = false;
private readonly object _lock = new object();
// Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
private readonly BlockingCollection<byte[]> _Buffers;
private int _maxQueueDepth = 10;
private byte[] m_buffer = null;
private int m_offset = 0;
private int m_count = 0;
private bool m_Closed = false;
public override void Close()
{
m_Closed = true;
// release any waiting writes
_Buffers.CompleteAdding();
}
public bool DataAvailable
{
get
{
return _Buffers.Count > 0;
}
}
private long _Length = 0L;
public override long Length
{
get
{
return _Length;
}
}
private long _Position = 0L;
public override long Position
{
get
{
return _Position;
}
set
{
throw new NotImplementedException();
}
}
public EchoStream() : this(10)
{
}
public EchoStream(int maxQueueDepth)
{
_maxQueueDepth = maxQueueDepth;
_Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
}
// we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
public new Task WriteAsync(byte[] buffer, int offset, int count)
{
return Task.Run(() => Write(buffer, offset, count));
}
// we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
return Task.Run(() =>
{
return Read(buffer, offset, count);
});
}
public override void Write(byte[] buffer, int offset, int count)
{
if (m_Closed || buffer.Length - offset < count || count <= 0)
return;
byte[] newBuffer;
if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
newBuffer = buffer;
else
{
newBuffer = new byte[count];
System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
}
if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
throw new TimeoutException("EchoStream Write() Timeout");
_Length += count;
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
return 0;
lock (_lock)
{
if (m_count == 0 && _Buffers.Count == 0)
{
if (m_Closed)
return -1;
if (_Buffers.TryTake(out m_buffer, ReadTimeout))
{
m_offset = 0;
m_count = m_buffer.Length;
}
else
return m_Closed ? -1 : 0;
}
int returnBytes = 0;
while (count > 0)
{
if (m_count == 0)
{
if (_Buffers.TryTake(out m_buffer, 0))
{
m_offset = 0;
m_count = m_buffer.Length;
}
else
break;
}
var bytesToCopy = (count < m_count) ? count : m_count;
System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
m_offset += bytesToCopy;
m_count -= bytesToCopy;
offset += bytesToCopy;
count -= bytesToCopy;
returnBytes += bytesToCopy;
}
_Position += returnBytes;
return returnBytes;
}
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
}