Skip to content

数据库

Starlette 并非严格绑定到任何特定的数据库实现上。

您可以将其与异步 ORM(如 GINO)一起使用,或者使用常规的非异步端点,并与 SQLAlchemy 集成。

在本文档中,我们将展示如何针对 databases 包进行集成,该包为一系列不同的数据库驱动程序提供了 SQLAlchemy 核心支持。

这里有一个完整的示例,其中包括表定义、配置一个 database.Database 实例,以及几个与数据库交互的端点。

**.env **

DATABASE_URL=sqlite:///test.db

app.py

import contextlib

import databases
import sqlalchemy
from starlette.applications import Starlette
from starlette.config import Config
from starlette.responses import JSONResponse
from starlette.routing import Route


# Configuration from environment variables or '.env' file.
config = Config('.env')
DATABASE_URL = config('DATABASE_URL')


# Database table definitions.
metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)

database = databases.Database(DATABASE_URL)

@contextlib.asynccontextmanager
async def lifespan(app):
    await database.connect()
    yield
    await database.disconnect()

# Main application code.
async def list_notes(request):
    query = notes.select()
    results = await database.fetch_all(query)
    content = [
        {
            "text": result["text"],
            "completed": result["completed"]
        }
        for result in results
    ]
    return JSONResponse(content)

async def add_note(request):
    data = await request.json()
    query = notes.insert().values(
       text=data["text"],
       completed=data["completed"]
    )
    await database.execute(query)
    return JSONResponse({
        "text": data["text"],
        "completed": data["completed"]
    })

routes = [
    Route("/notes", endpoint=list_notes, methods=["GET"]),
    Route("/notes", endpoint=add_note, methods=["POST"]),
]

app = Starlette(
    routes=routes,
    lifespan=lifespan,
)

最后,您需要创建数据库表。建议使用 Alembic,我们在迁移中对其进行了简要介绍。

查询

查询可以使用 SQLAlchemy Core 查询来进行。

以下方法受到支持:

  • rows = await database.fetch_all(query)
  • row = await database.fetch_one(query)
  • async for row in database.iterate(query)
  • await database.execute(query)
  • await database.execute_many(query)

交易;业务;事务 (这个词有多种含义,具体翻译需根据上下文来确定) 。如果仅按照给定的单词“Transactions”进行直译,可译为“交易;事务” ,这里为您提供了几种常见的释义,以便您根据具体语境进行选择。按照题目要求,直接输出的翻译为:交易

数据库事务可以作为装饰器、上下文管理器或低级 API 来使用。

在端点上使用装饰器:

@database.transaction()
async def populate_note(request):
    # This database insert occurs within a transaction.
    # It will be rolled back by the `RuntimeError`.
    query = notes.insert().values(text="you won't see me", completed=True)
    await database.execute(query)
    raise RuntimeError()

使用上下文管理器:

async def populate_note(request):
    async with database.transaction():
        # This database insert occurs within a transaction.
        # It will be rolled back by the `RuntimeError`.
        query = notes.insert().values(text="you won't see me", completed=True)
        await request.database.execute(query)
        raise RuntimeError()

使用低级别 API:

async def populate_note(request):
    transaction = await database.transaction()
    try:
        # This database insert occurs within a transaction.
        # It will be rolled back by the `RuntimeError`.
        query = notes.insert().values(text="you won't see me", completed=True)
        await database.execute(query)
        raise RuntimeError()
    except:
        await transaction.rollback()
        raise
    else:
        await transaction.commit()

测试隔离

在针对使用数据库的服务进行测试时,有几件事是我们想要确保的。我们的要求应该是:

  • 使用单独的数据库进行测试。
  • 每次运行测试时创建一个新的测试数据库。
  • 确保每个测试用例之间的数据库状态是相互隔离的。

为满足这些要求,我们需要如此构建我们的应用程序和测试:

from starlette.applications import Starlette
from starlette.config import Config
import databases

config = Config(".env")

TESTING = config('TESTING', cast=bool, default=False)
DATABASE_URL = config('DATABASE_URL', cast=databases.DatabaseURL)
TEST_DATABASE_URL = DATABASE_URL.replace(database='test_' + DATABASE_URL.database)

# Use 'force_rollback' during testing, to ensure we do not persist database changes
# between each test case.
if TESTING:
    database = databases.Database(TEST_DATABASE_URL, force_rollback=True)
else:
    database = databases.Database(DATABASE_URL)

在试运行期间,我们仍然需要设置 TESTING ,并设置测试数据库。假设我们正在使用 py.test ,以下是我们的 conftest.py 可能的样子:

import pytest
from starlette.config import environ
from starlette.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database, drop_database

# This sets `os.environ`, but provides some additional protection.
# If we placed it below the application import, it would raise an error
# informing us that 'TESTING' had already been read from the environment.
environ['TESTING'] = 'True'

import app


@pytest.fixture(scope="session", autouse=True)
def create_test_database():
  """
  Create a clean database on every test case.
  For safety, we should abort if a database already exists.

  We use the `sqlalchemy_utils` package here for a few helpers in consistently
  creating and dropping the database.
  """
  url = str(app.TEST_DATABASE_URL)
  engine = create_engine(url)
  assert not database_exists(url), 'Test database already exists. Aborting tests.'
  create_database(url)             # Create the test database.
  metadata.create_all(engine)      # Create the tables.
  yield                            # Run the tests.
  drop_database(url)               # Drop the test database.


@pytest.fixture()
def client():
    """
    When using the 'client' fixture in test cases, we'll get full database
    rollbacks between test cases:

    def test_homepage(client):
        url = app.url_path_for('homepage')
        response = client.get(url)
        assert response.status_code == 200
    """
    with TestClient(app) as client:
        yield client

迁移;迁徙

为了管理对数据库的增量更改,您几乎肯定需要使用数据库迁移。为此,我们强烈推荐 Alembic,它是由 SQLAlchemy 的作者编写的。

$ pip install alembic
$ alembic init migrations

现在,您需要进行设置,以便 Alembic 引用配置的 DATABASE_URL,并使用您的表元数据。

alembic.ini 中删除以下行:

sqlalchemy.url = driver://user:pass@localhost/dbname

migrations/env.py 中,您需要设置 'sqlalchemy.url' 配置键和 target_metadata 变量。您会需要类似这样的内容:

# The Alembic Config object.
config = context.config

# Configure Alembic to use our DATABASE_URL and our table definitions...
import app
config.set_main_option('sqlalchemy.url', str(app.DATABASE_URL))
target_metadata = app.metadata

...

然后,使用我们上述的笔记示例,创建一个初步修订版:

alembic revision -m "Create notes table"

并用必要的指令填充新文件(在 migrations/versions 内):

def upgrade():
    op.create_table(
      'notes',
      sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
      sqlalchemy.Column("text", sqlalchemy.String),
      sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

def downgrade():
    op.drop_table('notes')

并且运行您的首次迁移。我们的笔记应用程序现在可以运行了!

alembic upgrade head

在测试期间运行迁移

每次创建测试数据库时,确保您的测试套件运行数据库迁移是一个好的实践。这将有助于发现您的迁移脚本中的任何问题,并有助于确保测试是针对与您的实时数据库状态一致的数据库运行的。

我们可以稍微调整一下 create_test_database 夹具:

from alembic import command
from alembic.config import Config
import app

...

@pytest.fixture(scope="session", autouse=True)
def create_test_database():
    url = str(app.DATABASE_URL)
    engine = create_engine(url)
    assert not database_exists(url), 'Test database already exists. Aborting tests.'
    create_database(url)             # Create the test database.
    config = Config("alembic.ini")   # Run the migrations.
    command.upgrade(config, "head")
    yield                            # Run the tests.
    drop_database(url)               # Drop the test database.