在Asyncio + Dependency Injector上监视守护程序-依赖项注入指南

嗨,



我是Dependency Injector的创建者这是Python的依赖项注入框架。



这是另一个使用Dependency Injector构建应用程序的教程。



今天,我想展示如何基于模块构建异步守护程序asyncio



该手册包括以下部分:



  1. 我们要建造什么?
  2. 工具检查
  3. 项目结构
  4. 准备环境
  5. 记录和配置
  6. 调度员
  7. 监控example.com
  8. 监控httpbin.org
  9. 测验
  10. 结论


可以在Github上找到完成的项目



首先,需要具备以下条件:



  • 的初步知识 asyncio
  • 了解依赖注入的原理


我们要建造什么?



我们将构建一个监视守护程序,该守护程序将监视对Web服务的访问。



守护程序每隔几秒钟就会将请求发送到example.comhttpbin.org收到响应后,它将以下数据写入日志:



  • 回应码
  • 响应的字节数
  • 完成请求所花费的时间






工具检查



我们将使用Dockerdocker-compose让我们检查一下它们是否已安装:



docker --version
docker-compose --version


输出应如下所示:



Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31


如果未安装Docker或docker-compose,则需要先安装它们。请遵循以下指南:





工具准备就绪。让我们继续进行项目结构。



项目结构



创建一个项目文件夹并转到它:



mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial


现在我们需要创建一个初始项目结构。按照以下结构创建文件和文件夹。目前所有文件均为空。我们稍后再填写。



初始项目结构:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


初始项目结构已准备就绪。我们将在以下各节中对其进行扩展。



接下来,我们正在等待环境的准备。



准备环境



在本节中,我们将准备用于启动守护程序的环境。



首先,您需要定义依赖关系。我们将使用如下软件包:



  • dependency-injector -依赖注入框架
  • aiohttp -网络框架(我们只需要一个http客户端)
  • pyyaml -用于解析YAML文件的库,用于读取配置
  • pytest -测试框架
  • pytest-asyncio-用于测试 asyncio应用程序的帮助程序
  • pytest-cov -帮助库,用于测试测试的代码覆盖率


让我们在文件中添加以下几行requirements.txt



dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov


并在终端中执行:



pip install -r requirements.txt


接下来,我们创建Dockerfile它将描述构建和启动守护程序的过程。我们将其python:3.8-buster用作基本图像。



让我们在文件中添加以下几行Dockerfile



FROM python:3.8-buster

ENV PYTHONUNBUFFERED=1

WORKDIR /code
COPY . /code/

RUN apt-get install openssl \
 && pip install --upgrade pip \
 && pip install -r requirements.txt \
 && rm -rf ~/.cache

CMD ["python", "-m", "monitoringdaemon"]


最后一步是定义设置docker-compose



让我们在文件中添加以下几行docker-compose.yml



version: "3.7"

services:

  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"


全部都准备好了。让我们开始构建映像,并检查环境配置是否正确。



让我们在终端中执行:



docker-compose build


构建过程可能需要几分钟。最后,您应该看到:



Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest


构建过程完成后,启动容器:



docker-compose up


你会看见:



Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0


环境已准备就绪。容器以code开头和结尾0



下一步是设置日志记录并读取配置文件。



记录和配置



在本节中,我们将配置日志记录并读取配置文件。



让我们从添加应用程序的主要部分开始-依赖容器(进一步就是容器)。该容器将包含应用程序的所有组件。



让我们添加前两个组件。这是配置对象,是用于配置日志记录的功能。



让我们编辑containers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )


在设置它们的值之前,我们使用了配置参数。这是提供者工作的原则Configuration



首先我们使用,然后设置值。



日志记录设置将包含在配置文件中。



让我们编辑config.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"


现在让我们定义一个将启动守护程序的函数。她通常被称为main()它将创建一个容器。该容器将用于读取配置文件并调用日志记录设置功能。



让我们编辑__main__.py



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()


if __name__ == '__main__':
    main()


容器是应用程序中的第一个对象。它用于获取所有其他对象。


配置日志记录和读取已配置。在下一节中,我们将创建一个监视任务管理器。



调度员



现在该添加监视任务管理器了。



调度程序将包含监视任务列表并控制其执行。他将按照时间表执行每个任务。Monitor-用于监视任务的基类。要创建特定任务,您需要添加子类并实现方法check()





让我们为监视任务添加一个调度程序和一个基类。



让我们创建dispatcher.pymonitors.py在包中monitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


让我们在文件中添加以下几行monitors.py



"""Monitors module."""

import logging


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


并到文件dispatcher.py



""""Dispatcher module."""

import asyncio
import logging
import signal
import time
from typing import List

from .monitors import Monitor


class Dispatcher:

    def __init__(self, monitors: List[Monitor]) -> None:
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._stopping = False

    def run(self) -> None:
        asyncio.run(self.start())

    async def start(self) -> None:
        self._logger.info('Starting up')

        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )

        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)

        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

        self.stop()

    def stop(self) -> None:
        if self._stopping:
            return

        self._stopping = True

        self._logger.info('Shutting down')
        for task, monitor in zip(self._monitor_tasks, self._monitors):
            task.cancel()
        self._logger.info('Shutdown finished successfully')

    @staticmethod
    async def _run_monitor(monitor: Monitor) -> None:
        def _until_next(last: float) -> float:
            time_took = time.time() - last
            return monitor.check_every - time_took

        while True:
            time_start = time.time()

            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception('Error executing monitor check')

            await asyncio.sleep(_until_next(last=time_start))


调度程序需要添加到容器中。



让我们编辑containers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


每个组件都添加到了容器中。


最后,我们需要更新功能main()我们将从容器中获取调度程序,并调用其方法run()



让我们编辑__main__.py



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()

    dispatcher = container.dispatcher()
    dispatcher.run()


if __name__ == '__main__':
    main()


现在让我们启动守护程序并测试其工作。



让我们在终端中执行:



docker-compose up


输出应如下所示:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0


一切正常。由于没有监视任务,因此调度程序将启动和停止。



在本节结束时,我们的恶魔的骨骼已准备就绪。在下一节中,我们将添加第一个监视任务。



监控example.com



在本节中,我们将添加一个监视任务,该任务将监视对http://example.com的访问



我们将以一种新型的监视任务扩展类模型HttpMonitor



HttpMonitor这是一个儿童班Monitor我们将实现check()方法。它将发送HTTP请求并记录收到的响应。HTTP请求的详细信息将委托给类HttpClient





让我们先添加HttpClient



让我们http.py在包中创建一个文件monitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


并添加以下几行:



"""Http client module."""

from aiohttp import ClientSession, ClientTimeout, ClientResponse


class HttpClient:

    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
        async with ClientSession(timeout=ClientTimeout(timeout)) as session:
            async with session.request(method, url) as response:
                return response


接下来,您需要添加HttpClient到容器中。



让我们编辑containers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


现在我们准备添加HttpMonitor让我们将其添加到模块中monitors



让我们编辑monitors.py



"""Monitors module."""

import logging
import time
from typing import Dict, Any

from .http import HttpClient


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class HttpMonitor(Monitor):

    def __init__(
            self,
            http_client: HttpClient,
            options: Dict[str, Any],
    ) -> None:
        self._client = http_client
        self._method = options.pop('method')
        self._url = options.pop('url')
        self._timeout = options.pop('timeout')
        super().__init__(check_every=options.pop('check_every'))

    @property
    def full_name(self) -> str:
        return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)

    async def check(self) -> None:
        time_start = time.time()

        response = await self._client.request(
            method=self._method,
            url=self._url,
            timeout=self._timeout,
        )

        time_end = time.time()
        time_took = time_end - time_start

        self.logger.info(
            'Response code: %s, content length: %s, request took: %s seconds',
            response.status,
            response.content_length,
            round(time_took, 3)
        )


我们都准备为http://example.com添加支票我们需要对容器进行两项更改:



  • 添加工厂example_monitor
  • 转移example_monitor到调度员。


让我们编辑containers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
        ),
    )


提供程序example_monitor取决于配置值。让我们添加这些值:



编辑config.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5


全部都准备好了。我们启动守护程序并检查工作。



我们在终端执行:



docker-compose up


我们看到类似的结论:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.067 seconds
monitor_1  |
monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.073 seconds


我们的守护程序可以监视对http://example.com的访问可用性



让我们添加监视https://httpbin.org



监控httpbin.org



在本部分中,我们将添加一个监视任务,该任务将监视对http://example.com的访问



https://httpbin.org添加监视任务将更加容易,因为所有组件均已准备就绪。我们只需要向容器添加一个新的提供程序并更新配置即可。



让我们编辑containers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )


让我们编辑config.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5


让我们启动守护程序并检查日志。



让我们在终端中执行:



docker-compose up


我们看到类似的结论:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.077 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.18 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.066 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.126 seconds


功能部分完成。守护程序监视对http://example.comhttps://httpbin.org的访问可用性



在下一节中,我们将添加一些测试。



测验



添加一些测试会很好。来做吧。在包中



创建文件tests.pymonitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   ├── monitors.py
│   └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


并添加以下几行:



"""Tests module."""

import asyncio
import dataclasses
from unittest import mock

import pytest

from .containers import ApplicationContainer


@dataclasses.dataclass
class RequestStub:
    status: int
    content_length: int


@pytest.fixture
def container():
    container = ApplicationContainer()
    container.config.from_dict({
        'log': {
            'level': 'INFO',
            'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
        },
        'monitors': {
            'example': {
                'method': 'GET',
                'url': 'http://fake-example.com',
                'timeout': 1,
                'check_every': 1,
            },
            'httpbin': {
                'method': 'GET',
                'url': 'https://fake-httpbin.org/get',
                'timeout': 1,
                'check_every': 1,
            },
        },
    })
    return container


@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
    caplog.set_level('INFO')

    http_client_mock = mock.AsyncMock()
    http_client_mock.request.return_value = RequestStub(
        status=200,
        content_length=635,
    )

    with container.http_client.override(http_client_mock):
        example_monitor = container.example_monitor()
        await example_monitor.check()

    assert 'http://fake-example.com' in caplog.text
    assert 'response code: 200' in caplog.text
    assert 'content length: 635' in caplog.text


@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
    caplog.set_level('INFO')

    example_monitor_mock = mock.AsyncMock()
    httpbin_monitor_mock = mock.AsyncMock()

    with container.example_monitor.override(example_monitor_mock), \
            container.httpbin_monitor.override(httpbin_monitor_mock):

        dispatcher = container.dispatcher()
        event_loop.create_task(dispatcher.start())
        await asyncio.sleep(0.1)
        dispatcher.stop()

    assert example_monitor_mock.check.called
    assert httpbin_monitor_mock.check.called


要运行测试,请在终端中运行:



docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon


您应该得到类似的结果:



platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items

monitoringdaemon/tests.py ..                                    [100%]

----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py         9      9     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      43      5    88%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           37      0   100%
----------------------------------------------------
TOTAL                              129     18    86%


请注意,在测试中test_example_monitor我们如何HttpClient使用方法代替模拟.override()这样,您可以覆盖任何提供程序的返回值。



测试中执行相同的操作,以用模拟test_dispatcher代替监视任务。





结论



我们基于asyncio依赖注入原理构建了一个监视守护程序我们使用了依赖注入器作为依赖注入框架。



Dependency Injector带来的好处是容器。



当您需要了解或更改应用程序的结构时,容器便开始发挥作用。使用容器,这很容易,因为应用程序的所有组件及其依赖项都位于一个位置:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )




一个容器,作为您的应用程序的映射。您总是知道什么取决于什么。



下一步是什么?






All Articles