Adrianistán

El blog de Adrián Arroyo


Reinforcement Learning (Aprendizaje por refuerzo): DQN, OpenAI Gym, Stable Baselines (parte 4)

- Adrián Arroyo Calle

En el post anterior vimos como podíamos aprender las funciones valor y las funciones Q para todos los estados. Sin embargo, en muchos problemas el número de estados reales es muy elevado y es posible que no lleguemos a pasar por ellos las suficientes veces como para poder aprender correctamente. Una solución es aproximar el valor mediante una función. La idea es que esta función, al aproximar, estará generalizando y podrá derivar el valor de la función valor de cada estado de forma más o menos correcta.

Esta función de aproximación puede ser de muchos tipos, aunque normalmente suelen ser redes neuronales. Esto es debido a que tienen varias ventajas, la principal de ellas, es que es diferenciable. Muy útil para el entrenamiento. Usaremos el descenso estocástico del gradiente para optimizar las redes neuronales.

Vector de características

El primer paso para crear una función de aproximación es definir las entradas. Las entradas deberán ser ciertas características del entorno, que permitan describir el estado. Por ejemplo, en un coche autónomo, las características serían por ejemplo velocidad del coche, distancia del sensor delantero al muro, distancia del sensor lateral izquierdo al muro, etc.

Entrenamiento

Para realizar el entrenamiento de una red neuronal usando el descenso estocástico del gradiente necesitaremos un objetivo. De este modo la técnica será similar que en aprendizaje supervisado. Sin embargo, a diferencia de aprendizaje supervisado, aquí no hay valores de la función valor dados de antemano. ¿Cuáles son los objetivos entonces? En este caso podemos usar lo aprendido anteriormente y tomar las estimaciones de Monte Carlo o TD como si fuese lo que tenemos que aprender.

Como nota a mencionar, TD no funcionará bien en todas las ocasiones (si la aproximación tiene que ser no lineal). Existe una variante llamada Gradient TD que corrige este inconveniente.

DQN

Con lo visto antes ya podemos intuir como funciona DQN (Deep Q-Network). Pero hay algunos detalles importantes. Lo primero es que tiene "experience replay". Esto quiere decir que el entrenamiento precalcula varios escenarios y luego, va tomando trozos de ellos y los va introduciendo en el entrenamiento, repitiéndolos a veces incluso. Esto mejora la eficiencia al requerir menos ejecuciones de episodios y mejora la estabilidad. Además tiene "fixed Q-targets". Esto significa que los valores Q que se usan para comparar lo mucho o poco que hay que desviarse respecto al valor real son fijos. Es decir, tendremos unos valores Q dinámicos que será sobre los que guardamos los resultados y otros, que nos sirven para calcular la diferencia, que son fijos y se actualizan cada cierto número de iteraciones. Esto, que parece contraintuitivo, mejora la estabilidad del algoritmo.

Ejemplo real con OpenAI Gym

Vamos a resolver un problema ya más complejo. Será el problema del Cart Pole. Se trata de un péndulo que está en posición vertical y debemos mover el carro de abajo para que no caiga a ninguno de los lados.

Este entorno es un problema clásico y como tal, ya está diseñado por nosotros en diferentes librerías. Nosotros usaremos OpenAI Gym. Se trata de una librería que provee de varios entornos interesantes para poner a prueba nuestros algoritmos, con representación gráfica de los mismos.

Este entorno se caracteriza por tener cuatro entradas: posición del carro, velocidad del carro, ángulo del péndulo y velocidad angular del péndulo. Y dos acciones: acelerar a la izquierda y acelerar a la derecha. Las recompensas son 1 por cada acción tomada. El episodio finaliza cuando el ángulo del péndulo supera los 12 grados, el carro se aleja demasiado del centro y cuando el episodio alcanza los 200 pasos.

Vamos a ver como funciona la interfaz Gym para que luego seamos capaces de crear nuestros propios entornos. Esta se compone de:

  • Una clase que hereda de gym.Env
  • Un constructor donde inicializamos, como mínimo, self.action_space y self.observation_space y opcionalmente self.reward_range y parámetros internos del entorno nuestro. Estas dos primeras deben ser de algún subtipo gym.spaces. Los más normales son Discrete, que es básicamente un valor activado sobre N posibles y Box, que es una matriz de características (podemos especificar la forma para que sea unidimensional, o que tenga más dimensiones), que contiene números. Es muy habitual que las observaciones sean de tipo Box, ya que se miden varias características a la vez, con números decimales y Discrete sea para las acciones ya que tomamos una acción de N posibles. Pero otras combinaciones son posibles y existen más tipos de gym.spaces. Si vamos a introducir características diferentes dentro del mismo Box es muy conveniente normalizar los datos entre -1 y 1. self.reward_range, al contrario, es una tupla en la que se especifica el valor máximo y el mínimo que pueden alcanzar las recompensas.
  • Un método reset, que reinicia el entorno y devuelve una observación inicial. Estas observaciones, deberán devolverse con NumPy si son de tipo Box y coincidir con lo declarado en el constructor en cuanto a forma y tipos.
  • Un método step que toma una acción y devuelve una tupla con cuatro valores: nueva observación, recompensa, un boolean de si el episodio ha acabado ya o no y un diccionario de información extra (no usado por los algoritmos).
  • Opcionalmente pueden llevar instrucciones sobre como renderizar el entorno y así ver en vivo o en vídeo el funcionamiento de los algoritmos. Principalmente se controla a través del método render.

Identifiquemos estas partes dentro del entorno de CartPole-v1. El código fuente está aquí: https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py


class CartPoleEnv(gym.Env):
    # inicializamos el entorno con parámetros que necesitemos y definimos action_space y observation_space. El primero es de tipo Discrete y el segundo de tipo Box
    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = "euler"

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None

        self.steps_beyond_done = None

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    # se ejecuta cuando el algoritmo llama a una acción, calcula la nueva observación (nuevo estado si es totalmente observable), la recompensa y si ha acabado ya o no
    def step(self, action):
        err_msg = "%r (%s) invalid" % (action, type(action))
        assert self.action_space.contains(action), err_msg

        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (
            force + self.polemass_length * theta_dot ** 2 * sintheta
        ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned done = True. You "
                    "should always call 'reset()' once you receive 'done = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, done, {}

    # resetea el estado interno del CartPole y devuelve la observación
    def reset(self):
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state, dtype=np.float32)

En Stable Baselines

Este algoritmo, DQN, es muy popular, y se encuentra implementado en diferentes librerías. Una de las más interesantes es Stable Baselines 3, que intenta ser el sklearn del aprendizaje por refuerzo. Usa PyTorch internamente. Con Stable Baselines 3, debemos proporcionar un entorno que siga la interfaz Gym y ajustar los hiperparámetros. En el caso de DQN los hiperparámetros más importantes son:

  • policy - La política a usar del modelo. Casi siempre MlpPolicy. Si la entrada son imágenes, CnnPolicy.
  • env - El entorno sobre el que vamos a aprender. Debe implementar la interfaz OpenAI Gym
  • learning_rate - Ratio de aprendizaje de la red neuronal
  • buffer_size - El tamaño del buffer que almacenará las transiciones del "experience replay".
  • batch_size - Tamaño del batch que se usa para reentrenar.
  • learning_starts - Cuantos steps debe dar el modelo antes de empezar a aprender la red neuronal.
  • gamma - Gamma, el factor de descuento. En posts anteriores hemos hablado de él.
  • train_freq - Cada cuantos steps se reentrena el modelo.
  • gradient_steps - Cuantos pasos de gradiente se dan al entrenar. Por defecto, 1.
  • target_update_interval - Cada cuantos steps se actualiza el "fixed Q-target".
  • policy_kwargs - Ajustes de la política. En el caso de MlpPolicy podremos ajustar la forma de la red, así como las funciones de activación y más detalles.
  • Más detalles y parámetros aquí

Con estos parámetros podemos hacer un código similar a este:


import gym
from stable_baselines3 import DQN

env = gym.make("CartPole-v1") # también se puede crear la instancia de la clase directamente en vez de usar gym.make

model = DQN(
    "MlpPolicy",
    env,
    learning_rate=1e-3,
    buffer_size=50000,
    learning_starts=10000,
    batch_size=64,
    gamma=0.999,
    gradient_steps=1,
    train_freq=20,
    target_update_interval=2000,
    verbose=1
    )
model.learn(total_timesteps=500_000)
model.save("dqn_cartpole")

Y aquí un pequeño vídeo para comparar. Primero una política aleatoria, en segundo lugar la política aprendida con DQN.

He aquí el código de como cargar el modelo entrenado y visualizarlo.


import gym
from stable_baselines3 import DQN

env = gym.make("CartPole-v1")

model = DQN.load("dqn_cartpole")

obs = env.reset()
for i in range(1000):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    env.render()
    if done:
        obs = env.reset()

Comentarios

Añadir comentario

Todos los comentarios están sujetos a moderación