Асинхронное повторное подключение клиента к серверу в бесконечном цикле

Я не могу создать клиента, который пытается подключиться к серверу, и:

  • Если сервер не работает, он должен повторить попытку в бесконечном цикле
  • Если сервер встал и соединение прошло успешно, когда соединение потеряно (например, сервер отключает клиент), клиент должен перезапустить бесконечный цикл, чтобы попытаться подключиться к серверу.

Здесь код для подключения к серверу; в настоящее время, когда соединение потеряно, программа завершает работу. Я не уверен, что лучший способ его реализовать; возможно, мне нужно создать Future с бесконечным циклом?

extern crate tokio_line;
use tokio_line::LineCodec;

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                   
    let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                            
    let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                             

    let client = tcp.and_then(|stream| {                                                                                                                                                                            
        let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                 
        let reader = from_server.for_each(|message| {                                                                                                                                                               
            println!("{}", message);                                                                                                                                                                                
            Ok(())                                                                                                                                                                                                  
        });                                                                                                                                                                                                         

        reader.map(|_| {                                                                                                                                                                                            
            println!("CLIENT DISCONNECTED");                                                                                                                                                                        
            ()                                                                                                                                                                                                      
        }).map_err(|err| err)                                                                                                                                                                                       
    });                                                                                                                                                                                                             

    let client = client.map_err(|_| { panic!()});                                                                                                                                                                   
    Box::new(client)                                                                                                                                                                                                
}                                                                                                                                                                                                                   

fn main() {                                                                                                                                                                                                         
    let mut core = Core::new().unwrap();                                                                                                                                                                            
    let handle = core.handle();                                                                                                                                                                                     
    let client = get_connection(&handle);                                                                                                                                                                           

    let client = client.and_then(|c| {                                                                                                                                                                              
        println!("Try to reconnect");                                                                                                                                                                               
        get_connection(&handle);                                                                                                                                                                                    
        Ok(())                                                                                                                                                                                                      
    });                                                                                                                                                                                                             

    core.run(client).unwrap();                                                                                                                                                                                      
}

Добавьте ящик tokio-line с помощью:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }

Ответы

Ответ 1

Ключевой вопрос: как мне реализовать бесконечный цикл с помощью Tokio? Отвечая на этот вопрос, мы можем решить проблему бесконечного присоединения к отключению. Из моего опыта написания асинхронного кода рекурсия кажется прямым решением этой проблемы.

UPDATE: как указал Шепмастер (и люди Токио Гиттера), мой первоначальный ответ утечки памяти, поскольку мы строим цепочку фьючерсов, которая растет на каждой итерации. Здесь следует новый:

Обновленный ответ: используйте loop_fn

В ящике futures есть функция, которая делает именно то, что вам нужно. Он называется loop_fn. Вы можете использовать его, изменив основную функцию на следующее:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run the get_connection function and loop again regardless of its result
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}

Функция напоминает цикл for, который может продолжаться или прерываться в зависимости от результата get_connection (см. документацию для Loop перечисление). В этом случае мы всегда будем продолжать, поэтому он будет бесконечно продолжать переподключение.

Обратите внимание, что ваша версия get_connection будет паниковать, если есть ошибка (например, если клиент не может подключиться к серверу). Если вы также захотите повторить попытку после ошибки, вы должны удалить вызов panic!.


Старый ответ: используйте рекурсию

Вот мой старый ответ, если кто-то найдет это интересным.

ПРЕДУПРЕЖДЕНИЕ: использование приведенного ниже кода приводит к неограниченному росту памяти.

Выполнение бесконечного цикла get_connection

Мы хотим вызвать функцию get_connection каждый раз, когда клиент отключен, так что это именно то, что мы собираемся делать (посмотрите на комментарий после reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

Помните, что get_connection не блокирует. Он просто конструирует a Box<Future>. Это означает, что при вызове рекурсивно мы все равно не блокируем. Вместо этого мы получаем новое будущее, которое мы можем связать с предыдущим, используя and_then. Как вы можете видеть, это отличается от обычной рекурсии, так как стек не растет на каждой итерации.

Обратите внимание, что нам нужно клонировать handle (см. handle_clone) и переместить его в закрытие, переданное в reader.and_then. Это необходимо, потому что закрытие будет жить дольше, чем функция (она будет содержаться в будущем, мы возвращаемся).

Обработка ошибок

Предоставленный вами код не обрабатывает случай, когда клиент не может подключиться к серверу (и никаких других ошибок). Следуя описанному выше принципу, мы можем обрабатывать ошибки, изменяя конец get_connection на следующее:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)

Обратите внимание, что or_else похож на and_then, но он работает с ошибкой, создаваемой будущим.

Удаление ненужного кода из main

Наконец, нет необходимости использовать and_then в функции main. Вы можете заменить main на следующий код:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}