我是Dependency Injector的创建者。这是Python的依赖项注入框架。
这是另一个使用Dependency Injector构建应用程序的教程。
今天,我想展示如何基于模块构建异步守护程序
asyncio
。
该手册包括以下部分:
可以在Github上找到完成的项目。
首先,需要具备以下条件:
- 的初步知识
asyncio
- 了解依赖注入的原理
我们要建造什么?
我们将构建一个监视守护程序,该守护程序将监视对Web服务的访问。
守护程序每隔几秒钟就会将请求发送到example.com和httpbin.org。收到响应后,它将以下数据写入日志:
- 回应码
- 响应的字节数
- 完成请求所花费的时间
工具检查
我们将使用Docker和docker-compose。让我们检查一下它们是否已安装:
docker --version
docker-compose --version
输出应如下所示:
Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31
如果未安装Docker或docker-compose,则需要先安装它们。请遵循以下指南:
工具准备就绪。让我们继续进行项目结构。
项目结构
创建一个项目文件夹并转到它:
mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial
现在我们需要创建一个初始项目结构。按照以下结构创建文件和文件夹。目前所有文件均为空。我们稍后再填写。
初始项目结构:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
初始项目结构已准备就绪。我们将在以下各节中对其进行扩展。
接下来,我们正在等待环境的准备。
准备环境
在本节中,我们将准备用于启动守护程序的环境。
首先,您需要定义依赖关系。我们将使用如下软件包:
dependency-injector
-依赖注入框架aiohttp
-网络框架(我们只需要一个http客户端)pyyaml
-用于解析YAML文件的库,用于读取配置pytest
-测试框架pytest-asyncio
-用于测试asyncio
应用程序的帮助程序库pytest-cov
-帮助库,用于测试测试的代码覆盖率
让我们在文件中添加以下几行
requirements.txt
:
dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov
并在终端中执行:
pip install -r requirements.txt
接下来,我们创建
Dockerfile
。它将描述构建和启动守护程序的过程。我们将其python:3.8-buster
用作基本图像。
让我们在文件中添加以下几行
Dockerfile
:
FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY . /code/
RUN apt-get install openssl \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& rm -rf ~/.cache
CMD ["python", "-m", "monitoringdaemon"]
最后一步是定义设置
docker-compose
。
让我们在文件中添加以下几行
docker-compose.yml
:
version: "3.7"
services:
monitor:
build: ./
image: monitoring-daemon
volumes:
- "./:/code"
全部都准备好了。让我们开始构建映像,并检查环境配置是否正确。
让我们在终端中执行:
docker-compose build
构建过程可能需要几分钟。最后,您应该看到:
Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest
构建过程完成后,启动容器:
docker-compose up
你会看见:
Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0
环境已准备就绪。容器以code开头和结尾
0
。
下一步是设置日志记录并读取配置文件。
记录和配置
在本节中,我们将配置日志记录并读取配置文件。
让我们从添加应用程序的主要部分开始-依赖容器(进一步就是容器)。该容器将包含应用程序的所有组件。
让我们添加前两个组件。这是配置对象,是用于配置日志记录的功能。
让我们编辑
containers.py
:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
在设置它们的值之前,我们使用了配置参数。这是提供者工作的原则Configuration
。
首先我们使用,然后设置值。
日志记录设置将包含在配置文件中。
让我们编辑
config.yml
:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
现在让我们定义一个将启动守护程序的函数。她通常被称为
main()
。它将创建一个容器。该容器将用于读取配置文件并调用日志记录设置功能。
让我们编辑
__main__.py
:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
if __name__ == '__main__':
main()
容器是应用程序中的第一个对象。它用于获取所有其他对象。
配置日志记录和读取已配置。在下一节中,我们将创建一个监视任务管理器。
调度员
现在该添加监视任务管理器了。
调度程序将包含监视任务列表并控制其执行。他将按照时间表执行每个任务。类
Monitor
-用于监视任务的基类。要创建特定任务,您需要添加子类并实现方法check()
。
让我们为监视任务添加一个调度程序和一个基类。
让我们创建
dispatcher.py
并monitors.py
在包中monitoringdaemon
:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
让我们在文件中添加以下几行
monitors.py
:
"""Monitors module."""
import logging
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
并到文件
dispatcher.py
:
""""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
def __init__(self, monitors: List[Monitor]) -> None:
self._monitors = monitors
self._monitor_tasks: List[asyncio.Task] = []
self._logger = logging.getLogger(self.__class__.__name__)
self._stopping = False
def run(self) -> None:
asyncio.run(self.start())
async def start(self) -> None:
self._logger.info('Starting up')
for monitor in self._monitors:
self._monitor_tasks.append(
asyncio.create_task(self._run_monitor(monitor)),
)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)
await asyncio.gather(*self._monitor_tasks, return_exceptions=True)
self.stop()
def stop(self) -> None:
if self._stopping:
return
self._stopping = True
self._logger.info('Shutting down')
for task, monitor in zip(self._monitor_tasks, self._monitors):
task.cancel()
self._logger.info('Shutdown finished successfully')
@staticmethod
async def _run_monitor(monitor: Monitor) -> None:
def _until_next(last: float) -> float:
time_took = time.time() - last
return monitor.check_every - time_took
while True:
time_start = time.time()
try:
await monitor.check()
except asyncio.CancelledError:
break
except Exception:
monitor.logger.exception('Error executing monitor check')
await asyncio.sleep(_until_next(last=time_start))
调度程序需要添加到容器中。
让我们编辑
containers.py
:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
每个组件都添加到了容器中。
最后,我们需要更新功能
main()
。我们将从容器中获取调度程序,并调用其方法run()
。
让我们编辑
__main__.py
:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
dispatcher = container.dispatcher()
dispatcher.run()
if __name__ == '__main__':
main()
现在让我们启动守护程序并测试其工作。
让我们在终端中执行:
docker-compose up
输出应如下所示:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0
一切正常。由于没有监视任务,因此调度程序将启动和停止。
在本节结束时,我们的恶魔的骨骼已准备就绪。在下一节中,我们将添加第一个监视任务。
监控example.com
在本节中,我们将添加一个监视任务,该任务将监视对http://example.com的访问。
我们将以一种新型的监视任务扩展类模型
HttpMonitor
。
HttpMonitor
这是一个儿童班Monitor
。我们将实现check()方法。它将发送HTTP请求并记录收到的响应。HTTP请求的详细信息将委托给类HttpClient
。
让我们先添加
HttpClient
。
让我们
http.py
在包中创建一个文件monitoringdaemon
:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
并添加以下几行:
"""Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse
class HttpClient:
async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
async with ClientSession(timeout=ClientTimeout(timeout)) as session:
async with session.request(method, url) as response:
return response
接下来,您需要添加
HttpClient
到容器中。
让我们编辑
containers.py
:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
现在我们准备添加
HttpMonitor
。让我们将其添加到模块中monitors
。
让我们编辑
monitors.py
:
"""Monitors module."""
import logging
import time
from typing import Dict, Any
from .http import HttpClient
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
class HttpMonitor(Monitor):
def __init__(
self,
http_client: HttpClient,
options: Dict[str, Any],
) -> None:
self._client = http_client
self._method = options.pop('method')
self._url = options.pop('url')
self._timeout = options.pop('timeout')
super().__init__(check_every=options.pop('check_every'))
@property
def full_name(self) -> str:
return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)
async def check(self) -> None:
time_start = time.time()
response = await self._client.request(
method=self._method,
url=self._url,
timeout=self._timeout,
)
time_end = time.time()
time_took = time_end - time_start
self.logger.info(
'Response code: %s, content length: %s, request took: %s seconds',
response.status,
response.content_length,
round(time_took, 3)
)
我们都准备为http://example.com添加支票。我们需要对容器进行两项更改:
- 添加工厂
example_monitor
。 - 转移
example_monitor
到调度员。
让我们编辑
containers.py
:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
),
)
提供程序
example_monitor
取决于配置值。让我们添加这些值:
编辑
config.yml
:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
全部都准备好了。我们启动守护程序并检查工作。
我们在终端执行:
docker-compose up
我们看到类似的结论:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.067 seconds
monitor_1 |
monitor_1 | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.073 seconds
我们的守护程序可以监视对http://example.com的访问可用性。
让我们添加监视https://httpbin.org。
监控httpbin.org
在本部分中,我们将添加一个监视任务,该任务将监视对http://example.com的访问。
为https://httpbin.org添加监视任务将更加容易,因为所有组件均已准备就绪。我们只需要向容器添加一个新的提供程序并更新配置即可。
让我们编辑
containers.py
:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
让我们编辑
config.yml
:
log:
level: "INFO"
format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
monitors:
example:
method: "GET"
url: "http://example.com"
timeout: 5
check_every: 5
httpbin:
method: "GET"
url: "https://httpbin.org/get"
timeout: 5
check_every: 5
让我们启动守护程序并检查日志。
让我们在终端中执行:
docker-compose up
我们看到类似的结论:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.077 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.18 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.066 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.126 seconds
功能部分完成。守护程序监视对http://example.com和https://httpbin.org的访问可用性。
在下一节中,我们将添加一些测试。
测验
添加一些测试会很好。来做吧。在包中
创建文件:
tests.py
monitoringdaemon
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ ├── monitors.py
│ └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
并添加以下几行:
"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import ApplicationContainer
@dataclasses.dataclass
class RequestStub:
status: int
content_length: int
@pytest.fixture
def container():
container = ApplicationContainer()
container.config.from_dict({
'log': {
'level': 'INFO',
'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
},
'monitors': {
'example': {
'method': 'GET',
'url': 'http://fake-example.com',
'timeout': 1,
'check_every': 1,
},
'httpbin': {
'method': 'GET',
'url': 'https://fake-httpbin.org/get',
'timeout': 1,
'check_every': 1,
},
},
})
return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
caplog.set_level('INFO')
http_client_mock = mock.AsyncMock()
http_client_mock.request.return_value = RequestStub(
status=200,
content_length=635,
)
with container.http_client.override(http_client_mock):
example_monitor = container.example_monitor()
await example_monitor.check()
assert 'http://fake-example.com' in caplog.text
assert 'response code: 200' in caplog.text
assert 'content length: 635' in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
caplog.set_level('INFO')
example_monitor_mock = mock.AsyncMock()
httpbin_monitor_mock = mock.AsyncMock()
with container.example_monitor.override(example_monitor_mock), \
container.httpbin_monitor.override(httpbin_monitor_mock):
dispatcher = container.dispatcher()
event_loop.create_task(dispatcher.start())
await asyncio.sleep(0.1)
dispatcher.stop()
assert example_monitor_mock.check.called
assert httpbin_monitor_mock.check.called
要运行测试,请在终端中运行:
docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon
您应该得到类似的结果:
platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items
monitoringdaemon/tests.py .. [100%]
----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name Stmts Miss Cover
----------------------------------------------------
monitoringdaemon/__init__.py 0 0 100%
monitoringdaemon/__main__.py 9 9 0%
monitoringdaemon/containers.py 11 0 100%
monitoringdaemon/dispatcher.py 43 5 88%
monitoringdaemon/http.py 6 3 50%
monitoringdaemon/monitors.py 23 1 96%
monitoringdaemon/tests.py 37 0 100%
----------------------------------------------------
TOTAL 129 18 86%
请注意,在测试中test_example_monitor
我们如何HttpClient
使用方法代替模拟.override()
。这样,您可以覆盖任何提供程序的返回值。
测试中执行相同的操作,以用模拟test_dispatcher
代替监视任务。
结论
我们基于
asyncio
依赖注入的原理构建了一个监视守护程序。我们使用了依赖注入器作为依赖注入框架。
Dependency Injector带来的好处是容器。
当您需要了解或更改应用程序的结构时,容器便开始发挥作用。使用容器,这很容易,因为应用程序的所有组件及其依赖项都位于一个位置:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
一个容器,作为您的应用程序的映射。您总是知道什么取决于什么。
下一步是什么?
- 在GitHub上了解有关Dependency Injector的更多信息
- 退房在阅读文档的文档
- 有疑问或发现错误?在Github上打开一个问题