2.19. Pytest plugin¶
All pytest plugin code lives in the aiomisc package itself.
The aiomisc-pytest package is required only for plugin registration
(it contains the pytest entry point). Install both:
pip install aiomisc aiomisc-pytest
2.19.1. Basic usage¶
The plugin automatically provides an event loop and an entrypoint for every
async test. Simply write async def test_... functions:
import asyncio
async def test_sample(event_loop: asyncio.AbstractEventLoop):
f = event_loop.create_future()
event_loop.call_soon(f.set_result, True)
assert await f
Async fixtures work the same way:
import asyncio
import pytest
@pytest.fixture
async def my_fixture():
await asyncio.sleep(0)
yield "value"
async def test_with_fixture(my_fixture):
assert my_fixture == "value"
2.19.2. Fixtures¶
The plugin provides the following fixtures:
2.19.2.1. Core fixtures¶
entrypointScope: function, autouse
Creates an
aiomisc.Entrypointinstance using the currentevent_loop. Starts all services from theservicesfixture and populates the context fromdefault_context.localhostScope: session
Returns the localhost address (
"127.0.0.1"or"[::1]"depending on what is available on the system).loop_debugScope: session
Returns the value of
--aiomisc-debugcommand-line option.aiomisc_test_timeoutScope: session
Returns the value of
--aiomisc-test-timeoutcommand-line option.thread_pool_sizeScope: session
Returns the value of
--aiomisc-pool-sizecommand-line option.tcp_proxyScope: session
Returns the
TCPProxyclass for emulating network problems. See TCPProxy section below.
2.19.2.2. Overridable fixtures¶
These fixtures have sensible defaults but can be overridden in your
conftest.py or test module to change behavior or scope.
event_loopScope: function, autouse, Default: creates a new event loop per test
Creates and manages an asyncio event loop. Configures thread pool, debug mode, and exception handling. Closes the loop on teardown.
Override this fixture to change its scope (module or session) when you need async fixtures that outlive a single test. See Scoped event loops and fixtures for details.
servicesScope: function, Default: empty list
Return a list of
aiomisc.Serviceinstances to start inside the entrypoint.import aiomisc import pytest class MyService(aiomisc.Service): async def start(self) -> None: self.start_event.set() await asyncio.sleep(3600) @pytest.fixture def services(): return [MyService()]
default_contextScope: function, Default: empty dict
Return a mapping of values to populate in the entrypoint context.
import pytest @pytest.fixture def default_context(): return { "foo": "bar", "bar": "foo", }
entrypoint_kwargsScope: function, Default:
{"log_config": False}Return extra keyword arguments for the
entrypoint()constructor.import pytest @pytest.fixture def entrypoint_kwargs() -> dict: return dict(log_config=False)
thread_pool_executorScope: function, Default:
aiomisc.ThreadPoolExecutorReturn the thread pool executor class to use.
import concurrent.futures import pytest @pytest.fixture def thread_pool_executor(): return concurrent.futures.ThreadPoolExecutor
2.19.2.3. Port and socket fixtures¶
aiomisc_unused_port_factoryScope: function
A callable factory that returns an unused port on each call. Sockets are cleaned up after test teardown.
aiomisc_unused_portScope: function
A single unused port number (uses
aiomisc_unused_port_factoryinternally).aiomisc_socket_factoryScope: function
A callable factory that returns a
PortSocket(port, socket)named tuple with a bound socket.
2.19.3. Markers¶
@pytest.mark.catch_loop_exceptionsUncaught event loop exceptions will fail the test.
import asyncio import pytest @pytest.mark.catch_loop_exceptions async def test_with_errors(event_loop): async def fail(): await asyncio.sleep(0) raise Exception() event_loop.create_task(fail()) await asyncio.sleep(0.1)
@pytest.mark.forbid_get_event_loopForbids calling
asyncio.get_event_loop()during the test.import asyncio import pytest @pytest.mark.forbid_get_event_loop async def test_no_get_loop(): def bad(): asyncio.get_event_loop() with pytest.raises(Exception): bad()
2.19.4. Command-line options¶
--aiomisc-debugEnable event loop debug mode. Default:
False.--aiomisc-pool-sizeThread pool size. Default:
4.--aiomisc-test-timeoutPer-test timeout in seconds. Default:
None(no timeout).
2.19.5. Environment variables¶
AIOMISC_USE_UVLOOPSet to
"0","no", or"false"to disable uvloop. Default:"1"(enabled if uvloop is installed).AIOMISC_LOOP_AUTOUSESet to
"0"to disable autouse onevent_loopandentrypointfixtures. Default:"1".
2.19.6. Scoped event loops and fixtures¶
2.19.6.1. Why change the scope?¶
By default, event_loop and entrypoint are function-scoped: a
fresh event loop is created for every test and closed on teardown. This is
the safest default because each test gets a clean state.
However, some async resources are expensive to create and should be shared
across tests: database connection pools, HTTP client sessions, preloaded
caches, and so on. In pytest, you express this by giving the fixture a
wider scope ("module" or "session").
The problem is that a wider-scoped async fixture must run on an event loop that lives at least as long as the fixture itself. If the fixture is session-scoped but the event loop is function-scoped, the loop will be closed after the first test, and the fixture’s teardown (and every subsequent test that uses it) will fail.
The solution: override event_loop with the same scope as your
widest async fixture.
2.19.6.2. The key rule¶
Important
The event_loop fixture scope must be greater than or equal to the
scope of every async fixture that depends on it.
session >= module >= class >= function
For example, if you have a scope="module" async fixture, you need at
least a scope="module" event loop.
2.19.6.3. Module-scoped loop¶
Override event_loop in your test module or in a conftest.py that
applies to the relevant directory:
import asyncio
import pytest
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.close()
asyncio.set_event_loop(None)
@pytest.fixture(scope="module")
async def shared_resource():
resource = await create_expensive_resource()
yield resource
await resource.close()
async def test_first(shared_resource):
assert shared_resource is not None
async def test_second(shared_resource):
assert shared_resource is not None
All tests in the module share the same event loop and the same
shared_resource instance. The resource is created once before the
first test in the module and torn down after the last one.
2.19.6.4. Session-scoped loop¶
For fixtures that must survive the entire test session, place a
session-scoped event_loop override in a conftest.py. Because this
affects every test that inherits it, it is recommended to put it in a
subdirectory conftest.py rather than the root one, so that only the
tests that need it are affected.
tests/
conftest.py # root conftest (default fixtures)
test_unit.py # uses default function-scoped loop
integration/
conftest.py # session-scoped event_loop override
test_database.py # shares the session loop
test_api.py # shares the session loop
# tests/integration/conftest.py
import asyncio
import pytest
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield loop
finally:
loop.close()
asyncio.set_event_loop(None)
Then use session-scoped async fixtures as usual:
# tests/integration/conftest.py (continued)
import pytest
@pytest.fixture(scope="session")
async def db_pool():
pool = await create_pool(dsn="postgresql://localhost/test")
yield pool
await pool.close()
# tests/integration/test_database.py
async def test_insert(db_pool):
async with db_pool.acquire() as conn:
await conn.execute("INSERT INTO t VALUES (1)")
async def test_select(db_pool):
async with db_pool.acquire() as conn:
row = await conn.fetchrow("SELECT 1 AS n")
assert row["n"] == 1
2.19.6.5. Session-scoped async generator fixtures¶
Async generator fixtures (those that use yield) require extra care.
When the scope is wider than the entrypoint fixture, the entrypoint
must not destroy the generator during its own teardown.
This works correctly: the entrypoint is function-scoped and does not
own the session-scoped event loop, so it will not call
shutdown_asyncgens and the generator survives between tests.
import pytest
async def some_agen():
for i in range(100):
yield i + 1
@pytest.fixture(scope="session")
async def async_gen_fixture():
agen = some_agen()
val = await agen.__anext__()
assert val == 1
val = await agen.__anext__()
assert val == 2
yield val
# teardown: runs when the session-scoped loop is closing
new_val = await agen.__anext__()
assert new_val == 3
await agen.aclose()
async def test_first(async_gen_fixture):
assert async_gen_fixture == 2
async def test_second(async_gen_fixture):
assert async_gen_fixture == 2
2.19.6.6. How it works under the hood¶
Understanding the interaction between event_loop and entrypoint
helps explain why this is safe:
When
event_loopis overridden with a wider scope, the loop is not created by theentrypoint— it is passed in via theloop=parameter.The
Entrypointtracks whether it created the loop with an internal_loop_ownerflag. When the loop is passed from outside,_loop_ownerisFalse.On teardown,
Entrypoint.graceful_shutdown()only callsloop.shutdown_asyncgens()when_loop_ownerisTrue. This means the function-scoped entrypoint teardown will not destroy async generators that belong to the wider-scoped loop.loop.shutdown_asyncgens()is eventually called by theevent_loopfixture itself during its own teardown — at the correct time, after all fixtures with that scope have been finalized.
2.19.6.7. Mixing scopes¶
You can have a session-scoped loop with both session-scoped and function-scoped fixtures. Function-scoped async fixtures will run on the session loop and be created/destroyed per test as usual:
@pytest.fixture(scope="session")
async def db_pool():
"""Created once, shared across all tests."""
pool = await create_pool()
yield pool
await pool.close()
@pytest.fixture
async def db_connection(db_pool):
"""Created fresh for each test, returned to pool after."""
async with db_pool.acquire() as conn:
yield conn
async def test_query(db_connection):
await db_connection.execute("SELECT 1")
2.19.7. TCPProxy¶
TCPProxy is a helper for simulating network problems in tests.
It sits between the client and the server, allowing you to add latency,
disconnect clients, or modify traffic on the fly.
import asyncio
import pytest
import aiomisc
class EchoServer(aiomisc.service.TCPServer):
async def handle_client(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
chunk = await reader.read(65534)
while chunk:
writer.write(chunk)
chunk = await reader.read(65534)
writer.close()
await writer.wait_closed()
@pytest.fixture()
def server_port(aiomisc_unused_port_factory) -> int:
return aiomisc_unused_port_factory()
@pytest.fixture()
def services(server_port, localhost):
return [EchoServer(port=server_port, address=localhost)]
@pytest.fixture()
async def proxy(tcp_proxy, localhost, server_port):
async with tcp_proxy(localhost, server_port) as proxy:
yield proxy
async def test_echo(proxy):
reader, writer = await proxy.create_client()
writer.write(b"Hello world")
response = await asyncio.wait_for(reader.read(1024), timeout=1)
assert response == b"Hello world"
async def test_disconnect(proxy):
reader, writer = await proxy.create_client()
writer.write(b"Hello world")
await asyncio.wait_for(reader.read(1024), timeout=1)
await proxy.disconnect_all()
assert await asyncio.wait_for(reader.read(), timeout=1) == b""
async def test_slowdown(proxy):
with proxy.slowdown(read_delay=0.1, write_delay=0.2):
reader, writer = await proxy.create_client()
writer.write(b"Hello world")
response = await asyncio.wait_for(
reader.read(1024), timeout=2,
)
assert response == b"Hello world"
async def test_content_processor(proxy):
proxy.set_content_processors(
lambda _: b"replaced", # client -> server
lambda chunk: chunk[::-1], # server -> client
)
reader, writer = await proxy.create_client()
writer.write(b"original")
response = await reader.read(16)
assert response == b"replaced"[::-1]