2.14. Working with threads¶
2.14.1. Why use threads with asyncio?¶
Asyncio is designed for non-blocking I/O operations, but many libraries and operations are inherently blocking:
File I/O (reading/writing files)
CPU-intensive computations
Legacy libraries without async support
Database drivers without async support
System calls that don’t have async alternatives
Running blocking code directly in an async function blocks the entire event loop, preventing other coroutines from executing. The solution is to run blocking code in separate threads while the event loop continues processing other tasks.
aiomisc provides convenient decorators and utilities to seamlessly integrate blocking code with asyncio, powered by the aiothreads library.
2.14.2. Quick reference¶
Decorator/Class |
Use case |
|---|---|
|
Run blocking function in thread pool |
|
Run blocking function in new thread |
|
Async iterate over blocking generator |
|
Same, but on separate thread |
|
Wrap existing generator for async iteration |
|
Call async function from thread |
|
Send data from thread to async code |
2.14.3. @aiomisc.threaded¶
Wraps a blocking function to run it in the current thread pool. The decorated function becomes awaitable.
2.14.3.1. Basic usage¶
import asyncio
import time
from aiomisc import threaded
@threaded
def blocking_function():
time.sleep(1)
return "done"
async def main():
# Run two blocking calls in parallel
results = await asyncio.gather(
blocking_function(),
blocking_function(),
)
print(results) # ['done', 'done']
asyncio.run(main())
2.14.3.2. Calling modes¶
The @threaded decorator returns a Threaded object with multiple
calling methods:
import time
from aiomisc import threaded
@threaded
def blocking_function():
time.sleep(0.1)
return "result"
async def main():
# Async call (returns coroutine)
result = await blocking_function()
# Explicit async call
result = await blocking_function.async_call()
# Synchronous call (blocks current thread)
result = blocking_function.sync_call()
2.14.3.3. Works with methods¶
The decorator works correctly with instance methods, class methods, and static methods:
from aiomisc import threaded
class MyClass:
def __init__(self, value):
self.value = value
@threaded
def instance_method(self):
return self.value
@threaded
@classmethod
def class_method(cls):
return cls.__name__
@threaded
@staticmethod
def static_method(x):
return x * 2
async def main():
obj = MyClass(42)
print(await obj.instance_method()) # 42
print(await MyClass.class_method()) # MyClass
print(await MyClass.static_method(21)) # 42
2.14.4. @aiomisc.threaded_separate¶
Wraps a blocking function to run it in a new separate thread (not from pool). Use this for long-running background tasks that would otherwise occupy a thread pool slot for extended periods.
import asyncio
import time
import threading
import aiomisc
@aiomisc.threaded
def quick_task():
"""Short task - uses thread pool"""
time.sleep(0.1)
return "quick"
@aiomisc.threaded_separate
def long_running_task(stop_event: threading.Event):
"""Long task - runs in dedicated thread"""
while not stop_event.is_set():
print("Working...")
time.sleep(1)
return "finished"
async def main():
stop_event = threading.Event()
# Schedule stop after 5 seconds
loop = asyncio.get_event_loop()
loop.call_later(5, stop_event.set)
# Both run concurrently
results = await asyncio.gather(
quick_task(),
long_running_task(stop_event),
)
print(results) # ['quick', 'finished']
with aiomisc.entrypoint() as loop:
loop.run_until_complete(main())
2.14.5. Threaded iterators¶
2.14.5.1. @aiomisc.threaded_iterable¶
Wraps a blocking generator function to make it async-iterable. The generator runs in a thread pool while yielding values to async code.
Note
The generator uses lazy start - execution begins only when iteration
starts (first async for or __anext__() call), not when entering
the async context manager.
import asyncio
import aiomisc
@aiomisc.threaded_iterable
def read_large_file(path):
"""Read file line by line without blocking event loop"""
with open(path, 'r') as f:
for line in f:
yield line.strip()
async def main():
async with read_large_file('/etc/hosts') as lines:
async for line in lines:
print(line)
asyncio.run(main())
2.14.5.1.1. Buffer size control¶
Use max_size parameter to control backpressure. The generator thread
will block when the buffer is full:
import aiomisc
# Buffer holds at most 2 items
@aiomisc.threaded_iterable(max_size=2)
def produce_data():
for i in range(1000):
yield i # Blocks when 2 items buffered
2.14.5.2. @aiomisc.threaded_iterable_separate¶
Same as @threaded_iterable but runs in a dedicated thread instead of
the thread pool. Use for long-running generators:
import aiomisc
@aiomisc.threaded_iterable_separate
def tail_file(path):
"""Continuously read new lines from file"""
with open(path, 'r') as f:
f.seek(0, 2) # Go to end
while True:
line = f.readline()
if line:
yield line.strip()
2.14.5.3. Cleanup with context managers¶
For infinite generators, always use async context manager or call .close():
import aiomisc
@aiomisc.threaded_iterable(max_size=2)
def infinite_generator():
counter = 0
while True:
yield counter
counter += 1
async def main():
# Option 1: Context manager (recommended)
async with infinite_generator() as gen:
async for value in gen:
if value >= 10:
break # Context manager handles cleanup
# Option 2: Manual cleanup
gen = infinite_generator()
async for value in gen:
if value >= 10:
break
await gen.close() # Must call close!
2.14.6. aiomisc.IteratorWrapper¶
Wraps an existing generator to make it async-iterable. Useful when you can’t use decorators or need to specify a custom executor:
import concurrent.futures
import aiomisc
def my_generator():
for i in range(100):
yield i
async def main():
# Use default thread pool
wrapper = aiomisc.IteratorWrapper(
my_generator,
max_size=10
)
async with wrapper as gen:
async for item in gen:
print(item)
# Or use custom thread pool
pool = concurrent.futures.ThreadPoolExecutor(2)
wrapper = aiomisc.IteratorWrapper(
my_generator,
executor=pool,
max_size=10
)
async with wrapper as gen:
async for item in gen:
print(item)
pool.shutdown()
2.14.7. aiomisc.IteratorWrapperSeparate¶
Same as IteratorWrapper but runs the generator in a dedicated thread:
import aiomisc
def blocking_generator():
while True:
yield "data"
async def main():
wrapper = aiomisc.IteratorWrapperSeparate(
blocking_generator,
max_size=5
)
async with wrapper as gen:
count = 0
async for item in gen:
count += 1
if count >= 100:
break
2.14.8. Calling async from threads¶
2.14.8.1. aiomisc.sync_await¶
Execute an async function synchronously from a thread. Automatically detects the running event loop:
import asyncio
import aiomisc
async def async_operation():
await asyncio.sleep(0.1)
return "async result"
@aiomisc.threaded
def thread_function():
# Call async function from thread
result = aiomisc.sync_await(async_operation())
return f"got: {result}"
async def main():
result = await thread_function()
print(result) # got: async result
with aiomisc.entrypoint() as loop:
loop.run_until_complete(main())
2.14.8.2. aiomisc.sync_wait_coroutine¶
Lower-level function that requires explicit loop argument:
import asyncio
import aiomisc
async def fetch_data():
await asyncio.sleep(0.1)
return {"key": "value"}
@aiomisc.threaded
def process_in_thread(loop):
# Explicit loop argument
data = aiomisc.sync_wait_coroutine(loop, fetch_data())
return data["key"]
with aiomisc.entrypoint() as loop:
result = loop.run_until_complete(process_in_thread(loop))
print(result) # value
2.14.9. aiomisc.FromThreadChannel¶
A channel for sending data from threads to async code. Unlike
IteratorWrapper, you control when and what to send:
import asyncio
import aiomisc
async def consumer(channel: aiomisc.FromThreadChannel):
async for item in channel:
print(f"Received: {item}")
@aiomisc.threaded_separate
def producer(channel: aiomisc.FromThreadChannel):
for i in range(5):
channel.put(i) # Send to async consumer
channel.close() # Signal end of data
async def main():
channel = aiomisc.FromThreadChannel(max_size=2)
await asyncio.gather(
consumer(channel),
producer(channel),
)
asyncio.run(main())
2.14.10. contextvars support¶
All decorators automatically copy context variables to the thread:
import asyncio
import contextvars
import aiomisc
request_id = contextvars.ContextVar("request_id")
@aiomisc.threaded
def log_with_context(message):
print(f"[{request_id.get()}] {message}")
async def handle_request(req_id):
request_id.set(req_id)
await log_with_context("Processing request")
async def main():
await asyncio.gather(
handle_request("req-001"),
handle_request("req-002"),
handle_request("req-003"),
)
asyncio.run(main())
Output:
[req-001] Processing request
[req-002] Processing request
[req-003] Processing request
Note
Context variables are copied to threads, so modifications in threads don’t affect the parent context.
2.14.11. aiomisc.ThreadPoolExecutor¶
A fast thread pool implementation used by aiomisc internally.
Manual setup:
import asyncio
from aiomisc import ThreadPoolExecutor
loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor(4)
loop.set_default_executor(thread_pool)
Note
The entrypoint context manager sets this automatically.
Use the pool_size argument to control thread pool size:
with aiomisc.entrypoint(pool_size=8) as loop:
...