介绍
有一次,在阅读《强化学习:入门》一书时,我曾考虑过用实用知识来补充我的理论知识,但是却没有解决下一个平衡杠铃,教经纪人下棋或发明另一辆自行车的问题。
同时,这本书包含了一个优化客户队列的有趣示例,一方面在实现/理解流程方面并不太复杂,另一方面,这也很有趣并且可以在现实生活中成功实现。
在稍微更改了此示例后,我想到了这个想法,将对此进行进一步讨论。
问题的提法
因此,请想象一下以下图片:
我们拥有一家面包店,该面包店每天生产6吨(有条件)覆盆子派,并将这些产品每天分配给三个商店。
但是,如何最好地做到这一点,以使过期产品越少越好(前提是馅饼的保质期为三天),如果我们在每个销售点只有三辆分别载有1吨,2吨和3吨的卡车,那么这是最有利可图的仅发送一辆卡车(因为它们彼此之间的距离足够远),而且,每天仅在烘烤馅饼后才发送一次,此外,我们不知道商店的购买力(因为业务刚刚开始)?
我们同意FIFO布局策略在商店中的效果很好,在该商店中,顾客只拿到比其他顾客晚生产的商品,但是如果没有在三天内购买覆盆子派,商店的工作人员就会摆脱它。
我们(有条件地)不知道特定商店在特定日期对馅饼的需求量,但是,在我们的模拟中,我们为三个商店中的每一个设置如下:3±0.1、1±0.1、2±0.1。
显然,对我们来说,最有利可图的选择是分别向第一家商店发送三吨,向第二家商店发送三吨,向第三家商店发送两吨馅饼。
为了解决这个问题,我们使用了自定义的体育馆环境以及Deep Q Learning(Keras实现)。
自定义环境
环境状态将用三个真实的正数来描述-三个商店中每一个商店当天剩余的产品。代理的动作是从0到5(含0和5)的数字,表示整数1、2和3的排列索引。很显然,最有益的动作将在第4个索引(3、1、2)之下。在30天的一集中,我们认为该任务是情节性的。
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import itertools
import random
import time
class ShopsEnv(gym.Env):
metadata = {'render.modes': ['human']}
# ,
#
def __init__(self):
self.state = [0, 0, 0] #
self.next_state = [0, 0, 0] #
self.done = False #
self.actions = list(itertools.permutations([1, 2, 3])) #
self.reward = 0 #
self.time_tracker = 0 #
self.remembered_states = [] #
#
t = int( time.time() * 1000.0 )
random.seed( ((t & 0xff000000) >> 24) +
((t & 0x00ff0000) >> 8) +
((t & 0x0000ff00) << 8) +
((t & 0x000000ff) << 24) )
# ()
def step(self, action_num):
#
if self.done:
return [self.state, self.reward, self.done, self.next_state]
else:
#
self.state = self.next_state
#
self.remembered_states.append(self.state)
#
self.time_tracker += 1
#
action = self.actions[action_num]
# , ( )
self.next_state = [x + y for x, y in zip(action, self.state)]
#
self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
#
if any([x < 0 for x in self.next_state]):
self.reward = sum([x for x in self.next_state if x < 0])
else:
self.reward = 1
#
#
# ( ),
#
if self.time_tracker >= 3:
remembered_state = self.remembered_states.pop(0)
self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
else:
self.next_state = [max(x, 0) for x in self.next_state]
# 30
self.done = self.time_tracker == 30
#
return [self.state, self.reward, self.done, self.next_state]
#
def reset(self):
#
self.state = [0, 0, 0]
self.next_state = [0, 0, 0]
self.done = False
self.reward = 0
self.time_tracker = 0
self.remembered_states = []
t = int( time.time() * 1000.0 )
random.seed( ((t & 0xff000000) >> 24) +
((t & 0x00ff0000) >> 8) +
((t & 0x0000ff00) << 8) +
((t & 0x000000ff) << 24) )
#
return self.state
# :
#
def render(self, mode='human', close=False):
print('-'*20)
print('First shop')
print('Pies:', self.state[0])
print('Second shop')
print('Pies:', self.state[1])
print('Third shop')
print('Pies:', self.state[2])
print('-'*20)
print('')
主要进口
import numpy as np #
import pandas as pd #
import gym #
import gym_shops #
from tqdm import tqdm #
#
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()
#
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #
定义代理
class DQLAgent():
def __init__(self, env):
#
self.state_size = 3 #
self.action_size = 6 #
# replay()
self.gamma = 0.99
self.learning_rate = 0.01
# adaptiveEGreedy()
self.epsilon = 0.99
self.epsilon_decay = 0.99
self.epsilon_min = 0.0001
self.memory = deque(maxlen = 5000) # 5000 , -
# (NN)
self.model = self.build_model()
# Deep Q Learning
def build_model(self):
model = Sequential()
model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #
model.add(Dense(50, activation = 'sigmoid')) #
model.add(Dense(10, activation = 'sigmoid')) #
model.add(Dense(self.action_size, activation = 'sigmoid')) #
model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
return model
#
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
#
def act(self, state):
# 0 1 epsilon
# (exploration)
if random.uniform(0,1) <= self.epsilon:
return random.choice(range(6))
else:
#
act_values = self.model.predict(state)
return np.argmax(act_values[0])
#
def replay(self, batch_size):
# ,
if len(self.memory) < batch_size:
return
minibatch = random.sample(self.memory, batch_size) # batch_size
#
for state, action, reward, next_state, done in minibatch:
if done: # -
target = reward
else:
#
target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
# target = R(s,a) + gamma * max Q`(s`,a`)
# target (max Q` value) , s`
train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
train_target[0][action] = target
self.model.fit(state, train_target, verbose = 0)
# exploration rate,
# epsilon
def adaptiveEGreedy(self):
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
培训代理商
# gym
env = gym.make('shops-v0')
agent = DQLAgent(env)
#
batch_size = 100
episodes = 1000
#
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
#
state = env.reset()
state = np.reshape(state, [1, 3])
# , id
time = 0
taken_actions = []
sum_rewards = 0
#
while True:
#
action = agent.act(state)
#
taken_actions.append(action)
#
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, 3])
#
sum_rewards += reward
#
agent.remember(state, action, reward, next_state, done)
#
state = next_state
# replay
agent.replay(batch_size)
# epsilon
agent.adaptiveEGreedy()
#
time += 1
#
progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)
#
if done:
#
clear_output(wait=True)
sns.distplot(taken_actions, color="y")
plt.title('Episode: ' + str(e))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()
break
测试代理
import time
trained_model = agent #
state = env.reset() #
state = np.reshape(state, [1,3])
#
time_t = 0
MAX_EPISOD_LENGTH = 1000 #
taken_actions = []
mean_reward = 0
#
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
#
action = trained_model.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1,3])
state = next_state
taken_actions.append(action)
#
clear_output(wait=True)
env.render()
progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
print('Reward:', round(env.reward, 3))
time.sleep(0.5)
mean_reward += env.reward
if done:
break
#
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()
总
因此,代理迅速了解如何最有利地采取行动。
总的来说,仍有很大的试验空间:您可以增加商店数量,使操作多样化,甚至只是更改训练模型的超参数-这仅仅是列表的开始。