无人驾驶的出租车在城市周围开着黄色的橡皮鸭!Gym-Duckietown平台的问题检查模块

分析人士说,到2040年,世界上大多数主要城市将无人驾驶汽车。但是要在20年后的道路上放松,我们需要在自动驾驶算法方面做一些出色的工作。为此,麻省理工学院开发了Duckietown平台,该平台使您可以以最低的成本进行此操作。在达奇敦(Duckietown),低成本的移动机器人按照城市的缩小模型运输黄色的橡皮鸭。在此平台的基础上,举办了AI驾驶奥运会,并在大学开设了有关人工智能技术在无人驾驶汽车管理中的应用的课程。



在本文中,我将讨论我的课程项目,该项目是我与移动机器人算法实验室合作完成的 JetBrains研究:关于我为Gym-Duckietown模拟器编写的问题检查器我们将讨论测试系统以及该系统与使用External Grader技术的在线教育平台(例如与Stepik.org平台)的集成









关于作者



我的名字叫达尼尔·普鲁申科(Daniil Plushenko),我是圣彼得堡HSE硕士课程“编程和数据分析的第一年(第二年)学生在2019年,我在同一所大学完成了应用数学和计算机科学的学士学位



Duckietown平台



Duckietown自动驾驶汽车研究项目。该项目的组织者创建了一个平台,该平台有助于实施一种新的方法来学习人工智能和机器人技术领域。这一切都始于2016年在麻省理工学院开设的一门课程,但后来逐渐遍及全球和不同层次的教育:从高中到硕士课程。



Duckietown平台分为两个部分。首先,它是具有道路,建筑物,道路标志和障碍物的城市交通环境的缩小模型。其次,是运输。运行Raspberry Pi的小型移动机器人(Duckiebots)通过照相机接收有关周围世界的信息,并沿着道路运输该市居民-黄色橡皮鸭。







我与Duckietown模拟器一起工作。叫做Gym-Duckietown是一个用Python编写的开源项目。仿真器将您的机器人放置在城市中,根据您使用的算法(或按下的按钮)更改其位置,重新绘制图片并将该机器人的当前位置写入日志。  



如果您有兴趣尝试,我建议您自己克隆存储库并运行manual_control.py:通过这种方式,可以使用键盘上的箭头控制机器人。



模拟器



屏幕截图模拟器可以用作执行任务的环境。让我们设置以下问题:在包含一条车道的给定地图上,您需要沿直线行驶一米。







要解决此问题,可以使用以下算法:



for _ in range(25):
    env.step([1, 0])
    env.render()


此变量env存储环境的状态。

step方法将输入action:描述机器人动作的两个元素的列表。第一个元素设置速度,第二个元素设置旋转角度。该方法render考虑到机器人的新位置来重绘图片。凭经验选择步数和速度:这些是机器人在直线上精确行进一米所需的值。



如果您想在manual_control.py中使用此代码段,则将其粘贴到此处到目前为止的代码正忙于加载环境。为简单起见,您可以重用它,然后将上述解决方案添加到该问题中。



测试系统



我希望能够自动检查此类任务:实现机器人控制算法,模拟行程并报告所提出的算法是否正确执行了任务。这种测试系统将有可能在准备进行自动运输管理竞赛以及出于教育目的时使用环境:向学生发出一系列问题并自动检查其解决方案。我在参加课程项目时就参与了它的开发,下面我将告诉您我得到了什么。



检查解决方案时的步骤顺序如下:







您可以自己向测试系统添加任务,也可以参考我所做的任务。每个任务都有一个条件生成器-一个生成环境状态的类。它包含地图名称和机器人在起始单元格内的起始位置。还设置目标坐标:在这种情况下,它是距起始位置一米的点。



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


这里TrackingDuckietownEnvCVTaskEnv是包装器类,用于存储有关行程的信息以进行进一步分析。 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


跟踪器收集有关当前状态的信息,例如机器人的位置。



CVTaskEnv如果仅需要使用摄像机提供的信息(“计算机视觉”)而不使用仿真器的功能(例如,如果您需要知道机器人距离条带中心的距离或最近的可见对象的位置),则可以使用它。CVTaskEnv调用仿真器函数可以简化问题的解决方案,并且该类限制了仿真器方法的调用。显示标志时使用is_cv_task。 



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


决策执行完成后(假设尚未被超时中断),行程信息将通过一系列检查器传递。如果所有检查程序均已成功工作,则认为该问题已正确解决。否则,将显示一个解释性结论-例如,机器人摔倒,开车离开道路等。



这是标准检查器之一。他检查旅程结束后机器人是否已返回起点。例如,在任务中需要先到达某个点然后返回时,此功能很有用。



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


这样的测试系统可以在手动模式下使用,即,手动开始测试,然后直观地研究判决。例如,如果我们想在Stepik.org上开设有关自动驾驶的在线课程,则需要与该平台集成。这将在本文的下一部分中讨论。



与在线平台整合



对于测试任务,经常使用由edX平台开发的External Grader技术



使用外部评分器时,教育平台不会自行检查任务,而是生成发送到另一台设备的程序包队列。队列连接功能在xqueue-watcher项目中实现。 Xqueue-watcher提取包裹,然后由验证器对其进行测试(与文本/数字比较相比,验证器通常执行更多的重要操作)之后,验证结论将发送回教育平台一侧。



让我们更详细地考虑连接到队列的时刻。教育平台提供连接数据后,需要将它们添加到配置文件,并使用等级方法直接实施验证启动。可以在此处此处找到更详细的说明



Xqueue-watcher调用端点get_submission,如果可能,它将从队列中检索包。在那之后,她去测试。然后,xqueue-watcher调用put_result返回判决。



您可以像这样启动xqueue-watcher:



make requirements && python -m xqueue_watcher -d conf.d/


假设我们要使用External Grader技术,但不想在在线平台上运行课程。Xqueue-watcher的实现是基于存在一些文件存储的假设,其中上传了带有解决方案的文件(平台具有这种存储)。我们可以修改Xqueue,以便不再需要文件存储,并且通常甚至在我们的笔记本电脑上也可以运行这样的系统。



首先,您需要学习如何维护包裹队列本身。队列功能由xqueue项目提供





图片取自文档



您可以这样运行它:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


您可能需要创建一个文件〜/ edx / edx.log



默认情况下,xqueue不会给xqueue-watcher软件包内容,即提供解决问题的文件,而是链接到文件存储中的这些文件。为了独立于文件存储,您可以使文件本身发送并存储在xqueue-watcher运行所在的同一台计算机上。 



以下是修改源代码以实现此目的的



方法lms_interface.py中_upload方法的实现已被替换为:



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


如果未连接任何文件存储,则此方法会将解决方案将文件保存到路径$ queue_name / $ file_to_upload_hash。



在ext_interface.py文件中的get_sumbission的实现中,代替此行,请编写:



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


让我们传输的不是文件的链接(路径),而是文件的内容。



每个解决方案在资源有限的“一次性” docker容器中执行,即,为执行每个解决方案创建一个单独的容器,测试结束后将其删除。为了创建这样的容器并在其中执行命令,使用了portainer-api(实际上,作为Docker API的包装)。



结果



在本文中,我讨论了如何创建自动运输测试系统和任务,以及该系统与使用External Grader技术的在线教育平台的集成。我希望不久将启动使用该系统的课程,并且与在线平台集成的部分对希望创建自己的离线或在线课程的用户很有用。



All Articles