Ответ 1
Ключевой вопрос: как мне реализовать бесконечный цикл с помощью Tokio? Отвечая на этот вопрос, мы можем решить проблему бесконечного присоединения к отключению. Из моего опыта написания асинхронного кода рекурсия кажется прямым решением этой проблемы.
UPDATE: как указал Шепмастер (и люди Токио Гиттера), мой первоначальный ответ утечки памяти, поскольку мы строим цепочку фьючерсов, которая растет на каждой итерации. Здесь следует новый:
Обновленный ответ: используйте loop_fn
В ящике futures
есть функция, которая делает именно то, что вам нужно. Он называется loop_fn
. Вы можете использовать его, изменив основную функцию на следующее:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
Функция напоминает цикл for, который может продолжаться или прерываться в зависимости от результата get_connection
(см. документацию для Loop
перечисление). В этом случае мы всегда будем продолжать, поэтому он будет бесконечно продолжать переподключение.
Обратите внимание, что ваша версия get_connection
будет паниковать, если есть ошибка (например, если клиент не может подключиться к серверу). Если вы также захотите повторить попытку после ошибки, вы должны удалить вызов panic!
.
Старый ответ: используйте рекурсию
Вот мой старый ответ, если кто-то найдет это интересным.
ПРЕДУПРЕЖДЕНИЕ: использование приведенного ниже кода приводит к неограниченному росту памяти.
Выполнение бесконечного цикла get_connection
Мы хотим вызвать функцию get_connection
каждый раз, когда клиент отключен, так что это именно то, что мы собираемся делать (посмотрите на комментарий после reader.and_then
):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
Помните, что get_connection
не блокирует. Он просто конструирует a Box<Future>
. Это означает, что при вызове рекурсивно мы все равно не блокируем. Вместо этого мы получаем новое будущее, которое мы можем связать с предыдущим, используя and_then
. Как вы можете видеть, это отличается от обычной рекурсии, так как стек не растет на каждой итерации.
Обратите внимание, что нам нужно клонировать handle
(см. handle_clone
) и переместить его в закрытие, переданное в reader.and_then
. Это необходимо, потому что закрытие будет жить дольше, чем функция (она будет содержаться в будущем, мы возвращаемся).
Обработка ошибок
Предоставленный вами код не обрабатывает случай, когда клиент не может подключиться к серверу (и никаких других ошибок). Следуя описанному выше принципу, мы можем обрабатывать ошибки, изменяя конец get_connection
на следующее:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
Обратите внимание, что or_else
похож на and_then
, но он работает с ошибкой, создаваемой будущим.
Удаление ненужного кода из main
Наконец, нет необходимости использовать and_then
в функции main
. Вы можете заменить main
на следующий код:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}