
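Notes on using highway-env after its 2023 update (including DDQN, DuelingDQN, DDQN+OtherChanges): from getting started to giving up; if I hit one more pitfall I'm quitting Genshin Impact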

daxuanzi515 2023-10-04

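Preface

While studying reinforcement learning for autonomous driving, I decided to build my environment on the simulator provided by the highway-env library. After going through many tutorials (both Chinese and international), I found that their content was outdated: highway-env was updated in 2023, and the tutorials from the previous two years all use the old functions and return values.

What is highway-env?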

Installation (latest version by default): pip install highway-env
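
Since everything below depends on which release is actually installed, it can help to print the installed versions first. This is only a minimal sketch using the standard library (Python 3.8+); the helper name `installed_version` is my own and not part of any library.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # distribution names as used with pip
    for name in ("highway-env", "gym"):
        print(name, installed_version(name))
```

If a package prints `None`, it is not installed in the active environment.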

First, here are the changes I found in the new version of the library:

Previously, step returned four values:

        observation, reward, done, info = env.step(action)

Now it returns five:

        observation, reward, terminated, truncated, info = env.step(action)
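
Code that has to run against both conventions can normalize the result first. The following is a sketch under my own assumptions: `unpack_step` is a hypothetical helper, and plain tuples stand in for a real environment so that the snippet runs on its own.

```python
def unpack_step(result):
    """Normalize an env.step() result to (obs, reward, terminated, truncated, info).

    Accepts the old 4-tuple (obs, reward, done, info) and the new 5-tuple.
    """
    if len(result) == 5:
        obs, reward, terminated, truncated, info = result
    elif len(result) == 4:
        obs, reward, done, info = result
        # the old API had a single flag; treat it as termination
        terminated, truncated = done, False
    else:
        raise ValueError(f"unexpected step() result of length {len(result)}")
    return obs, reward, bool(terminated), bool(truncated), info


# stand-in tuples instead of a real environment
old_style = ([0.0], 1.0, True, {})
new_style = ([0.0], 1.0, False, True, {})

for result in (old_style, new_style):
    obs, reward, terminated, truncated, info = unpack_step(result)
    done = terminated or truncated  # the episode is over in either case
    print(terminated, truncated, done)
```

The point of keeping both flags is that a loop should stop when either one is set, while only `terminated` says that the state itself was terminal.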

I infer that the environment data used to be a plain ndarray:

        data = env.reset()

        data = array([[...], [...], ..., [...]], dtype=float32)

Now the environment data is a tuple of the observation and an info dict:

        data = env.reset()

        data = (array([[...], [...], ..., [...]], dtype=float32), {"speed": ..., "crashed": ..., ...})
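
A small helper can hide this difference from the rest of the code. This is a sketch rather than anything the library provides: `unpack_reset` is a hypothetical name, and nested lists stand in for the ndarray so the snippet has no dependencies.

```python
def unpack_reset(result):
    """Return only the observation from an env.reset() result.

    Newer versions return (observation, info); older ones return the observation alone.
    """
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


# stand-ins for the two return styles
new_style = ([[1.0, 0.5], [0.0, 0.0]], {"speed": 8.0, "crashed": False})
old_style = [[1.0, 0.5], [0.0, 0.0]]

print(unpack_reset(new_style) == unpack_reset(old_style))
```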

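Because of these changes, the data-handling parts of the code have to change accordingly. Pay particular attention to version compatibility when using several libraries together.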

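Some code I referred to

My virtual environment configuration (GPU):

What is a virtual environment? Somebody feed it the nine-turn braised intestine

The main packages required are:

Based on Python 3.8.0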

pytorch

gym

highway

tqdm

matplotlib

pygame

numpy

highway-env

Training uses the Double DQN algorithm; further variations built on it follow later.
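
As a reminder of what distinguishes Double DQN, here is a one-sample sketch of its target in plain Python. The function name and the list-based Q values are my own illustration; the training code later in this post does the same thing on batched tensors.

```python
def double_dqn_target(reward, terminated, q_online_next, q_target_next, gamma=0.99):
    """One-sample Double DQN target.

    q_online_next / q_target_next: lists of action values for the next state,
    from the online (estimate) network and the target network respectively.
    """
    if terminated:
        return reward  # no bootstrapping past a terminal state
    # 1) the online network selects the action ...
    best_action = max(range(len(q_online_next)), key=lambda a: q_online_next[a])
    # 2) ... and the target network evaluates it
    return reward + gamma * q_target_next[best_action]


target = double_dqn_target(1.0, False, [0.2, 0.9, 0.1], [0.5, 0.3, 0.8], gamma=0.5)
print(target)
```

Here the online network prefers action 1, so the target uses the target network's value for action 1 (0.3) rather than its own maximum (0.8). Separating selection from evaluation is what reduces the overestimation of plain DQN.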

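Create a Python file double_dqn.py. The code below belongs in that file; put the pieces together and you have the complete program.

The comments are in English because my project is in English.

Libraries used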

import os
import copy
import random
import time
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F

import gym
import highway_env

Detect the device and initialize the default intersection environment

# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Author: Da Xuanzi 2023-2-17

# Define the environment
env = gym.make("intersection-v0")
# details
env.config["duration"] = 13
env.config["vehicles_count"] = 20
env.config["vehicles_density"] = 1.3
env.config["reward_speed_range"] = [7.0, 10.0]
env.config["initial_vehicle_count"] = 10
env.config["simulation_frequency"] = 15
env.config["arrived_reward"] = 2
env.reset()

Structure of the intersection environment:

env.config
{
    "observation": {
        "type": "Kinematics",
        "vehicles_count": 15,
        "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
        "features_range": {
            "x": [-100, 100],
            "y": [-100, 100],
            "vx": [-20, 20],
            "vy": [-20, 20],
        },
        "absolute": True,
        "flatten": False,
        "observe_intentions": False
    },
    "action": {
        "type": "DiscreteMetaAction",
        "longitudinal": False,
        "lateral": True
    },
    "duration": 13,  # [s]
    "destination": "o1",
    "initial_vehicle_count": 10,
    "spawn_probability": 0.6,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.6],
    "scaling": 5.5 * 1.3,
    "collision_reward": IntersectionEnv.COLLISION_REWARD,
    "normalize_reward": False
}
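
The observation settings above determine the input size of the network. Assuming the Kinematics observation is an array with one row per vehicle and one column per feature, which is how I read the config, the flattened size can be checked like this (the helper name `flat_state_dim` is my own):

```python
observation_config = {
    "vehicles_count": 15,
    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
}


def flat_state_dim(obs_cfg):
    """Size of the flattened observation: one row per vehicle, one column per feature."""
    return obs_cfg["vehicles_count"] * len(obs_cfg["features"])


state_dim = flat_state_dim(observation_config)
print(state_dim)  # 105
```

This is where the value state_dim=105 used in the run function comes from.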

Build the network

The number of hidden nodes can be customized

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # super class
        super(Net, self).__init__()
        # hidden nodes define
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # forward pass of the Q-network
        x = state # state
        # two hidden layers, each followed by ReLU
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

Build the learner (experience replay buffer)

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by taking random actions
        state = self.env.reset()
        # newer versions return (observation, info) from reset(); keep only the observation
        if isinstance(state, tuple):
            state = state[0]
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the nearby vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (e.g. a crash)
            # truncated: bool, the episode was cut off by the time limit
            # info: dict with extra diagnostics (speed, crashed, action, ...)
            observation, reward, terminated, truncated, info = self.env.step(action)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": terminated,
            }
            self._storage.append(exp)
            state = observation

            if terminated or truncated:
                state = self.env.reset()
                if isinstance(state, tuple):
                    state = state[0]

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest experience (the first list item)

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)
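
As an alternative to popping from the front of a list, the same first-in-first-out behaviour can be had from `collections.deque` with a `maxlen`. This is only a sketch of that idea; the class name `DequeReplay` is hypothetical and it leaves out the random pre-filling done by the class above.

```python
import random
from collections import deque


class DequeReplay:
    """Fixed-size replay buffer; the oldest experience is dropped automatically."""

    def __init__(self, buffer_size):
        self._storage = deque(maxlen=buffer_size)

    def add(self, exp):
        # appending to a full deque silently discards the item at the other end
        self._storage.append(exp)

    def sample(self, n):
        # copy to a list so that random.sample indexes it quickly
        return random.sample(list(self._storage), n)

    def __len__(self):
        return len(self._storage)


buffer = DequeReplay(buffer_size=3)
for i in range(5):
    buffer.add({"state": i})
print(len(buffer))  # 3
print([e["state"] for e in buffer._storage])  # [2, 3, 4]
```

With a list, `pop(0)` shifts every remaining element, so the deque is the cheaper choice once the buffer gets large.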

Build the learning agent

PATH = "your/output/folder"  # placeholder: absolute or relative path to your own folder

class DOUBLEDQN(nn.Module):
    def __init__(
        self,
            env, # gym environment
            state_dim, # state size
            action_dim, # action size
        lr = 0.001, # learning rate
        gamma = 0.99, # discount factor
        batch_size = 5, # batch size for each training
        timestamp = "",):
        # super class
        super(DOUBLEDQN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)#TODO
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_our_action(self, state, epsilon = 0.9):
        # epsilon-greedy action selection; epsilon is the probability of acting greedily
        # state: ndarray environment state
        # epsilon: float in [0,1]
        # return: the chosen action
        if isinstance(state, tuple):
            state = state[0]
        # flatten the 2D observation into a 1D float tensor in a single conversion;
        # appending row by row is slow and lengthens training noticeably
        state = torch.as_tensor(
            np.asarray(state, dtype=np.float32).reshape(-1), device=device
        )
        # rand() samples uniformly from [0, 1); randn() is Gaussian and
        # would not select the greedy branch with probability epsilon
        if np.random.rand() <= epsilon:
            # greedy branch: feed the state to the estimate net
            # and take the action with the largest value
            with torch.no_grad():
                action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            # exploration branch: choose a random action
            action = np.random.randint(0, self.action_dim)

        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = [] # recorded losses
        avg_reward_list = [] # recorded rewards
        episode_reward = 0
        rend = 0
        # tqdm: a module that shows a progress bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step +=1
                action = self.choose_our_action(state)
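                observation, reward, terminated, truncated, info = self.env.step(action)
                # stop the episode on termination or on time-limit truncation
                done = terminated or truncated
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    # only true termination should switch off bootstrapping
                    "done": terminated,
                }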
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # estimated Q value of the taken actions; gather(1, index) picks one column per row
                estimate_Q_value = self.estimate_net(state_batch).gather(1, action_batch)
                # Double DQN target: the estimate net selects the next action,
                # the target net evaluates it; detach() keeps the target out of the gradient graph
                max_action_index = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q_value = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_index.unsqueeze(1)).detach()  # unsqueeze(1): shape (n,) -> (n, 1)

                # mean squared error between estimate and target
                loss = F.mse_loss(estimate_Q_value, target_Q_value)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                # load parameters into model
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter +=1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # episode_reward = 0
                # create the output directory if it does not exist yet
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
     
        while not done:
            action = self.choose_our_action(state, epsilon = 1)
            observation, reward, terminated, truncated, info = self.test_env.step(action)
            # stop on termination or on time-limit truncation
            done = terminated or truncated
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count
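
The action selection used by this class can be tried out in isolation. The sketch below uses plain Python lists in place of the network output, and `choose_action` with its `greedy_prob` parameter is my own naming. It follows the convention of this post, where epsilon=0.9 means "act greedily 90% of the time", which is the reverse of the more common convention where epsilon is the exploration rate.

```python
import random


def choose_action(q_values, greedy_prob, rng=random):
    """Pick the best action with probability `greedy_prob`, otherwise a random one."""
    # rng.random() is uniform on [0, 1), so this test succeeds with probability greedy_prob
    if rng.random() < greedy_prob:
        return max(range(len(q_values)), key=lambda a: q_values[a])
    return rng.randrange(len(q_values))


rng = random.Random(0)
q = [0.1, 0.7, 0.2]
print(choose_action(q, 1.0, rng))  # fully greedy, always action 1

counts = [0, 0, 0]
for _ in range(1000):
    counts[choose_action(q, 0.9, rng)] += 1
print(counts)  # action 1 dominates, the others appear occasionally
```

The draw has to come from a uniform distribution for the probability to mean what it says; a Gaussian draw such as `np.random.randn()` exceeds 1 fairly often, so even epsilon=1 would not be fully greedy.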

Build the run function

The hyperparameters lr and gamma can be set to your own values

if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_object = DOUBLEDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # number of training episodes
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")
    torch.save(double_dqn_object.state_dict(), path + "/DOUBLE_DQN_MODEL.pt")

Plot figures from the saved data

Create a new file draw_figures.py

Replace each ? with your own path

import matplotlib.pyplot as plt
import numpy as np
Loss = r"?\?\DOUBLE_DQN_LOSS.npy"
Reward = r"?\?\DOUBLE_DQN_EACH_REWARD.npy"
avg_loss = np.load(Loss)
avg_reward_list = np.load(Reward)
# print("loss", avg_loss)
# print("reward", avg_reward_list)
plt.figure(figsize=(10, 6))
plt.plot(avg_loss)
plt.grid()
plt.title("Double DQN Loss")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.savefig(r"?\figures\double_dqn_loss.png", dpi=150)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(avg_reward_list)
plt.grid()
plt.title("Double DQN Training Reward")
plt.xlabel("epochs")
plt.ylabel("reward")
plt.savefig(r"?\figures\double_dqn_train_reward.png", dpi=150)
plt.show()

Tighnari's hand-drawn divider

Dueling_DQN

Only minor changes to the code above are needed

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        """
        Initialize the network

        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        """
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim + 1)

    def forward(self, state):
        """
        Define the forward pass of the actor

        : param state: ndarray, the state of the environment
        """
        x = state
        # print(x.shape)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by taking random actions
        state = self.env.reset()
        # newer versions return (observation, info) from reset(); keep only the observation
        if isinstance(state, tuple):
            state = state[0]
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the nearby vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (e.g. a crash)
            # truncated: bool, the episode was cut off by the time limit
            # info: dict with extra diagnostics (speed, crashed, action, ...)
            observation, reward, terminated, truncated, info = self.env.step(action)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": terminated,
            }
            self._storage.append(exp)
            state = observation

            if terminated or truncated:
                state = self.env.reset()
                if isinstance(state, tuple):
                    state = state[0]

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest experience (the first list item)

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)

class DUELDQN(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        batch_size=5,
        timestamp="",
    ):
        """
        : param env: object, a gym environment
        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        : param lr: float, learning rate
        : param gamma: float, discount factor
        : param batch_size: int, batch size for training
        """
        super(DUELDQN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend =False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy action selection; epsilon is the probability of acting greedily
        # state: ndarray environment state
        # epsilon: float in [0,1]
        # return: the chosen action
        if isinstance(state, tuple):
            state = state[0]
        # flatten the 2D observation into a 1D float tensor in a single conversion;
        # appending row by row is slow and lengthens training noticeably
        state = torch.as_tensor(
            np.asarray(state, dtype=np.float32).reshape(-1), device=device
        )
        # rand() samples uniformly from [0, 1); randn() is Gaussian and
        # would not select the greedy branch with probability epsilon
        if np.random.rand() <= epsilon:
            # greedy branch: feed the state to the estimate net
            with torch.no_grad():
                action_value = self.estimate_net(state)
            # drop the last output (the state value); the rest are the advantages
            action_value = action_value[:-1]
            action = torch.argmax(action_value).item()
        else:
            # exploration branch: choose a random action
            action = np.random.randint(0, self.action_dim)

        return action

    def calculate_duelling_q_values(self, duelling_q_network_output):
        """
        Calculate the Q values using the duelling network architecture. This is equation (9) in the paper.

        :param duelling_q_network_output: tensor, output of duelling q network
        :return: Q values
        """
        state_value = duelling_q_network_output[:, -1]
        avg_advantage = torch.mean(duelling_q_network_output[:, :-1], dim=1)
        q_values = state_value.unsqueeze(1) + (
            duelling_q_network_output[:, :-1] - avg_advantage.unsqueeze(1)
        )
        return q_values

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = [] # recorded losses
        avg_reward_list = [] # recorded rewards
        episode_reward = 0

        # tqdm: a module that shows a progress bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
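                observation, reward, terminated, truncated, info = self.env.step(action)
                # stop the episode on termination or on time-limit truncation
                done = terminated or truncated
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    # only true termination should switch off bootstrapping
                    "done": terminated,
                }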
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_net_output = self.estimate_net(state_batch)
                estimate_Q = self.calculate_duelling_q_values(estimate_net_output)
                estimate_Q = estimate_Q.gather(1, action_batch)

                # get target Q value: the estimate net selects the next action,
                # the target net evaluates it (no gradient flows through the target)
                max_action_idx = (
                    self.estimate_net(state_next_batch)[:, :-1].detach().argmax(1)
                )
                target_net_output = self.target_net(state_next_batch).detach()
                target_Q = self.calculate_duelling_q_values(target_net_output).gather(
                    1, max_action_idx.unsqueeze(1)
                )
                target_Q = reward_batch + done_batch * self.gamma * target_Q

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                path = PATH + "/" + self.timestamp
                # create the output directory if it does not exist yet
                os.makedirs(path, exist_ok=True)

                np.save(path + "/DUELING_DQN_LOSS.npy", loss_list)
                np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            state_next, reward, terminated, truncated, info = self.test_env.step(action)
            # stop on termination or on time-limit truncation
            done = terminated or truncated
            total_reward += reward
            count += 1
            state = state_next

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime(
        "%Y-%m-%d-%H-%M", named_tuple
    )  # folder name of the form "year-month-day-hour-minute"

    duel_dqn_object = DUELDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    path = PATH + "/" + time_string
    # Train the policy
    iterations = 10
    avg_loss, avg_reward_list = duel_dqn_object.train(iterations)
    np.save(path + "/DUELING_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(duel_dqn_object.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")
    torch.save(duel_dqn_object.state_dict(), path + "/DUELING_DQN_MODEL.pt")
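
The aggregation done in calculate_duelling_q_values is easy to verify by hand on a single row. The sketch below is plain Python, the function name `dueling_q_values` is my own, and it assumes the same output layout as the network above: the first entries are the advantages and the last entry is the state value.

```python
def dueling_q_values(network_output):
    """Combine one row of network output into Q values: Q = V + (A - mean(A))."""
    advantages, state_value = network_output[:-1], network_output[-1]
    mean_advantage = sum(advantages) / len(advantages)
    return [state_value + (a - mean_advantage) for a in advantages]


q = dueling_q_values([1.0, 2.0, 3.0, 10.0])
print(q)  # [9.0, 10.0, 11.0]
```

Because the same state value and the same mean are applied to every action, the best action by Q value is also the best by advantage. That is why choose_action can take the argmax over the advantage outputs directly.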

DDQN+OtherChanges

Three 2D convolutional layers

# add CNN structure
class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # initialize the network
        # state_dim: size of the state space
        # action_dim: size of the action space
        super(Net, self).__init__()

        # nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # in_channels: channels of the input, whose shape is in_channels * in_H * in_W
        # out_channels: number of filters, chosen freely
        # kernel_size: size of the convolution window
        # stride: step length
        # padding: padding size
        # out_N = (in_N - kernel_size + 2 * padding) / stride + 1
        self.cnn = nn.Sequential(
            # the first 2D convolutional layer
            nn.Conv2d(1, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the second 2D convolutional layer
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the third 2D convolutional layer; my own experiment, more layers can be tried
            nn.Conv2d(8, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
        )

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(4 * 1 * 9, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # forward pass of the Q-network
        x = state # state
        x = self.cnn(x)
        # flatten the feature maps to (batch, features)
        x = x.view(x.size(0), -1)
        # two hidden layers, each followed by ReLU
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out
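
The input size 4 * 1 * 9 of fc1 follows from the output-size formula in the comments. The sketch below traces it step by step, assuming the input is a single-channel 15 x 7 observation (vehicles x features); `out_size` is a helper name of my own.

```python
def out_size(in_size, kernel, stride=1, padding=0):
    """Output length along one dimension for a convolution or pooling layer."""
    return (in_size + 2 * padding - kernel) // stride + 1


height, width = 15, 7  # vehicles_count x number of features
for _ in range(3):
    # Conv2d(kernel_size=3, padding=1) keeps the size
    height, width = out_size(height, 3, padding=1), out_size(width, 3, padding=1)
    # MaxPool2d(kernel_size=3, stride=1) shrinks each side by 2
    height, width = out_size(height, 3), out_size(width, 3)

channels = 4  # output channels of the last Conv2d
print(height, width, channels * height * width)  # 9 1 36
```

If vehicles_count or the feature list changes, the first argument of fc1 has to be recomputed the same way.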


class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):

        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # fill the buffer with n transitions collected by taking random actions
        state = self.env.reset()
        # newer versions return (observation, info) from reset(); keep only the observation
        if isinstance(state, tuple):
            state = state[0]
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the nearby vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (e.g. a crash)
            # truncated: bool, the episode was cut off by the time limit
            # info: dict with extra diagnostics (speed, crashed, action, ...)
            observation, reward, terminated, truncated, info = self.env.step(action)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": terminated,
            }
            self._storage.append(exp)
            state = observation

            if terminated or truncated:
                state = self.env.reset()
                if isinstance(state, tuple):
                    state = state[0]

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest experience in the list

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)



class DOUBLEDQN_CNN(nn.Module):
    def __init__(
        self,
            env,  # gym environment
            state_dim,  # state size
            action_dim,  # action size
            lr=0.001,  # learning rate
            gamma=0.99,  # discount factor
            batch_size=5,  # batch size for each training
            timestamp="", ):
        # super class
        super(DOUBLEDQN_CNN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)


    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy action selection
        # state: ndarray environment state (or the tuple returned by env.reset())
        # epsilon: float in [0,1], probability of taking the greedy action
        # return: the chosen action index
        if isinstance(state, tuple):
            state = state[0]
        # TODO
        # reshape to a 4D float tensor (batch, 1, 7, state_dim // 7);
        # building the tensor from one ndarray is much faster than from a list of ndarrays
        state = (
            torch.FloatTensor(state).to(device).reshape(-1, 1, 7, self.state_dim // 7)
        )
        # np.random.rand() is uniform on [0, 1); randn() draws from a standard normal,
        # which would not give the intended probability
        if np.random.rand() <= epsilon:
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        count_list = []
        loss_list = []
        total_reward_list = []
        avg_reward_list = []
        episode_reward = 0
        rend = 0
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
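                observation, reward, terminated, truncated, info = self.env.step(action)
                # end the episode on a terminal state or when it is truncated
                done = terminated or truncated
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": terminated,
                }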
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch from replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

                # extract batch data

                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch])
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch])
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch])
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list)
                state_batch = torch.FloatTensor(state_temp_list)

                # reshape
                state_batch = state_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                action_batch = action_batch.to(device).reshape(self.batch_size, -1)
                reward_batch = reward_batch.to(device).reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                done_batch = done_batch.to(device).reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, terminated, truncated, info = self.test_env.step(action)
            # stop on a terminal state or when the episode is truncated
            done = terminated or truncated
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_cnn_object = DOUBLEDQN_CNN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # your chosen train times
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_cnn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_cnn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")
    torch.save(double_dqn_cnn_object.state_dict(), path + "/DOUBLE_DQN_CNN_MODEL.pt")
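
The target computation in train() is the core of Double DQN: the online network (estimate_net) picks the best next action, and the target network (target_net) evaluates it. Below is a minimal, framework-free sketch of that arithmetic for a single transition. The function name double_dqn_target and the example Q-values are made up for illustration and are not part of the script above.

```python
def double_dqn_target(reward, not_done, gamma, q_online_next, q_target_next):
    """Return the Double DQN target for a single transition.

    q_online_next / q_target_next: Q-values of the next state, one entry per
    action, from the online (estimate) and target networks respectively.
    not_done: 1.0 if the episode continues, 0.0 if the transition was terminal.
    """
    # the online network selects the action ...
    best_action = max(range(len(q_online_next)), key=lambda a: q_online_next[a])
    # ... and the target network evaluates it
    return reward + not_done * gamma * q_target_next[best_action]


# hypothetical Q-values for a 3-action environment
q_online = [0.2, 1.5, 0.7]
q_target = [0.9, 0.4, 2.0]

target = double_dqn_target(1.0, 1.0, 0.99, q_online, q_target)
terminal_target = double_dqn_target(1.0, 0.0, 0.99, q_online, q_target)
print(target, terminal_target)
```

In this example the online network prefers action 1, so the target uses the target network's value 0.4 for that action rather than its own maximum of 2.0. Decoupling selection from evaluation in this way is what Double DQN relies on to reduce overestimation.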

Experience replay buffer

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # state: ndarray
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

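# Prioritized replay
# deque is used below but is not in the import list at the top of the file
from collections import deque


class Prioritized_Replay: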
    def __init__(
        self,
        buffer_size,
        init_length,
        state_dim,
        action_dim,
        est_Net,
        tar_Net,
        gamma,
    ):
        # state_dim: state space
        # action_dim: action space
        # env: env
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.is_rend = False

        self.priority = deque(maxlen=buffer_size)
        self._storage = []
        self._init_buffer(init_length, est_Net, tar_Net)

    def _init_buffer(self, n, est_Net, tar_Net):
        # n: number of transitions to pre-fill, collected by taking random actions
        # NOTE: this uses the module-level env defined earlier in the file
        state = env.reset()
        for i in range(n):
            # in the new API env.reset() returns a tuple
            # (ndarray, {"speed": float, "crashed": bool, "action": int, ...});
            # we only need the state data, i.e. the first item
            if isinstance(state, tuple):
                state = state[0]
            action = env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array with the observed state
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state
            # truncated: bool, the episode was cut off (e.g. by the time limit)
            # info: dict with extra information for logging
            observation, reward, terminated, truncated, info = env.step(action)
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": terminated,
            }
            self.prioritize(est_Net, tar_Net, exp, alpha=0.6)
            self._storage.append(exp)
            state = observation

            # start a new episode on either end signal
            if terminated or truncated:
                state = env.reset()

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)
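    # compute the priority of one experience and store it
    def prioritize(self, est_Net, tar_Net, exp, alpha=0.6):
        state = torch.FloatTensor(exp["state"]).to(device).reshape(-1)
        state_next = torch.FloatTensor(exp["state_next"]).to(device).reshape(-1)

        q = est_Net(state)[exp["action"]].detach().cpu().numpy()
        # bootstrap from the next state, not from the current one
        q_next = exp["reward"] + self.gamma * torch.max(est_Net(state_next).detach())
        # priority = (|TD error| + small constant) ** alpha
        p = (np.abs(q_next.cpu().numpy() - q) + (np.e ** -10)) ** alpha
        self.priority.append(p.item())

    def get_prioritized_batch(self, N):
        # a deque does not support division, so convert it to an ndarray first
        priority = np.array(self.priority)
        prob = priority / np.sum(priority)
        # NOTE: buffer_add() does not append a priority, so only the first
        # len(self.priority) stored experiences can be drawn here
        # random.choices(population, weights=None, *, cum_weights=None, k=1)
        # weights: relative probability of picking each item
        # k: number of draws (with replacement)
        # cum_weights: cumulative weights, an alternative to weights
        sample_idxes = random.choices(range(len(prob)), k=N, weights=prob)
        importance = (1 / prob) * (1 / len(self.priority))
        sampled_importance = np.array(importance)[sample_idxes]
        sampled_batch = np.array(self._storage)[sample_idxes]
        return sampled_batch.tolist(), sampled_importance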

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)


class DDQNPB(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=50,
        beta=1,
        beta_decay=0.995,
        beta_min=0.01,
        timestamp="",
    ):
        # env: environment
        # state_dim: state space
        # action_dim: action space
        # lr: learning rate
        # gamma: loss/discount factor
        # batch_size: training batch size
        super(DDQNPB, self).__init__()

        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

        self.ReplayBuffer = Prioritized_Replay(
            buffer_size,
            100,
            self.state_dim,
            self.action_dim,
            self.estimate_net,
            self.target_net,
            gamma,
        )
        self.priority = self.ReplayBuffer.priority
        # NOTE: beta here corresponds to (1 - beta) in most online articles; this is only a difference in notation
        # it starts from 1 and decays
        self.beta = beta
        self.beta_decay = beta_decay
        self.beta_min = beta_min

    def choose_action(self, state, epsilon=0.9):
        # state: env state
        # epsilon: float in [0,1], probability of taking the greedy action
        # return: the chosen action index
        if isinstance(state, tuple):
            state = state[0]
        # flatten the n-dimensional observation into a 1D float tensor
        state = torch.FloatTensor(np.asarray(state).reshape(-1)).to(device)
        # np.random.rand() is uniform on [0, 1); randn() draws from a standard normal,
        # which would not give the intended probability
        if np.random.rand() <= epsilon:
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: number of training episodes
        loss_list = []
        avg_reward_list = []
        episode_reward = 0

        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = env.reset()
            each_loss = 0
            step = 0
            rend = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                action = self.choose_action(state)
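                observation, reward, terminated, truncated, info = env.step(action)
                # end the episode on a terminal state or when it is truncated
                done = terminated or truncated
                # self.env.render()
                # store experience to replay memory
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": terminated,
                }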
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # importance weighting
                if self.beta > self.beta_min:
                    self.beta *= self.beta_decay

                # sample random batch from replay memory
                exp_batch, importance = self.ReplayBuffer.get_prioritized_batch(
                    self.batch_size
                )
                importance = torch.FloatTensor(importance ** (1 - self.beta)).to(device)

                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)

                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                # loss = F.mse_loss(estimate_Q, target_Q)
                loss = torch.mean(
                    torch.multiply(torch.square(estimate_Q - target_Q), importance)
                )
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                #TODO
                # update target network
                if self.learn_step_counter % 10 == 0:
                    # self.update_target_networks()
                    self.target_net.load_state_dict(self.estimate_net.state_dict())

                self.learn_step_counter += 1
                step += 1

                env.render()
                # you can update these variables
                # if episode_reward % 100 == 0:
                #     rend += 1
                #     if rend % 5 == 0:
                #         self.is_rend = True
                #     else:
                #         self.is_rend = False

            reward, count = self.eval()
            episode_reward += reward


            # save
            period = 1
            if episode % period == 0:
                # log
                # average loss per training step in this episode
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print(
                    "\nepoch: [{}/{}], \tavg loss: {:.4f}, \tavg reward: {:.3f}, \tsteps: {}".format(
                        episode, num_episode, each_loss, episode_reward, count
                    )
                )
                # episode_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(),path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")

        env.close()
        return loss_list, avg_reward_list

    def eval(self):
        """
        Evaluate the policy
        """
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, terminated, truncated, info = self.test_env.step(action)
            # stop on a terminal state or when the episode is truncated
            done = terminated or truncated
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

    double_dqn_prioritized_object = DDQNPB(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=64,
        timestamp=time_string,
    )

    # Train the policy
    iterations = 10000
    avg_loss, avg_reward_list = double_dqn_prioritized_object.train(iterations)

    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_prioritized_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")
    torch.save(double_dqn_prioritized_object.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_MODEL.pt")
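
get_prioritized_batch turns the stored priorities into sampling probabilities and importance weights. The sketch below reproduces that arithmetic with the standard library only. The function name prioritized_sample, the rng argument, and the example priorities are assumptions made for illustration. It follows the same convention as the code above, where the weight 1 / (N * P(i)) is raised to an exponent that the training loop sets to 1 - beta.

```python
import random


def prioritized_sample(priorities, k, exponent, rng=random):
    """Draw k indices proportionally to priority and return their weights."""
    total = sum(priorities)
    probs = [p / total for p in priorities]
    # sampling with replacement, weighted by probability
    idxes = rng.choices(range(len(probs)), weights=probs, k=k)
    n = len(priorities)
    # importance weight (1 / (n * P(i))) ** exponent corrects the sampling bias
    weights = [(1.0 / (n * probs[i])) ** exponent for i in idxes]
    return idxes, probs, weights


rng = random.Random(0)  # fixed seed so the run is repeatable
idxes, probs, weights = prioritized_sample([1.0, 1.0, 2.0], k=5, exponent=1.0, rng=rng)
print(idxes, probs, weights)
```

Experiences with a high priority are drawn more often but receive a smaller weight, so their contribution to the loss is scaled back. With equal priorities every weight is 1 and the scheme reduces to uniform replay.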

Feel free to change some of this yourself; the best bugs are the ones you introduce on your own! (just kidding)
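
One change worth considering: with the five-value step() API, an episode can end either because terminated is true or because truncated is true, so a loop that only checks the third return value may keep running after the time limit. The sketch below shows the pattern on a tiny stand-in environment. ToyEnv and run_episode are hypothetical names made up for this example; they only mimic the reset()/step() signatures and are not part of highway-env.

```python
class ToyEnv:
    """Stand-in environment that never terminates but truncates after max_steps."""

    def __init__(self, max_steps=5):
        self.max_steps = max_steps
        self.t = 0

    def reset(self):
        self.t = 0
        return [0.0], {}  # (observation, info), like the new gym API

    def step(self, action):
        self.t += 1
        observation = [float(self.t)]
        reward = 1.0
        terminated = False  # no crash or arrival in this toy example
        truncated = self.t >= self.max_steps  # time limit reached
        return observation, reward, terminated, truncated, {}


def run_episode(env, policy):
    state, info = env.reset()
    total_reward, steps, done = 0.0, 0, False
    while not done:
        action = policy(state)
        state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated  # stop on either signal
        total_reward += reward
        steps += 1
    return total_reward, steps


total_reward, steps = run_episode(ToyEnv(max_steps=5), policy=lambda s: 0)
print(total_reward, steps)
```

When storing transitions for training, it is usually terminated alone that should switch off bootstrapping, because a truncated episode did not actually reach a terminal state.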

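Afterword:

On custom environments: I just spent 30 minutes looking into this. The official tutorial is pretty rough (doge), so here is the walkthrough I worked out myself:

  1. Find where your highway-env package is installed. Mine is at: E:\formalFiles\Anaconda3-2020.07\envs\autodrive_38\Lib\site-packages\highway_env
  2. Inside highway_env, the envs folder holds the definition files for the various scenarios. I use intersection_env.py as the example here; the others work the same way. Create a new file test_env.py and copy the entire content of intersection_env.py into it.

  3. In test_env.py, rename the class as follows: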
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }

    Delete every class definition except the first one. Here the action space is changed to 5 actions.

  4. At the end of envs/__init__.py, add
    from highway_env.envs.test_env import *
  5. In the highway_env folder, find the standalone __init__.py (not the file from the previous step!) and modify it as follows:
    def register_highway_envs():
        """Import the envs module so that envs register themselves."""
        # my test environment
        register(
            id='test-v0',  # registered name
            entry_point='highway_env.envs:test'  # environment class name
        )
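  6. To modify the reward, go to your environment definition file highway_env/envs/test_env.py and look at the _reward function and the functions related to it, such as _agent_reward; you can change the terms yourself. The lmap() function lives in utils.py.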
    def _reward(self, action: int) -> float:
        """Aggregated reward, for cooperative agents."""
        return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
    def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
        """Per-agent reward signal."""
        rewards = self._agent_rewards(action, vehicle)
        reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
        reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
        reward *= rewards["on_road_reward"]
        if self.config["normalize_reward"]:
            reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
        return reward
    
    def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
        """Per-agent per-objective reward signal."""
        scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
        return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
  7. Use the custom environment as follows:
    import highway_env
    import gym
    
    env = gym.make("test-v0")
    env.reset()
  8. My custom environment file. These are my personal settings and not necessarily the best:
    from typing import Dict, Tuple, Text
    
    import numpy as np
    from highway_env import utils
    from highway_env.envs.common.abstract import AbstractEnv, MultiAgentWrapper
    from highway_env.road.lane import LineType, StraightLane, CircularLane, AbstractLane
    from highway_env.road.regulation import RegulatedRoad
    from highway_env.road.road import RoadNetwork
    from highway_env.vehicle.kinematics import Vehicle
    from highway_env.vehicle.controller import ControlledVehicle
    
    
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }
        ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()}
    
        @classmethod
        def default_config(cls) -> dict:
            config = super().default_config()
            config.update({
                "observation": {
                    "type": "Kinematics",
                    "vehicles_count": 15,
                    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
                    "features_range": {
                        "x": [-100, 100],
                        "y": [-100, 100],
                        "vx": [-20, 20],
                        "vy": [-20, 20],
                    },
                    "absolute": True,
                    "flatten": False,
                    "observe_intentions": False
                },
                "action": {
                    "type": "DiscreteMetaAction",
                    "longitudinal": True,
                    "lateral": True,
                    "target_speeds": [0, 4.5, 9]
                },
                "duration": 13,  # [s]
                "destination": "o1",
                "controlled_vehicles": 1,
                "initial_vehicle_count": 10,
                "spawn_probability": 0.6,
                "screen_width": 600,
                "screen_height": 600,
                "centering_position": [0.5, 0.6],
                "scaling": 5.5 * 1.3,
                "collision_reward": -10,
                "high_speed_reward": 2,
                "arrived_reward": 5,
                "reward_speed_range": [7.0, 9.0],# change
                "normalize_reward": False,
                "offroad_terminal": False
            })
            return config
    
        def _reward(self, action: int) -> float:
            """Aggregated reward, for cooperative agents."""
            return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
        def _rewards(self, action: int) -> Dict[Text, float]:
            """Multi-objective rewards, for cooperative agents."""
            agents_rewards = [self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles]
            return {
                name: sum(agent_rewards[name] for agent_rewards in agents_rewards) / len(agents_rewards)
                for name in agents_rewards[0].keys()
            }
        # edit your reward
        def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
            """Per-agent reward signal."""
            rewards = self._agent_rewards(action, vehicle)
            reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
            reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
            reward *= rewards["on_road_reward"]
            if self.config["normalize_reward"]:
                reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
            return reward
    
        def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
            """Per-agent per-objective reward signal."""
            scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
            return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
    
        def _is_terminated(self) -> bool:
            return any(vehicle.crashed for vehicle in self.controlled_vehicles) \
                   or all(self.has_arrived(vehicle) for vehicle in self.controlled_vehicles) \
                   or (self.config["offroad_terminal"] and not self.vehicle.on_road)
    
        def _agent_is_terminal(self, vehicle: Vehicle) -> bool:
            """The episode is over when a collision occurs or when the access ramp has been passed."""
            return (vehicle.crashed or
                    self.has_arrived(vehicle) or
                    self.time >= self.config["duration"])
    
        def _is_truncated(self) -> bool:
            return self.time >= self.config["duration"]
    
        def _info(self, obs: np.ndarray, action: int) -> dict:
            info = super()._info(obs, action)
            info["agents_rewards"] = tuple(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles)
            info["agents_dones"] = tuple(self._agent_is_terminal(vehicle) for vehicle in self.controlled_vehicles)
            return info
    
        def _reset(self) -> None:
            self._make_road()
            self._make_vehicles(self.config["initial_vehicle_count"])
    
        def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:
            obs, reward, terminated, truncated, info = super().step(action)
            self._clear_vehicles()
            self._spawn_vehicle(spawn_probability=self.config["spawn_probability"])
            return obs, reward, terminated, truncated, info
    
        def _make_road(self) -> None:
            """
            Make a 4-way intersection.
    
            The horizontal road has the right of way. More precisely, the levels of priority are:
                - 3 for horizontal straight lanes and right-turns
                - 1 for vertical straight lanes and right-turns
                - 2 for horizontal left-turns
                - 0 for vertical left-turns
    
            The code for nodes in the road network is:
            (o:outer | i:inner + [r:right, l:left]) + (0:south | 1:west | 2:north | 3:east)
    
            :return: the intersection road
            """
            lane_width = AbstractLane.DEFAULT_WIDTH
            right_turn_radius = lane_width + 5  # [m]
            left_turn_radius = right_turn_radius + lane_width  # [m]
            outer_distance = right_turn_radius + lane_width / 2
            access_length = 50 + 50  # [m]
    
            net = RoadNetwork()
            n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
            for corner in range(4):
                angle = np.radians(90 * corner)
                is_horizontal = corner % 2
                priority = 3 if is_horizontal else 1
                rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])
                # Incoming
                start = rotation @ np.array([lane_width / 2, access_length + outer_distance])
                end = rotation @ np.array([lane_width / 2, outer_distance])
                net.add_lane("o" + str(corner), "ir" + str(corner),
                             StraightLane(start, end, line_types=[s, c], priority=priority, speed_limit=10))
                # Right turn
                r_center = rotation @ (np.array([outer_distance, outer_distance]))
                net.add_lane("ir" + str(corner), "il" + str((corner - 1) % 4),
                             CircularLane(r_center, right_turn_radius, angle + np.radians(180), angle + np.radians(270),
                                          line_types=[n, c], priority=priority, speed_limit=10))
                # Left turn
                l_center = rotation @ (np.array([-left_turn_radius + lane_width / 2, left_turn_radius - lane_width / 2]))
                net.add_lane("ir" + str(corner), "il" + str((corner + 1) % 4),
                             CircularLane(l_center, left_turn_radius, angle + np.radians(0), angle + np.radians(-90),
                                          clockwise=False, line_types=[n, n], priority=priority - 1, speed_limit=10))
                # Straight
                start = rotation @ np.array([lane_width / 2, outer_distance])
                end = rotation @ np.array([lane_width / 2, -outer_distance])
                net.add_lane("ir" + str(corner), "il" + str((corner + 2) % 4),
                             StraightLane(start, end, line_types=[s, n], priority=priority, speed_limit=10))
                # Exit
                start = rotation @ np.flip([lane_width / 2, access_length + outer_distance], axis=0)
                end = rotation @ np.flip([lane_width / 2, outer_distance], axis=0)
                net.add_lane("il" + str((corner - 1) % 4), "o" + str((corner - 1) % 4),
                             StraightLane(end, start, line_types=[n, c], priority=priority, speed_limit=10))
    
            road = RegulatedRoad(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
            self.road = road
    
        def _make_vehicles(self, n_vehicles: int = 10) -> None:
            """
            Populate the intersection with several vehicles, then add the controlled (ego) vehicles.

            :param n_vehicles: number of vehicles to try to spawn before the controlled ones
            """
            # Configure vehicles
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle_type.DISTANCE_WANTED = 5  # Low jam distance
            vehicle_type.COMFORT_ACC_MAX = 6
            vehicle_type.COMFORT_ACC_MIN = -3
    
            # Random vehicles
            simulation_steps = 3
            for t in range(n_vehicles - 1):
                self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])
            for _ in range(simulation_steps):
                # Advance the traffic by one simulated second per outer iteration
                for _ in range(self.config["simulation_frequency"]):
                    self.road.act()
                    self.road.step(1 / self.config["simulation_frequency"])
    
            # Challenger vehicle
            self._spawn_vehicle(60, spawn_probability=1, go_straight=True, position_deviation=0.1, speed_deviation=0)
    
            # Controlled vehicles
            self.controlled_vehicles = []
            for ego_id in range(0, self.config["controlled_vehicles"]):
                ego_lane = self.road.network.get_lane(("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0))
                destination = self.config["destination"] or "o" + str(self.np_random.randint(1, 4))
                ego_vehicle = self.action_type.vehicle_class(
                                 self.road,
                                 ego_lane.position(60 + 5*self.np_random.normal(1), 0),
                                 speed=ego_lane.speed_limit,
                                 heading=ego_lane.heading_at(60))
                try:
                    ego_vehicle.plan_route_to(destination)
                    ego_vehicle.speed_index = ego_vehicle.speed_to_index(ego_lane.speed_limit)
                    ego_vehicle.target_speed = ego_vehicle.index_to_speed(ego_vehicle.speed_index)
                except AttributeError:
                    pass
    
                self.road.vehicles.append(ego_vehicle)
                self.controlled_vehicles.append(ego_vehicle)
                # Prevent early collisions: rebuild the list instead of removing while iterating
                self.road.vehicles = [
                    v for v in self.road.vehicles
                    if v is ego_vehicle or np.linalg.norm(v.position - ego_vehicle.position) >= 20
                ]
    
        def _spawn_vehicle(self,
                           longitudinal: float = 0,
                           position_deviation: float = 1.,
                           speed_deviation: float = 1.,
                           spawn_probability: float = 0.6,
                           go_straight: bool = False) -> None:
            if self.np_random.uniform() > spawn_probability:
                return
    
            route = self.np_random.choice(range(4), size=2, replace=False)
            route[1] = (route[0] + 2) % 4 if go_straight else route[1]
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle = vehicle_type.make_on_lane(self.road, ("o" + str(route[0]), "ir" + str(route[0]), 0),
                                                longitudinal=(longitudinal + 5
                                                              + self.np_random.normal() * position_deviation),
                                                speed=8 + self.np_random.normal() * speed_deviation)
            for v in self.road.vehicles:
                if np.linalg.norm(v.position - vehicle.position) < 15:
                    return
            vehicle.plan_route_to("o" + str(route[1]))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)
            return vehicle
    
        def _clear_vehicles(self) -> None:
            is_leaving = lambda vehicle: "il" in vehicle.lane_index[0] and "o" in vehicle.lane_index[1] \
                                         and vehicle.lane.local_coordinates(vehicle.position)[0] \
                                         >= vehicle.lane.length - 4 * vehicle.LENGTH
            self.road.vehicles = [vehicle for vehicle in self.road.vehicles if
                                  vehicle in self.controlled_vehicles or not (is_leaving(vehicle) or vehicle.route is None)]
    
        def has_arrived(self, vehicle: Vehicle, exit_distance: float = 25) -> bool:
            return "il" in vehicle.lane_index[0] \
                   and "o" in vehicle.lane_index[1] \
                   and vehicle.lane.local_coordinates(vehicle.position)[0] >= exit_distance
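
The node naming and priority scheme described in the `_make_road` docstring above is easier to follow once the geometry is stripped away. The block below is only a rough sketch of mine, not library code: `intersection_edges` is a hypothetical helper that repeats the same index arithmetic as `_make_road` and lists each lane as `(from_node, to_node, kind, priority)`.

```python
def intersection_edges():
    """List the lanes built by _make_road as (from_node, to_node, kind, priority).

    Hypothetical helper for illustration only; it mirrors the index arithmetic
    of _make_road and ignores all geometry (positions, radii, line types).
    """
    edges = []
    for corner in range(4):
        # Odd corners (1: west, 3: east) are the horizontal road and get priority 3
        priority = 3 if corner % 2 else 1
        inner = f"ir{corner}"
        edges.append((f"o{corner}", inner, "incoming", priority))
        edges.append((inner, f"il{(corner - 1) % 4}", "right", priority))
        # Left turns yield one level more than the road they start from
        edges.append((inner, f"il{(corner + 1) % 4}", "left", priority - 1))
        edges.append((inner, f"il{(corner + 2) % 4}", "straight", priority))
        exit_id = (corner - 1) % 4
        edges.append((f"il{exit_id}", f"o{exit_id}", "exit", priority))
    return edges


if __name__ == "__main__":
    for edge in intersection_edges():
        print(edge)
```

Each corner contributes five lanes, so the network has 20 lanes in total, and a route such as `o0 -> ir0 -> il2 -> o2` is the "go straight from the south" case that `_spawn_vehicle` forces with `go_straight=True`.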
    
    

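Oh, everybody wants a visualization, right? Here it comes.

Training plot from double_dqn.py under test-v0 (action_dim == 5):

For now this is single-agent. A multi-agent version would need changes to the input data and the actions, as well as to the number of controlled vehicles. I am leaving that as a to-do for later.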
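
As a starting point for that to-do, here is a minimal sketch of what the multi-agent configuration might look like. I have not trained with it. The `controlled_vehicles` key is the one read by `_make_vehicles` above; the `MultiAgentObservation` and `MultiAgentAction` type names follow the highway-env multi-agent documentation as I understand it, and `multi_agent_config` itself is a hypothetical helper.

```python
def multi_agent_config(n_agents: int = 2) -> dict:
    """Build a config dict for n_agents controlled vehicles (hypothetical helper).

    Assumption: the inner observation/action types (Kinematics and
    DiscreteMetaAction) are placeholders; swap in whatever the
    single-agent setup already uses.
    """
    return {
        "controlled_vehicles": n_agents,
        "observation": {
            "type": "MultiAgentObservation",
            "observation_config": {"type": "Kinematics"},
        },
        "action": {
            "type": "MultiAgentAction",
            "action_config": {"type": "DiscreteMetaAction"},
        },
    }


if __name__ == "__main__":
    cfg = multi_agent_config(2)
    print(cfg)
    # Assumed usage, not run here:
    #   env.configure(cfg)
    #   env.reset()
```

With a configuration like this, the observation and the action passed to `env.step` should each become a tuple with one entry per controlled vehicle, so the replay buffer and the network input in double_dqn.py would have to be reshaped accordingly.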

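Anything else? Wait until I finish the multi-agent version 0-0!

Waiting for some kind soul to fill in the rest... after all, this is no man's land (sad).

One day, I will become a godlike Mr. Tighnari!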
