使用强化学习教学AI以将派饼分发到商店

介绍



有一次,在阅读《强化学习:入门》一书时,我曾考虑过用实用知识来补充我的理论知识,但是却没有解决下一个平衡杠铃,教经纪人下棋或发明另一辆自行车的问题。



同时,这本书包含了一个优化客户队列的有趣示例,一方面在实现/理解流程方面并不太复杂,另一方面,这也很有趣并且可以在现实生活中成功实现。



在稍微更改了此示例后,我想到了这个想法,将对此进行进一步讨论。



问题的提法



因此,请想象一下以下图片:







我们拥有一家面包店,该面包店每天生产6吨(有条件)覆盆子派,并将这些产品每天分配给三个商店。



但是,如何最好地做到这一点,以使过期产品越少越好(前提是馅饼的保质期为三天),如果我们在每个销售点只有三辆分别载有1吨,2吨和3吨的卡车,那么这是最有利可图的仅发送一辆卡车(因为它们彼此之间的距离足够远),而且,每天仅在烘烤馅饼后才发送一次,此外,我们不知道商店的购买力(因为业务刚刚开始)?



我们同意FIFO布局策略在商店中的效果很好,在该商店中,顾客只拿到比其他顾客晚生产的商品,但是如果没有在三天内购买覆盆子派,商店的工作人员就会摆脱它。



我们(有条件地)不知道特定商店在特定日期对馅饼的需求量,但是,在我们的模拟中,我们为三个商店中的每一个设置如下:3±0.1、1±0.1、2±0.1。



显然,对我们来说,最有利可图的选择是分别向第一家商店发送三吨,向第二家商店发送三吨,向第三家商店发送两吨馅饼。



为了解决这个问题,我们使用了自定义的体育馆环境以及Deep Q Learning(Keras实现)。



自定义环境



环境状态将用三个真实的正数来描述-三个商店中每一个商店当天剩余的产品。代理的动作是从0到5(含0和5)的数字,表示整数1、2和3的排列索引。很显然,最有益的动作将在第4个索引(3、1、2)之下。在30天的一集中,我们认为该任务是情节性的。



import gym
from gym import error, spaces, utils
from gym.utils import seeding

import itertools
import random
import time

class ShopsEnv(gym.Env):
  metadata = {'render.modes': ['human']}
  
  #  ,   
  #  
  def __init__(self):
    self.state = [0, 0, 0]  #  
    self.next_state = [0, 0, 0]  #  
    self.done = False  #   
    self.actions = list(itertools.permutations([1, 2, 3]))  #    
    self.reward = 0  #    
    self.time_tracker = 0  #   
    
    self.remembered_states = []  #     
    
    #   
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
  
  #       ()  
  def step(self, action_num):
    #      
    if self.done:
        return [self.state, self.reward, self.done, self.next_state]
    else:
        #    
        self.state = self.next_state
        
        #  
        self.remembered_states.append(self.state) 
    
        #  
        self.time_tracker += 1
        
        #       
        action = self.actions[action_num]
        
        #  ,    ( )
        self.next_state = [x + y for x, y in zip(action, self.state)]
        
        #    
        self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
        self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
        self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
        
        #    
        if any([x < 0 for x in self.next_state]):
            self.reward = sum([x for x in self.next_state if x < 0])
        else:
            self.reward = 1
            
        #       
        #     
        #       (    ),
        #      
        if self.time_tracker >= 3:
            remembered_state = self.remembered_states.pop(0)
            self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
        else:
            self.next_state = [max(x, 0) for x in self.next_state]
        
        
        #     30 
        self.done = self.time_tracker == 30
        
        #      
        return [self.state, self.reward, self.done, self.next_state]
  
  #   
  def reset(self):
    #      
    self.state = [0, 0, 0]
    self.next_state = [0, 0, 0]
    self.done = False
    self.reward = 0
    self.time_tracker = 0
    
    self.remembered_states = []
    
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
    
    #   
    return self.state
  
  #     :
  #      
  def render(self, mode='human', close=False):
    print('-'*20)
    print('First shop')
    print('Pies:', self.state[0])

    print('Second shop')
    print('Pies:', self.state[1])

    print('Third shop')
    print('Pies:', self.state[2])
    print('-'*20)
    print('')


主要进口



import numpy as np #  
import pandas as pd #  
import gym #  
import gym_shops #    
from tqdm import tqdm #   

#  
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()

#  
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #   


定义代理



class DQLAgent(): 
    
    def __init__(self, env):
        #    
       
        self.state_size = 3 #    
        self.action_size = 6 #    
        
        #    replay()
        self.gamma = 0.99
        self.learning_rate = 0.01
        
        #    adaptiveEGreedy()
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.0001
        
        self.memory = deque(maxlen = 5000) #   5000  ,    -   
        
        #   (NN)
        self.model = self.build_model()
    
    #      Deep Q Learning
    def build_model(self):
        model = Sequential()
        model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #   
        model.add(Dense(50, activation = 'sigmoid')) #  
        model.add(Dense(10, activation = 'sigmoid')) #  
        model.add(Dense(self.action_size, activation = 'sigmoid')) #  
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model
    
    #    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    #   
    def act(self, state):
        #     0  1  epsilon
        #     (exploration)
        if random.uniform(0,1) <= self.epsilon:
            return random.choice(range(6))
        else:
            #          
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])
            
    
    #     
    def replay(self, batch_size):
        
        #   ,        
        if len(self.memory) < batch_size:
            return
        
        minibatch = random.sample(self.memory, batch_size) #  batch_size    
        #     
        for state, action, reward, next_state, done in minibatch:
            if done: #    -      
                target = reward
            else:
                #       
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0]) 
                # target = R(s,a) + gamma * max Q`(s`,a`)
                # target (max Q` value)     ,   s`  
            train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
            train_target[0][action] = target
            self.model.fit(state, train_target, verbose = 0)
    
    #    exploration rate,
    #   epsilon
    def adaptiveEGreedy(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay




培训代理商



#  gym   
env = gym.make('shops-v0')
agent = DQLAgent(env)

#   
batch_size = 100
episodes = 1000

#  
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
    #  
    state = env.reset()
    state = np.reshape(state, [1, 3])

    #    , id       
    time = 0
    taken_actions = []
    sum_rewards = 0


    #   
    while True:
        #  
        action = agent.act(state)

        #  
        taken_actions.append(action)

        #     
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 3])

        #     
        sum_rewards += reward

        #   
        agent.remember(state, action, reward, next_state, done)

        #    
        state = next_state

        #  replay
        agent.replay(batch_size)

        #  epsilon
        agent.adaptiveEGreedy()

        #   
        time += 1

        #   
        progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)

        #     
        if done:
            #       
            clear_output(wait=True)
            sns.distplot(taken_actions, color="y")
            plt.title('Episode: ' + str(e))
            plt.xlabel('Action number')
            plt.ylabel('Occurrence in %')
            plt.show()
            break






测试代理



import time
trained_model = agent  #     
state = env.reset()  #  
state = np.reshape(state, [1,3])

#        
time_t = 0
MAX_EPISOD_LENGTH = 1000  #   
taken_actions = []
mean_reward = 0

#   
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
    #     
    action = trained_model.act(state)
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1,3])
    state = next_state
    taken_actions.append(action)

    #   
    clear_output(wait=True)
    env.render()
    progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
    print('Reward:', round(env.reward, 3))
    time.sleep(0.5)

    mean_reward += env.reward

    if done:
        break

#    
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()    








因此,代理迅速了解如何最有利地采取行动。



总的来说,仍有很大的试验空间:您可以增加商店数量,使操作多样化,甚至只是更改训练模型的超参数-这仅仅是列表的开始。



资料来源






All Articles