Использовать Reactive Extensions (Rx) для программирования сокетов?
Каков наиболее сукцинный способ записи функции GetMessages
с помощью Rx:
static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));
Console.ReadKey();
}
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it length.
// the rest of the message is a string
// ????????????? Now What ?????????????
}
Простой сервер как драйвер для вышеуказанного примера: http://gist.github.com/452893#file_program.cs
Использование Rx для программирования сокетов
Я изучаю использование Reactive Extensions для какой-либо работы по программированию сокетов, которую я выполняю. Моей мотивацией для этого было бы то, что это каким-то образом упростит код. Будет ли это означать меньше кода, чем меньше вложенных в эти строки.
Однако пока это не выглядит так:
- Я не нашел очень много примеров использования Rx с сокетами
- пример
s Я нашел не менее сложный, чем мой существующий код BeginXXXX, EndXXXX
- Хотя
Observable
имеет методы расширения для FromAsyncPattern, это не распространяется на SocketEventArgs
Async API.
Текущее нерабочее решение
Вот что я до сих пор. Это не работает, он терпит неудачу с переполнением стека (heh). Я не понял семантики, чтобы создать IObservable
, который будет читать указанное количество байтов.
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();
var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}
Ответы
Ответ 1
Что-то в этих строках могло бы работать. Это не проверено, не учитывает исключения и случай, когда сообщение возвращается частично. Но в остальном я считаю, что это правильное направление.
public static IObservable<T> GetSocketData<T>(this Socket socket,
int sizeToRead, Func<byte[], T> valueExtractor)
{
return Observable.CreateWithDisposable<T>(observer =>
{
var readSize = Observable
.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
socket.BeginReceive,
socket.EndReceive);
var buffer = new byte[sizeToRead];
return readSize(buffer, 0, sizeToRead, SocketFlags.None)
.Subscribe(
x => observer.OnNext(valueExtractor(buffer)),
observer.OnError,
observer.OnCompleted);
});
}
public static IObservable<int> GetMessageSize(this Socket socket)
{
return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
}
public static IObservable<string> GetMessageBody(this Socket socket,
int messageSize)
{
return socket.GetSocketData(messageSize, buf =>
Encoding.UTF8.GetString(buf, 0, messageSize));
}
public static IObservable<string> GetMessage(this Socket socket)
{
return
from size in socket.GetMessageSize()
from message in Observable.If(() => size != 0,
socket.GetMessageBody(size),
Observable.Return<string>(null))
select message;
}
public static IObservable<string> GetMessagesFromConnected(
this Socket socket)
{
return socket
.GetMessage()
.Repeat()
.TakeWhile(msg => !string.IsNullOrEmpty(msg));
}
public static IObservable<string> GetMessages(this Socket socket,
IPAddress addr, int port)
{
return Observable.Defer(() =>
{
var whenConnect = Observable
.FromAsyncPattern<IPAddress, int>(
socket.BeginConnect, socket.EndConnect);
return from _ in whenConnect(addr, port)
from msg in socket.GetMessagesFromConnected()
.Finally(socket.Close)
select msg;
});
}
Изменить: обрабатывать неполные чтения, Observable.While можно использовать (в GetSockedData), как предложил Дейв Секстон в тот же поток на форуме RX.
Изменить: Также взгляните на эту статью Джеффри Ван Гога: Асинхронное чтение System.IO.Stream
Ответ 2
Хорошо, так что это, пожалуй, "обман", но я полагаю, вы могли бы повторно использовать мой не-Rx ответ и обернуть его с помощью Observable.Create
.
Я уверен, что возвращение сокета в качестве IDisposable
является неправильной семантикой, но не уверен, что будет.
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
return Observable.CreateWithDisposable<string>(
o =>
{
byte[] buffer = new byte[1024];
Action<int, Action<int>> readIntoBuffer = (length, callback) =>
{
var totalRead = 0;
AsyncCallback receiveCallback = null;
AsyncCallback temp = r =>
{
var read = socket.EndReceive(r);
if (read == 0)
{
socket.Close();
o.OnCompleted();
return;
}
totalRead += read;
if (totalRead < length)
{
socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null);
}
else
{
callback(length);
}
};
receiveCallback = temp;
socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null);
};
Action<int> sizeRead = null;
Action<int> messageRead = x =>
{
var message = Encoding.UTF8.GetString(buffer, 0, x);
o.OnNext(message);
readIntoBuffer(4, sizeRead);
};
Action<int> temp2 = x =>
{
var size = BitConverter.ToInt32(buffer, 0);
readIntoBuffer(size, messageRead);
};
sizeRead = temp2;
AsyncCallback connectCallback = r =>
{
socket.EndConnect(r);
readIntoBuffer(4, sizeRead);
};
socket.BeginConnect(addr, port, connectCallback, null);
return socket;
});
}