Python还能实现哪些AI游戏?附上代码一起来一把!

  • 最新
  • 精选
  • 区块链
  • 汽车
  • 创意科技
  • 媒体达人
  • 电影音乐
  • 娱乐休闲
  • 生活旅行
  • 学习工具
  • 历史读书
  • 金融理财
  • 美食菜谱

Python 还能实现哪些 AI 游戏?附上代码一起来一把!

AI数据派 THU数据派 2020-06-18


来源:AI科技大本营

作者:李秋键

责编:Carol

本文约4562字,建议阅读10分钟
以DQN算法为例介绍如何用Python 实现AI 游戏,赶快来试一下吧。

人工智能作为当前热门在我们生活中得到了广泛应用,尤其是在智能游戏方面,有的已经达到了可以和职业选手匹敌的效果。而DQN算法作为智能游戏的经典选择算法,其主要是通过奖励惩罚机制来迭代模型,来达到更接近于人类学习的效果。
那在强化学习中, 神经网络是如何被训练的呢? 首先, 我们需要 a1, a2 正确的Q值, 这个 Q 值我们就用之前在 Q learning 中的 Q 现实来代替. 同样我们还需要一个Q估计来实现神经网络的更新. 所以神经网络的的参数就是老的NN参数加学习率 alpha乘以Q现实和Q估计的差距。
我们通过 NN 预测出Q(s2, a1) 和 Q(s2,a2) 的值, 这就是 Q 估计. 然后我们选取 Q 估计中最大值的动作来换取环境中的奖励 reward. 而 Q 现实中也包含从神经网络分析出来的两个 Q 估计值, 不过这个 Q 估计是针对于下一步在 s’ 的估计. 最后再通过刚刚所说的算法更新神经网络中的参数. 
DQN是第一个将深度学习模型与强化学习结合在一起从而成功地直接从高维的输入学习控制策略。
  • 创新点:

基于Q-Learning构造Loss Function(不算很新,过往使用线性和非线性函数拟合Q-Table时就是这样做)。
通过experience replay(经验池)解决相关性及非静态分布问题;
使用TargetNet解决稳定性问题。
  • 优点:

算法通用性,可玩不同游戏;
End-to-End 训练方式;
可生产大量样本供监督学习。
  • 缺点:

无法应用于连续动作控制;
只能处理只需短时记忆问题,无法处理需长时记忆问题(后续研究提出了使用LSTM等改进方法);
CNN不一定收敛,需精良调参。
整体的程序效果如下:
 实验前的准备
首先我们使用的python版本是3.6.5所用到的库有cv2库用来图像处理;
Numpy库用来矩阵运算;TensorFlow框架用来训练和加载模型。Collection库用于高性能的数据结构。
程序的搭建
1、游戏结构设定:
我们在DQN训练前需要有自己设定好的程序,即在这里为弹珠游戏。在游戏整体框架搭建完成后,对于计算机的决策方式我们需要给他一个初始化的决策算法为了达到更快的训练效果。
程序结构的部分代码如下:
def __init__(self):
        self.__initGame()
        # 初始化一些变量
        self.loseReward = -1
        self.winReward = 1
        self.hitReward = 0
        self.paddleSpeed = 15
        self.ballSpeed = (77)
        self.paddle_1_score = 0
        self.paddle_2_score = 0
        self.paddle_1_speed = 0.
        self.paddle_2_speed = 0.
        self.__reset()
    '''
    更新一帧
    action: [keep, up, down]
    '''

#     更新ball的位置
        self.ball_pos = self.ball_pos[0] + self.ballSpeed[0], self.ball_pos[1] + self.ballSpeed[1]
        # 获取当前场景(只取左半边)
        image = pygame.surfarray.array3d(pygame.display.get_surface())
        # image = image[321:, :]
        pygame.display.update()
        terminal = False
        if max(self.paddle_1_score, self.paddle_2_score) >= 20:
            self.paddle_1_score = 0
            self.paddle_2_score = 0
            terminal = True
        return image, reward, terminal
def update_frame(self, action):
        assert len(action) == 3
        pygame.event.pump()
        reward = 0
        # 绑定一些对象
        self.score1Render = self.font.render(str(self.paddle_1_score), True, (255255255))
        self.score2Render = self.font.render(str(self.paddle_2_score), True, (255255255))
        self.screen.blit(self.background, (00))
        pygame.draw.rect(self.screen, (255255255), pygame.Rect((55), (630470)), 2)
        pygame.draw.aaline(self.screen, (255255255), (3205), (320475))
        self.screen.blit(self.paddle_1, self.paddle_1_pos)
        self.screen.blit(self.paddle_2, self.paddle_2_pos)
        self.screen.blit(self.ball, self.ball_pos)
        self.screen.blit(self.score1Render, (240210))
        self.screen.blit(self.score2Render, (370210))
'''
    游戏初始化
    '''

    def __initGame(self):
        pygame.init()
        self.screen = pygame.display.set_mode((640480), 032)
        self.background = pygame.Surface((640480)).convert()
        self.background.fill((000))
        self.paddle_1 = pygame.Surface((1050)).convert()
        self.paddle_1.fill((0255255))
        self.paddle_2 = pygame.Surface((1050)).convert()
        self.paddle_2.fill((2552550))
        ball_surface = pygame.Surface((1515))
        pygame.draw.circle(ball_surface, (255255255), (77), (7))
        self.ball = ball_surface.convert()
        self.ball.set_colorkey((000))
        self.font = pygame.font.SysFont("calibri"40)
    '''
    重置球和球拍的位置
    '''

    def __reset(self):
        self.paddle_1_pos = (10., 215.)
        self.paddle_2_pos = (620., 215.)
        self.ball_pos = (312.5232.5)
2、行动决策机制:
首先在程序框架中设定不同的行动作为训练对象
# 行动paddle_1(训练对象)
if action[0] == 1:
    self.paddle_1_speed = 0
elif action[1] == 1:
    self.paddle_1_speed = -self.paddleSpeed
elif action[2] == 1:
    self.paddle_1_speed = self.paddleSpeed
self.paddle_1_pos = self.paddle_1_pos[0], max(min(self.paddle_1_speed + self.paddle_1_pos[1], 420), 10)
接着设置一个简单的初始化决策。根据结果判断奖励和惩罚机制,即球撞到拍上奖励,撞到墙上等等惩罚:
其中代码如下:
# 行动paddle_2(设置一个简单的算法使paddle_2的表现较优, 非训练对象)
        if self.ball_pos[0] >= 305.:
            if not self.paddle_2_pos[1] == self.ball_pos[1] + 7.5:
                if self.paddle_2_pos[1] self.ball_pos[1] + 7.5:
                    self.paddle_2_speed = self.paddleSpeed
                    self.paddle_2_pos = self.paddle_2_pos[0], max(min(self.paddle_2_pos[1] + self.paddle_2_speed, 420), 10)
                if self.paddle_2_pos[1] > self.ball_pos[1] - 42.5:
                    self.paddle_2_speed = -self.paddleSpeed
                    self.paddle_2_pos = self.paddle_2_pos[0], max(min(self.paddle_2_pos[1] + self.paddle_2_speed, 420), 10)
            else:
                self.paddle_2_pos = self.paddle_2_pos[0], max(min(self.paddle_2_pos[1] + 7.5420), 10)
        # 行动ball
        #   球撞拍上
        if self.ball_pos[0] self.paddle_1_pos[0] + 10.:
            if self.ball_pos[1] + 7.5 >= self.paddle_1_pos[1and self.ball_pos[1] self.paddle_1_pos[1] + 42.5:
                self.ball_pos = 20.self.ball_pos[1]
                self.ballSpeed = -self.ballSpeed[0], self.ballSpeed[1]
                reward = self.hitReward
        if self.ball_pos[0] + 15 >= self.paddle_2_pos[0]:
            if self.ball_pos[1] + 7.5 >= self.paddle_2_pos[1and self.ball_pos[1] self.paddle_2_pos[1] + 42.5:
                self.ball_pos = 605.self.ball_pos[1]
                self.ballSpeed = -self.ballSpeed[0], self.ballSpeed[1]
        #   拍未接到球(另外一个拍得分)
        if self.ball_pos[0] 5.:
            self.paddle_2_score += 1
            reward = self.loseReward
            self.__reset()
        elif self.ball_pos[0] > 620.:
            self.paddle_1_score += 1
            reward = self.winReward
            self.__reset()
        #   球撞墙上
        if self.ball_pos[1] 10.:
            self.ballSpeed = self.ballSpeed[0], -self.ballSpeed[1]
            self.ball_pos = self.ball_pos[0], 10
        elif self.ball_pos[1] >= 455:
            self.ballSpeed = self.ballSpeed[0], -self.ballSpeed[1]
            self.ball_pos = self.ball_pos[0], 455
3、DQN算法搭建:
为了方便整体算法的调用,我们首先定义神经网络的函数,包括卷积层损失等函数定义具体如下可见:
'''
    获得初始化weight权重
    '''

    def init_weight_variable(self, shape):
        return tf.Variable(tf.truncated_normal(shape, stddev=0.01))
    '''
    获得初始化bias权重
    '''

    def init_bias_variable(self, shape):
        return tf.Variable(tf.constant(0.01, shape=shape))
    '''
    卷积层
    '''

    def conv2D(self, x, W, stride):
        return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME")
    '''
    池化层
    '''

    def maxpool(self, x):
        return tf.nn.max_pool(x, ksize=[1221], strides=[1221], padding='SAME')
    '''
    计算损失
    '''

    def compute_loss(self, q_values, action_now, target_q_values):
        tmp = tf.reduce_sum(tf.multiply(q_values, action_now), reduction_indices=1)
        loss = tf.reduce_mean(tf.square(target_q_values - tmp))
        return loss
    '''
    下一帧
    '''

    def next_frame(self, action_now, scene_now, gameState):
        x_now, reward, terminal = gameState.update_frame(action_now)
        x_now = cv2.cvtColor(cv2.resize(x_now, (8080)), cv2.COLOR_BGR2GRAY)
        _, x_now = cv2.threshold(x_now, 127255, cv2.THRESH_BINARY)
        x_now = np.reshape(x_now, (80801))
        scene_next = np.append(x_now, scene_now[:, :, 0:3], axis=2)
        return scene_next, reward, terminal
    '''
    计算target_q_values
    '''

    def compute_target_q_values(self, reward_batch, q_values_batch, minibatch):
        target_q_values = []
        for i in range(len(minibatch)):
            if minibatch[i][4]:
                target_q_values.append(reward_batch[i])
            else:
                target_q_values.append(reward_batch[i] + self.gamma * np.max(q_values_batch[i]))
        return target_q_values
然后定义整体的类变量DQN,分别定义初始化和训练函数,其中网络层哪里主要就是神经网络层的调用。然后在训练函数里面记录当前动作和数据加载入优化器中达到模型训练效果。
其中代码如下:
def __init__(self, options):
        self.options = options
        self.num_action = options['num_action']
        self.lr = options['lr']
        self.modelDir = options['modelDir']
        self.init_prob = options['init_prob']
        self.end_prob = options['end_prob']
        self.OBSERVE = options['OBSERVE']
        self.EXPLORE = options['EXPLORE']
        self.action_interval = options['action_interval']
        self.REPLAY_MEMORY = options['REPLAY_MEMORY']
        self.gamma = options['gamma']
        self.batch_size = options['batch_size']
        self.save_interval = options['save_interval']
        self.logfile = options['logfile']
        self.is_train = options['is_train']
    '''
    训练网络
    '''

    def train(self, session):
        x, q_values_ph = self.create_network()
        action_now_ph = tf.placeholder('float', [None, self.num_action])
        target_q_values_ph = tf.placeholder('float', [None])
        # 计算loss
        loss = self.compute_loss(q_values_ph, action_now_ph, target_q_values_ph)
        # 优化目标
        trainStep = tf.train.AdamOptimizer(self.lr).minimize(loss)
        # 游戏
        gameState = PongGame()
        # 用于记录数据
        dataDeque = deque()
        # 当前的动作
        action_now = np.zeros(self.num_action)
        action_now[0] = 1
        # 初始化游戏状态
        x_now, reward, terminal = gameState.update_frame(action_now)
        x_now = cv2.cvtColor(cv2.resize(x_now, (8080)), cv2.COLOR_BGR2GRAY)
        _, x_now = cv2.threshold(x_now, 127255, cv2.THRESH_BINARY)
        scene_now = np.stack((x_now, )*4, axis=2)
        # 读取和保存checkpoint
        saver = tf.train.Saver()
        session.run(tf.global_variables_initializer())
        checkpoint = tf.train.get_checkpoint_state(self.modelDir)
        if checkpoint and checkpoint.model_checkpoint_path:
            saver.restore(session, checkpoint.model_checkpoint_path)
            print('[INFO]: Load %s successfully...' % checkpoint.model_checkpoint_path)
        else:
            print('[INFO]: No weights found, start to train a new model...')
        prob = self.init_prob
        num_frame = 0
        logF = open(self.logfile, 'a')
        while True:
            q_values = q_values_ph.eval(feed_dict={x: [scene_now]})
            action_idx = get_action_idx(q_values=q_values, 
                                        prob=prob, 
                                        num_frame=num_frame, 
                                        OBSERVE=self.OBSERVE, 
                                        num_action=self.num_action)
            action_now = np.zeros(self.num_action)
            action_now[action_idx] = 1
            prob = down_prob(prob=prob, 
                             num_frame=num_frame, 
                             OBSERVE=self.OBSERVE, 
                             EXPLORE=self.EXPLORE, 
                             init_prob=self.init_prob, 
                             end_prob=self.end_prob)
            for _ in range(self.action_interval):
                scene_next, reward, terminal = self.next_frame(action_now=action_now, 
                                                               scene_now=scene_now,                                                            gameState=gameState)
                scene_now = scene_next
                dataDeque.append((scene_now, action_now, reward, scene_next, terminal))
                if len(dataDeque) > self.REPLAY_MEMORY:
                    dataDeque.popleft()
            loss_now = None
            if (num_frame > self.OBSERVE):
                minibatch = random.sample(dataDeque, self.batch_size)
                scene_now_batch = [mb[0for mb in minibatch]
                action_batch = [mb[1for mb in minibatch]
                reward_batch = [mb[2for mb in minibatch]
                scene_next_batch = [mb[3for mb in minibatch]
                q_values_batch = q_values_ph.eval(feed_dict={x: scene_next_batch})
                target_q_values = self.compute_target_q_values(reward_batch, q_values_batch, minibatch)
                trainStep.run(feed_dict={
                                            target_q_values_ph: target_q_values,
                                            action_now_ph: action_batch,
                                            x: scene_now_batch
                                            })
                loss_now = session.run(loss, feed_dict={
                                                        target_q_values_ph: target_q_values,
                                                        action_now_ph: action_batch,
                                                        x: scene_now_batch
                                                        })
            num_frame += 1
            if num_frame % self.save_interval == 0:
                name = 'DQN_Pong'
                saver.save(session, os.path.join(self.modelDir, name), global_step=num_frame)
            log_content = ': %s, : %s, : %s, : %s, : %s, : %s' % (str(num_frame), str(prob), str(action_idx), str(reward), str(np.max(q_values)), str(loss_now))
            logF.write(log_content + '\n')
            print(log_content)
        logF.close()
    '''
    创建网络
    '''

    def create_network(self):
        '''
        W_conv1 = self.init_weight_variable([9, 9, 4, 16])
        b_conv1 = self.init_bias_variable([16])
        W_conv2 = self.init_weight_variable([7, 7, 16, 32])
        b_conv2 = self.init_bias_variable([32])
        W_conv3 = self.init_weight_variable([5, 5, 32, 32])
        b_conv3 = self.init_bias_variable([32])
        W_conv4 = self.init_weight_variable([5, 5, 32, 64])
        b_conv4 = self.init_bias_variable([64])
        W_conv5 = self.init_weight_variable([3, 3, 64, 64])
        b_conv5 = self.init_bias_variable([64])
        '''

        W_conv1 = self.init_weight_variable([88432])
        b_conv1 = self.init_bias_variable([32])
        W_conv2 = self.init_weight_variable([443264])
        b_conv2 = self.init_bias_variable([64])
        W_conv3 = self.init_weight_variable([336464])
        b_conv3 = self.init_bias_variable([64])
        # 5 * 5 * 64 = 1600
        W_fc1 = self.init_weight_variable([1600512])
        b_fc1 = self.init_bias_variable([512])
        W_fc2 = self.init_weight_variable([512self.num_action])
        b_fc2 = self.init_bias_variable([self.num_action])
        # input placeholder
        x = tf.placeholder('float', [None, 80804])
        '''
        conv1 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(x, W_conv1, 4) + b_conv1, training=self.is_train, momentum=0.9))
        conv2 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv1, W_conv2, 2) + b_conv2, training=self.is_train, momentum=0.9))
        conv3 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv2, W_conv3, 2) + b_conv3, training=self.is_train, momentum=0.9))
        conv4 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv3, W_conv4, 1) + b_conv4, training=self.is_train, momentum=0.9))
        conv5 = tf.nn.relu(tf.layers.batch_normalization(self.conv2D(conv4, W_conv5, 1) + b_conv5, training=self.is_train, momentum=0.9))
        flatten = tf.reshape(conv5, [-1, 1600])
        '''

        conv1 = tf.nn.relu(self.conv2D(x, W_conv1, 4) + b_conv1)
        pool1 = self.maxpool(conv1)
        conv2 = tf.nn.relu(self.conv2D(pool1, W_conv2, 2) + b_conv2)
        conv3 = tf.nn.relu(self.conv2D(conv2, W_conv3, 1) + b_conv3)
        flatten = tf.reshape(conv3, [-11600])
        fc1 = tf.nn.relu(tf.layers.batch_normalization(tf.matmul(flatten, W_fc1) + b_fc1, training=self.is_train, momentum=0.9))
        fc2 = tf.matmul(fc1, W_fc2) + b_fc2
        return x, fc2
到这里,我们整体的程序就搭建完成,下面为我们程序的运行结果: 
源码地址:
https://pan.baidu.com/s/1ksvjIiQ0BfXOah4PIE1arg
提取码:p74p
作者简介:
李秋键,CSDN博客专家,CSDN达人课作者。硕士在读于中国矿业大学,开发有taptap竞赛获奖等等。


——END——


    前往看一看

    看一看入口已关闭

    在“设置”-“通用”-“发现页管理”打开“看一看”入口

    我知道了

    已发送

    发送到看一看

    发送中

    微信扫一扫
    使用小程序

    取消 允许

    取消 允许

    微信版本过低

    当前微信版本不支持该功能,请升级至最新版本。

    我知道了 前往更新

    确定删除回复吗?

    取消 删除

      知道了

      长按识别前往小程序

      本站仅按申请收录文章,版权归原作者所有
      如若侵权,请联系本站删除

      微信QQ空间新浪微博腾讯微博人人Twitter豆瓣百度贴吧

      觉得不错,分享给更多人看到

      THU数据派 微信二维码

      THU数据派 微信二维码

      THU数据派 最新文章

      Python 还能实现哪些 AI 游戏?附上代码一起来一把!  2020-06-18

      90后CEO姚颂:在创业中感悟清华情怀的理工男  2020-06-18

      杜克大学提出 AI 算法,拯救渣画质马赛克秒变高清  2020-06-17

      企业地方落地政策需求调研  2020-06-17

      非常强悍的 RabbitMQ 总结,写得真好!  2020-06-16

      Pandas 数据分析: 3 种方法实现一个实用小功能  2020-06-16

      Science点赞斯坦福团队AI视力测试系统,20个字母4步操作,快来在线试一下  2020-06-15

      推荐丨 618,这些人工智能优质课程限时特价  2020-06-15

      拔掉MacBookPro,用8GB树莓派4工作一天,体验原来是这样的  2020-06-14

      数学表达式一键变图,CMU开发实用工具Penrose,堪称图解界LaTeX(附链接)  2020-06-14

      (function(){ var bp = document.createElement('script'); var curProtocol = window.location.protocol.split(':')[0]; if (curProtocol === 'https') { bp.src = 'https://zz.bdstatic.com/linksubmit/push.js'; } else { bp.src = 'http://push.zhanzhang.baidu.com/push.js'; } var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(bp, s); })(); (function(){ var src = (document.location.protocol == "http:") ? "http://js.passport.qihucdn.com/11.0.1.js?ba34c9f41d18b62312e960833b3cb4ae":"https://jspassport.ssl.qhimg.com/11.0.1.js?ba34c9f41d18b62312e960833b3cb4ae"; document.write(''); })();