Adrianistán

El blog de Adrián Arroyo


Futures y Tokio: programar de forma asíncrona en Rust

- Adrián Arroyo Calle

Rust es un lenguaje muy potente y uno de sus principales focos de diseño es la concurrencia. Sin embargo, si damos un repaso por la librería estándar veremos un par de cosas nada más, algo que puede resultar decepcionante. Esto se debe a que Rust no impone ningún modelo específico de concurrencia como sí lo hacen Go o Erlang. Rust nos provee de los ladrillos de construcción necesarios.

De forma estándar, Rust nos provee de Mutex, atómicos, threads, variables de condición, RwLock y un modelo algo más avanzado de mensajería mediante canales de múltiples productores y un único consumidor (mpsc). En el exterior tenemos crates como Actix que nos proveen del modelo de actores en Rust (dicen que es similar a Akka, yo lo desconozco), modelos de mensajería por canales más flexibles (mpmc) y una cosa muy interesante llamado Futures. Los futures o futuros no son esos contratos que se realizan en bolsa sobre el valor futuro de una acción, sino que son una manera cómoda de manejar valores que existirán en el futuro. Si has usado JavaScript o C# igual te suenan las Promises o Promesas y los Task respectivamente. Los futuros de Rust son exactamente lo mismo.

¿Qué es un futuro?


Un futuro es una variable que representa un dato que todavía no existe. Tenemos la promesa de que ese valor existirá en el futuro. La ventaja es que podemos usarlo aun cuando todavía no tenga un valor. Esto además permite escribir código asíncrono con facilidad.

Una diferencia con respecto a los Promises de JavaScript es que los futuros se basan en poll en vez de en push. Es decir, los futuros van a ir preguntando si el valor ya está disponible. Ante esta pregunta se puede responder con un error, con todavía no está disponible y con ya está disponible, aquí tienes.

Veamos un ejemplo muy tonto pero que puede servirnos para entender algunas cosas.
extern crate futures;

use futures::*;
use std::time;
use std::thread;

fn suma(a: i32, b: i32) -> SumFuture {
SumFuture{
a: a,
b: b
}
}

struct SumFuture{
a: i32,
b: i32
}

impl Future for SumFuture {
type Item = i32;
type Error = String;

fn poll(&mut self) -> Result<Async<i32>,String> {
thread::sleep(time::Duration::from_secs(1));
Ok(Async::Ready(self.a+self.b))
}
}

fn main() {
let c = suma(4,5);
println!("Suma: {}",c.wait().unwrap());
}

Definimos un nuevo futuro, SumFuture, que devuelve el resultado de una suma. El futuro en su función poll duerme el hilo 1 segundo y después devuelve el resultado correcto. En la función main llamamos a suma que devuelve un futuro en vez del resultado. Con el futuro, esperamos con wait a que se resuelva y lo mostramos. Cuando un futuro se ejecuta se convierte en una tarea. Las tareas necesitan ejecutores. Wait ejecuta los futuros en el mismo hilo desde el que se hace la llamada, pero existen otras opciones. El programa, tarda un segundo en imprimir el número.

Pero esto no sirve para nada

Bueno, quizá ahora parezca una tontería, pero en cuanto introduzcamos más elementos, todo tendrá más sentido.

Una característica de los futuros es que se pueden encadenar, tal que así:
fn main() {
let c = suma(4,5)
.and_then(|v|{
suma(v,40)
}).and_then(|v|{
suma(v,40)
}).wait().unwrap();
println!("Suma: {}",c);
}

Ejecutores


Los futuros nos ayudan con la concurrencia, esto es porque se puede esperar a varios futuros a la vez sin problema. Una manera de hacerlo es con CpuPool, un ejecutor que tiene un pool de hilos ya creados. Su uso es muy sencillo, en este ejemplo vemos como hago dos operaciones en paralelo:
fn main() {
let c = suma(4,5)
.and_then(|v|{
suma(v,40)
}).and_then(|v|{
suma(v,40)
});
let d = suma(15,14);

let pool = CpuPool::new_num_cpus();
let c = pool.spawn(c);
let d = pool.spawn(d);
let (c,d) = c.join(d).wait().unwrap();
println!("SUMAS: {},{}",c,d);
}

Con spawn añadimos un nuevo futuro a la ejecución y nos devuelve otro futuro. Con join podemos unir dos futuros en uno solo que se resuelva cuando ambos hayan resuelto. Finalmente hacemos wait en el hilo principal para esperar a que las sumas se hagan e imprimir el resultado. Existen otros ejecutores. Con Tokio usaremos otro.

Tokio




Tokio es una librería que nos permite ejecutar código de entrada/salida de forma asíncrona, usando futuros por debajo. Son especialmente útiles, la parte de red de Tokio y sus ejecutores correspondientes.
extern crate tokio;
extern crate tokio_io;
extern crate futures;

use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use std::net::*;
use tokio::prelude::*;

fn main(){
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket|{
println!("Cliente recibido");
Ok(())
}).map_err(|err|{
println!("error = {:?}",err);
});
tokio::run(server);
}

En este ejemplo tenemos un pequeño servidor TCP que se mantiene a la espera de clientes, imprime un mensaje y cierra la conexión. Este servidor solo utiliza un hilo, no obstante, ya usa futuros. server es un futuro infinito, que nunca acaba, que ejecutamos en el hilo actual con run->spawn. El run solo es necesario para ejecutar el primer futuro, el que mantendrá viva la aplicación. spawn empieza a ejecutar el futuro, que entonces se pasa a denominar tarea. Aquí es Tokio en vez de CpuPool, el planificador de tareas.

Servidor de Echo en Tokio


Ahora vamos a ir con un ejemplo más interesante, ahora el cliente y el servidor estarán conectados durante un tiempo, mandando el cliente un mensaje y el servidor respondiendo. Pero queremos que haya varios clientes simultáneamente. Usando futuros y Tokio podemos hacerlo en un mismo hilo (al estilo Node.js).
extern crate tokio;
extern crate tokio_io;
extern crate futures;

use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use std::net::*;
use tokio::prelude::*;

fn main(){
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket|{
let (writer,reader) = socket.framed(LinesCodec::new()).split();
let action = reader
.map(move |line|{
println!("ECHO: {}",line);
line
})
.forward(writer)
.map(|_|{
})
.map_err(|err|{
println!("error");
});
tokio::spawn(action);
Ok(())
}).map_err(|err|{
println!("error = {:?}",err);
});
tokio::run(server);
}

¿Qué hacemos ahora? Ahora a cada socket de cliente asignamos una tarea, que se encarga, de forma independiente, de ir leyendo línea a línea (gracias a LineCodec).

A partir de aquí se programa en forma de stream, un patrón muy común en la programación con futuros.

En el primer map imprimimos la línea en el servidor y la seguimos pasando por el stream. El siguiente paso es escribir en writer, con forward imprimimos y mandamos esa línea al cliente. Forward a su vez devuelve una tupla con datos (útil para seguir haciendo cosas). Como no los necesitamos, hacemos un map cuya única finalidad sea descartar los valores y finalmente un map_err para capturar posibles errores. Una vez hecho esto tenemos un futuro listo para ser ejecutado. Iniciamos la tarea con spawn y nos olvidamos, pasando a esperar a por un nuevo cliente.

Ahora, en el servidor podemos manejar varios clientes a la vez, cada uno en una tarea distinta, dentro de un mismo hilo.

Cancelando la tarea


Esta tarea que maneja el cliente es infinita, es decir, no se para. La razón es que reader es un lector que genera un futuro infinito esperando a nuevas líneas. Pero, ¿existe algún método para parar la tarea desde el servidor?

Esto no es algo demasiado trivial, pero se puede hacer de manera sencilla usando canales MPSC de Rust y futuros.
extern crate tokio;
extern crate tokio_io;
extern crate futures;

use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use tokio::prelude::*;

struct Cancellable{
rx: std::sync::mpsc::Receiver<()>,
}

impl Future for Cancellable {
type Item = ();
type Error = std::sync::mpsc::RecvError;

fn poll(&mut self) -> Result<Async<Self::Item>,Self::Error> {
match self.rx.try_recv() {
Ok(_) => Ok(Async::Ready(())),
Err(_) => Ok(Async::NotReady)
}
}
}

fn main() {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket|{
let (writer,reader) = socket.framed(LinesCodec::new()).split();
let (tx,rx) = std::sync::mpsc::channel();
let cancel = Cancellable {
rx: rx,
};
let action = reader
.map(move |line|{
println!("ECHO: {}",line);
if line == "bye"{
println!("BYE");
tx.send(()).unwrap();
}
line
})
.forward(writer)
.select2(cancel)
.map(|_|{

})
.map_err(|err|{
println!("error");
});
tokio::spawn(action);

Ok(())
}).map_err(|err|{
println!("error = {:?}",err);
});
tokio::run(server);
}

La idea aquí es hacer que de alguna manera, el futuro pueda resolverse y así finalizar la tarea. Una función interesante es select2 que devuelve un futuro fusión de dos futuros. Este futuro se resuelve (devuelve valor y acaba la tarea) cuando alguno de ellos lo haga. Como el futuro de reader nunca acabará, entonces basta con poner un futuro que cuando queramos cerrar la conexión resolveremos.

Este futuro es cancel de tipo Cancellable. No viene en la librería, lo he creado arriba y lo que hace es esperar a que el extremo del canal reciba una comunicación. El valor nos da igual, simplemente que se haya producido un ping. Una vez hecho eso, el futuro resuelve y la conexión se cierra.

En el ejemplo, cuando el cliente manda bye la conexión se cierra.



Y esto es una breve introducción a los futuros y a Tokio en Rust. Todavía queda ver como podemos usar async/await para no tener que escribir todo esto en forma de stream.

Comentarios

Añadir comentario

Todos los comentarios están sujetos a moderación