Writing a Python Web Service with FastAPI




I know, I know, you're probably thinking, "What, again?!"



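Yes, the FastAPI framework has already been covered on Habr more than once. Still, I suggest looking at the tool in more detail by writing the API for our own mini Habr: without karma and ratings, but with blackjack, and with tests, authentication, migrations, and asynchronous database access.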

Database schema and migrations



First, we describe the database schema using the SQLAlchemy Expression Language. Create the file models/users.py:



import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID

metadata = sqlalchemy.MetaData()


users_table = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
    sqlalchemy.Column("name", sqlalchemy.String(100)),
    sqlalchemy.Column("hashed_password", sqlalchemy.String()),
    sqlalchemy.Column(
        "is_active",
        sqlalchemy.Boolean(),
        server_default=sqlalchemy.sql.expression.true(),
        nullable=False,
    ),
)


tokens_table = sqlalchemy.Table(
    "tokens",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column(
        "token",
        UUID(as_uuid=False),
        server_default=sqlalchemy.text("uuid_generate_v4()"),
        unique=True,
        nullable=False,
        index=True,
    ),
    sqlalchemy.Column("expires", sqlalchemy.DateTime()),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)


The models/posts.py file:



import sqlalchemy

from .users import users_table

metadata = sqlalchemy.MetaData()


posts_table = sqlalchemy.Table(
    "posts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
    sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
    sqlalchemy.Column("title", sqlalchemy.String(100)),
    sqlalchemy.Column("content", sqlalchemy.Text()),
)


To automate database migrations, install Alembic:



$ pip install alembic


To initialize Alembic, run:



$ alembic init migrations


This command creates an alembic.ini file in the current directory, plus a migrations directory that contains:



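  • a versions directory, where the migration files are stored
  • an env.py script, which runs every time alembic is invoked
  • a script.py.mako file, which is the template for new migrations.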


Next we specify the database URL by adding the following line to alembic.ini:



sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s


The %(variable_name)s format lets us use different values depending on the environment by overriding them in env.py, like this:



from logging.config import fileConfig
from os import environ

from alembic import context
from app.models import posts, users

# Alembic Config object, which provides access
# to the values in alembic.ini
config = context.config

section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))

fileConfig(config.config_file_name)

target_metadata = [users.metadata, posts.metadata]


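Here we take the values of DB_USER, DB_PASS, DB_NAME and DB_HOST from environment variables. In addition, env.py sets target_metadata to the metadata of our tables; without it, Alembic cannot work out which changes need to be made in the database. Note that the application code further down reads the password from DB_PASSWORD rather than DB_PASS, so either set both variables or align the names.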
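As a rough illustration of how the %(name)s placeholders get filled in, here is a sketch built on Python's standard configparser, which uses the same interpolation syntax. The resolve_url helper and the sample values are made up for this example and are not part of Alembic.

```python
from configparser import ConfigParser

# Same shape as the line added to alembic.ini above.
INI_TEXT = """
[alembic]
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s
"""


def resolve_url(values: dict) -> str:
    """Hypothetical helper: set the placeholder values, then read the URL back."""
    parser = ConfigParser()
    parser.read_string(INI_TEXT)
    for key, value in values.items():
        # Plays the role of config.set_section_option(section, key, value)
        parser.set("alembic", key, value)
    # Interpolation happens when the option is read
    return parser.get("alembic", "sqlalchemy.url")


url = resolve_url(
    {
        "DB_USER": "user",
        "DB_PASS": "secret",
        "DB_HOST": "localhost",
        "DB_NAME": "async-blogs",
    }
)
print(url)
```

One practical consequence: a literal % in a value (for example in a password) is treated as interpolation syntax and has to be written as %%.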



Everything is ready, so we can generate a migration and update the database:




$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head


Starting the application and connecting the database



Create a main.py file:



from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}


and start the application by running:



$ uvicorn main:app --reload


Let's make sure everything works. Open http://127.0.0.1:8000/ in a browser; you should see:
{"Hello": "World"}


To connect to the database we will use the databases module, which lets us execute queries asynchronously.



Let's configure the service's startup and shutdown events, in which the application connects to and disconnects from the database. Edit main.py:



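from os import environ

import databases
from fastapi import FastAPI
from sqlalchemy import desc, select

from app.models.posts import posts_table
from app.models.users import users_table

# Read the connection settings from environment variables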
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
# Create the Database object that will be used to run queries
database = databases.Database(SQLALCHEMY_DATABASE_URL)


app = FastAPI()


@app.on_event("startup")
async def startup():
    # Connect to the database when the application starts
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    # Disconnect from the database when the application shuts down
    await database.disconnect()


@app.get("/")
async def read_root():
    # Read the posts and their authors' names from the database, newest first
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
    )
    return await database.fetch_all(query)


Open http://127.0.0.1:8000/. If the response is an empty list, [], everything is working and we can move on.



Request and response validation



Let's implement user registration. For that we need to validate HTTP requests and responses, and we will use pydantic to do it:



$ pip install pydantic


Create a schemas/users.py file and add a model responsible for validating the request body:



from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    """Validates the sign-up request body."""
    email: EmailStr
    name: str
    password: str


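Note that field types are declared with type annotations. Besides built-in types such as int and str, pydantic offers a large number of types that perform additional validation. For example, the EmailStr type checks that the received value is a valid email address. To use EmailStr, install the email-validator module: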



$ pip install email-validator


The response body has its own specific fields, such as id and access_token, so let's add the models responsible for building the response to schemas/users.py:



from datetime import datetime
from typing import Optional

from pydantic import UUID4, BaseModel, EmailStr, Field, validator


class UserCreate(BaseModel):
    """Validates the sign-up request body."""
    email: EmailStr
    name: str
    password: str


class UserBase(BaseModel):
    """Builds the response body with the user's details."""
    id: int
    email: EmailStr
    name: str


class TokenBase(BaseModel):
    token: UUID4 = Field(..., alias="access_token")
    expires: datetime
    token_type: Optional[str] = "bearer"

    class Config:
        allow_population_by_field_name = True

    @validator("token")
    def hexlify_token(cls, value):
        """Convert the UUID to a hex string."""
        return value.hex


class User(UserBase):
    """Builds the response body with the user's details and token."""
    token: TokenBase = {}


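You can write a custom validator for each field of a model. For example, hexlify_token converts the UUID value to a hex string. Note that Field is the tool to use when you need to override a field's default behavior. For example, token: UUID4 = Field(..., alias="access_token") gives the token field the alias access_token. To mark the field as required, the special value ... (Ellipsis) is passed as the first argument.

Now let's add a utils/users.py file with the functions needed to write a user to the database: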







import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_

from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema

def get_random_string(length=12):
    """Generate a random string to use as a salt."""
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


def hash_password(password: str, salt: str = None):
    """Hash the password together with the salt."""
    if salt is None:
        salt = get_random_string()
    enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return enc.hex()


def validate_password(password: str, hashed_password: str):
    """Check that the password matches the stored salt$hash value."""
    salt, hashed = hashed_password.split("$")
    return hash_password(password, salt) == hashed


async def get_user_by_email(email: str):
    """Return the user with the given email."""
    query = users_table.select().where(users_table.c.email == email)
    return await database.fetch_one(query)


async def get_user_by_token(token: str):
    """Return the user who owns the given unexpired token."""
    query = tokens_table.join(users_table).select().where(
        and_(
            tokens_table.c.token == token,
            tokens_table.c.expires > datetime.now()
        )
    )
    return await database.fetch_one(query)


async def create_user_token(user_id: int):
    """Create a token for the user with the given user_id."""
    query = (
        tokens_table.insert()
        .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
        .returning(tokens_table.c.token, tokens_table.c.expires)
    )

    return await database.fetch_one(query)


async def create_user(user: user_schema.UserCreate):
    """Create a new user in the database."""
    salt = get_random_string()
    hashed_password = hash_password(user.password, salt)
    query = users_table.insert().values(
        email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
    )
    user_id = await database.execute(query)
    token = await create_user_token(user_id)
    token_dict = {"token": token["token"], "expires": token["expires"]}

    return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}
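To see the salt$hash storage format from create_user and validate_password in isolation, here is a minimal sketch that needs no database. It makes two substitutions that are my own assumptions rather than the article's code: the salt comes from the secrets module instead of random, and the comparison uses hmac.compare_digest. The make_stored_value helper is hypothetical.

```python
import hashlib
import hmac
import secrets


def hash_password(password: str, salt: str) -> str:
    """PBKDF2-HMAC-SHA256 with 100,000 iterations, as in the code above."""
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
    return digest.hex()


def make_stored_value(password: str) -> str:
    """Hypothetical helper: build the value kept in the hashed_password column."""
    salt = secrets.token_hex(8)  # hex characters only, so it never contains "$"
    return f"{salt}${hash_password(password, salt)}"


def validate_password(password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and compare in constant time."""
    salt, hashed = stored.split("$")
    return hmac.compare_digest(hash_password(password, salt), hashed)


stored = make_stored_value("rainbow")
print(validate_password("rainbow", stored))
print(validate_password("unicorn", stored))
```

Keeping the salt next to the hash is what lets validate_password recompute the same digest later without any extra column.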




Create a file routers/users.py and add a sign-up route, declaring that it expects a UserCreate model in the request and returns a User model:

from fastapi import APIRouter, HTTPException
from app.schemas import users
from app.utils import users as users_utils


router = APIRouter()


@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
    db_user = await users_utils.get_user_by_email(email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return await users_utils.create_user(user=user)


All that remains is to hook up the routes from routers/users.py. To do that, add the following lines to main.py:



from app.routers import users
app.include_router(users.router)


Authentication and access control



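Now that there are users in the database, we can set up authentication. Let's add an endpoint that accepts a username and password and returns a token. Update routers/users.py by adding: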



from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm


@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
    user = await users_utils.get_user_by_email(email=form_data.username)

    if not user:
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    if not users_utils.validate_password(
        password=form_data.password, hashed_password=user["hashed_password"]
    ):
        raise HTTPException(status_code=400, detail="Incorrect email or password")

    return await users_utils.create_user_token(user_id=user["id"])


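We don't need to describe the request model ourselves: FastAPI provides a special dependency, OAuth2PasswordRequestForm, which makes the route expect two form fields, username and password.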
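Because the route expects form fields rather than JSON, the client has to send a form-encoded body. The sketch below uses only the standard library to show what that body looks like for the sample credentials used in the tests later on; the variable names are illustrative.

```python
from urllib.parse import parse_qs, urlencode

credentials = {"username": "vader@deathstar.com", "password": "rainbow"}

# Body and header a client would send to POST /auth
form_body = urlencode(credentials)
headers = {"Content-Type": "application/x-www-form-urlencoded"}

# Decoding the body gives back the two fields the route reads
decoded = {key: values[0] for key, values in parse_qs(form_body).items()}

print(form_body)
```

This is also why the login tests pass the credentials with data= rather than json=.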



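To restrict access to certain routes for unauthenticated users, we'll write a dependency. It verifies that the supplied token belongs to an active user and returns that user's details, which lets us use the user information in every route that requires authentication. Create a utils/dependencies.py file: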



from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = await users_utils.get_user_by_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user["is_active"]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user


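Note that a dependency can itself depend on another dependency. Here get_current_user depends on OAuth2PasswordBearer, a dependency that tells FastAPI the current route requires authentication.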
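Conceptually, the token handed to get_current_user comes from the Authorization header of the request. The function below is a simplified, standard-library sketch of that extraction step, written for illustration only; extract_bearer_token is a hypothetical name and this is not FastAPI's actual implementation.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer 0123456789abcdef"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
print(extract_bearer_token(None))
```

When no usable token is present, the request should be rejected with 401, which is what the tests below expect for requests sent without a token.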



To check that everything works as intended, add a /users/me route that returns the details of the current user. Add the following to routers/users.py:



from app.utils.dependencies import get_current_user


@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
    return current_user


Now we have a /users/me route that only authenticated users can access.



Everything is ready for us to finally let users create and edit posts:



utils/posts.py
from datetime import datetime

from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select


async def create_post(post: post_schema.PostModel, user):
    query = (
        posts_table.insert()
        .values(
            title=post.title,
            content=post.content,
            created_at=datetime.now(),
            user_id=user["id"],
        )
        .returning(
            posts_table.c.id,
            posts_table.c.title,
            posts_table.c.content,
            posts_table.c.created_at,
        )
    )
    post = await database.fetch_one(query)

    # Convert to dict and add user_name key to it
    post = dict(zip(post, post.values()))
    post["user_name"] = user["name"]
    return post


async def get_post(post_id: int):
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .where(posts_table.c.id == post_id)
    )
    return await database.fetch_one(query)


async def get_posts(page: int):
    max_per_page = 10
    offset1 = (page - 1) * max_per_page
    query = (
        select(
            [
                posts_table.c.id,
                posts_table.c.created_at,
                posts_table.c.title,
                posts_table.c.content,
                posts_table.c.user_id,
                users_table.c.name.label("user_name"),
            ]
        )
        .select_from(posts_table.join(users_table))
        .order_by(desc(posts_table.c.created_at))
        .limit(max_per_page)
        .offset(offset1)
    )
    return await database.fetch_all(query)


async def get_posts_count():
    query = select([func.count()]).select_from(posts_table)
    return await database.fetch_val(query)


async def update_post(post_id: int, post: post_schema.PostModel):
    query = (
        posts_table.update()
        .where(posts_table.c.id == post_id)
        .values(title=post.title, content=post.content)
    )
    return await database.execute(query)
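The paging arithmetic in get_posts can be checked on its own. This sketch repeats the (page - 1) * max_per_page formula and adds two things that are my assumptions, not part of the code above: a guard against page numbers below 1 and a total_pages helper derived from the count returned by get_posts_count.

```python
import math

MAX_PER_PAGE = 10  # same page size as in get_posts


def page_offset(page: int, per_page: int = MAX_PER_PAGE) -> int:
    """Return the number of rows to skip for a 1-based page number."""
    if page < 1:
        # Assumption: reject invalid pages instead of sending a negative OFFSET
        raise ValueError("page numbers start at 1")
    return (page - 1) * per_page


def total_pages(total_count: int, per_page: int = MAX_PER_PAGE) -> int:
    """Hypothetical helper: how many pages are needed for total_count rows."""
    return math.ceil(total_count / per_page)


offsets = [page_offset(page) for page in (1, 2, 3)]
pages_for_25_posts = total_pages(25)
print(offsets, pages_for_25_posts)
```

Without a guard of this kind, a request with page=0 would produce an offset of -10, which PostgreSQL rejects.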





routers/posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()


@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
    post = await post_utils.create_post(post, current_user)
    return post


@router.get("/posts")
async def get_posts(page: int = 1):
    total_count = await post_utils.get_posts_count()
    posts = await post_utils.get_posts(page)
    return {"total_count": total_count, "results": posts}


@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
    return await post_utils.get_post(post_id)


@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
    post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
    post = await post_utils.get_post(post_id)
    if post["user_id"] != current_user["id"]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to modify this post",
        )

    await post_utils.update_post(post_id=post_id, post=post_data)
    return await post_utils.get_post(post_id)





Let's hook up the new routes by adding the following to main.py:

from app.routers import posts
app.include_router(posts.router)


Testing



We will write the tests with pytest:



$ pip install pytest


To test endpoints, FastAPI provides a special tool, TestClient.



Let's write a test for an endpoint that doesn't need a database connection:



from app.main import app
from fastapi.testclient import TestClient

client = TestClient(app)


def test_health_check():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"Hello": "World"}


As you can see, it's straightforward: initialize TestClient and use it to send HTTP requests in your tests.



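To test the remaining endpoints we need a test database. Let's edit the database settings (the module the rest of the code imports as app.models.database) and add a test configuration: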



from os import environ

import databases

DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")

TESTING = environ.get("TESTING")

if TESTING:
    # Use a separate database for the tests
    DB_NAME = "async-blogs-temp-for-test"
    TEST_SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
    DB_NAME = "async-blogs"
    SQLALCHEMY_DATABASE_URL = (
        f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
    )
    database = databases.Database(SQLALCHEMY_DATABASE_URL)


The application still uses the "async-blogs" database, but if the TESTING environment variable is set, the "async-blogs-temp-for-test" database is used instead.
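One detail worth knowing: with if TESTING, any non-empty value switches to the test database, including TESTING=False. The sketch below is a variation under my own assumptions, not the article's code: it wraps the selection in a function and only treats a few explicit values as "on". The is_testing and database_url names are hypothetical.

```python
from typing import Mapping

TRUTHY_VALUES = {"1", "true", "yes", "on"}


def is_testing(env: Mapping[str, str]) -> bool:
    """Treat only explicit truthy strings as enabling test mode."""
    return env.get("TESTING", "").strip().lower() in TRUTHY_VALUES


def database_url(env: Mapping[str, str]) -> str:
    """Build the PostgreSQL URL, picking the database name by mode."""
    user = env.get("DB_USER", "user")
    password = env.get("DB_PASSWORD", "password")
    host = env.get("DB_HOST", "localhost")
    name = "async-blogs-temp-for-test" if is_testing(env) else "async-blogs"
    return f"postgresql://{user}:{password}@{host}:5432/{name}"


prod_url = database_url({})
test_url = database_url({"TESTING": "True"})
still_prod_url = database_url({"TESTING": "False"})
print(prod_url, test_url, still_prod_url, sep="\n")
```

In the application you would pass os.environ as the env argument.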



To have the "async-blogs-temp-for-test" database created automatically when the tests run and dropped once they finish, create a fixture in tests/conftest.py:



import os

import pytest

# Set `os.environ` before importing the application so the test database is used
os.environ['TESTING'] = 'True'

from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database


@pytest.fixture(scope="module")
def temp_db():
    create_database(database.TEST_SQLALCHEMY_DATABASE_URL)  # Create the test database
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    alembic_cfg = Config(os.path.join(base_dir, "alembic.ini"))  # Load the Alembic configuration
    command.upgrade(alembic_cfg, "head")  # Apply the migrations

    try:
        yield database.TEST_SQLALCHEMY_DATABASE_URL
    finally:
        drop_database(database.TEST_SQLALCHEMY_DATABASE_URL)  # Drop the test database


To create and drop the database in the tests we use the sqlalchemy_utils package.
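The create, yield, drop shape of the fixture is the part that guarantees cleanup even when a test fails. Here is a self-contained sketch of the same pattern with the standard library only; it swaps PostgreSQL for a throwaway SQLite file and pytest for contextlib, so every name in it is illustrative rather than taken from the project.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def temp_sqlite_db():
    """Create a throwaway database file, yield its path, always remove it."""
    fd, path = tempfile.mkstemp(suffix=".sqlite3")
    os.close(fd)
    try:
        # Stands in for create_database() followed by the Alembic upgrade
        conn = sqlite3.connect(path)
        conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
        conn.commit()
        conn.close()
        yield path
    finally:
        # Stands in for drop_database(); runs even if the body raises
        os.remove(path)


with temp_sqlite_db() as db_path:
    conn = sqlite3.connect(db_path)
    conn.execute("INSERT INTO posts (title) VALUES ('42')")
    row_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
    conn.close()
    existed_during_test = os.path.exists(db_path)

removed_afterwards = not os.path.exists(db_path)
print(row_count, existed_during_test, removed_afterwards)
```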



With the temp_db fixture in place, we can test all of the application's endpoints:



def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["access_token"] is not None


tests/test_posts.py
import asyncio

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_create_post(temp_db):
    user = UserCreate(
        email="vader@deathstar.com",
        name="Darth",
        password="rainbow"
    )
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        # Create user and use his token to add new post
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        response = client.post(
            "/posts",
            json=request_data,
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 201
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_create_post_forbidden_without_token(temp_db):
    request_data = {
      "title": "42",
      "content": "Don't panic!"
    }
    with TestClient(app) as client:
        response = client.post("/posts", json=request_data)
    assert response.status_code == 401


def test_posts_list(temp_db):
    with TestClient(app) as client:
        response = client.get("/posts")
    assert response.status_code == 200
    assert response.json()["total_count"] == 1
    assert response.json()["results"][0]["id"] == 1
    assert response.json()["results"][0]["title"] == "42"
    assert response.json()["results"][0]["content"] == "Don't panic!"


def test_post_detail(temp_db):
    post_id = 1
    with TestClient(app) as client:
        response = client.get(f"/posts/{post_id}")
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Don't panic!"


def test_update_post(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        # Create user token to add new post
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.put(
            f"/posts/{post_id}",
            json=request_data,
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["title"] == "42"
    assert response.json()["content"] == "Life? Don't talk to me about life."


def test_update_post_forbidden_without_token(temp_db):
    post_id = 1
    request_data = {
      "title": "42",
      "content": "Life? Don't talk to me about life."
    }
    with TestClient(app) as client:
        response = client.put(f"/posts/{post_id}", json=request_data)
    assert response.status_code == 401




tests/test_users.py
import asyncio
import pytest

from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient


def test_sign_up(temp_db):
    request_data = {
        "email": "vader@deathstar.com",
        "name": "Darth",
        "password": "rainbow"
    }
    with TestClient(app) as client:
        response = client.post("/sign-up", json=request_data)
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"
    assert response.json()["token"]["expires"] is not None
    assert response.json()["token"]["access_token"] is not None


def test_login(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 200
    assert response.json()["token_type"] == "bearer"
    assert response.json()["expires"] is not None
    assert response.json()["access_token"] is not None


def test_login_with_invalid_password(temp_db):
    request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
    with TestClient(app) as client:
        response = client.post("/auth", data=request_data)
    assert response.status_code == 400
    assert response.json()["detail"] == "Incorrect email or password"


def test_user_detail(temp_db):
    with TestClient(app) as client:
        # Create user token to see user info
        loop = asyncio.get_event_loop()
        token = loop.run_until_complete(create_user_token(user_id=1))
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {token['token']}"}
        )
    assert response.status_code == 200
    assert response.json()["id"] == 1
    assert response.json()["email"] == "vader@deathstar.com"
    assert response.json()["name"] == "Darth"


def test_user_detail_forbidden_without_token(temp_db):
    with TestClient(app) as client:
        response = client.get("/users/me")
    assert response.status_code == 401


@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
    user = UserCreate(
        email="sidious@deathstar.com",
        name="Palpatine",
        password="unicorn"
    )
    with TestClient(app) as client:
        # Create user and use expired token
        loop = asyncio.get_event_loop()
        user_db = loop.run_until_complete(create_user(user))
        freezer.move_to("2015-11-10")
        response = client.get(
            "/users/me",
            headers={"Authorization": f"Bearer {user_db['token']['token']}"}
        )
    assert response.status_code == 401
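The dates in the last test are chosen around the two-week token lifetime set in create_user_token. The arithmetic can be checked with a short standard-library sketch; token_is_valid is a hypothetical helper that mirrors the expires > now condition in get_user_by_token. The freezer fixture and the freeze_time marker come from a pytest plugin such as pytest-freezegun, which the install steps above do not mention, so treat that dependency as an assumption.

```python
from datetime import datetime, timedelta

TOKEN_LIFETIME = timedelta(weeks=2)  # same lifetime as in create_user_token


def token_is_valid(expires: datetime, now: datetime) -> bool:
    """Mirror of the tokens_table.c.expires > datetime.now() condition."""
    return expires > now


issued_at = datetime(2015, 10, 21)    # the frozen "now" when the user is created
expires = issued_at + TOKEN_LIFETIME  # 2015-11-04
moved_to = datetime(2015, 11, 10)     # the time the test moves to

valid_when_issued = token_is_valid(expires, issued_at)
valid_after_move = token_is_valid(expires, moved_to)
print(expires.date(), valid_when_issued, valid_after_move)
```

Since 10 November is six days past the expiry date, the token lookup returns nothing and the route answers with 401.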




P.S. Source code



That's all. The source repository for this post is available on GitHub.


