2.5. Abstract connection pool

aiomisc.PoolBase is an abstract base class for implementing a user-defined connection pool.

Example for aioredis:

import asyncio
import aioredis
import aiomisc


class RedisPool(aiomisc.PoolBase):
    """Connection pool for aioredis built on top of ``aiomisc.PoolBase``.

    The three underscore-prefixed coroutines below are the hooks this
    subclass supplies: how to open a connection, how to close one, and how
    to decide whether an existing one is still usable.
    """

    def __init__(self, uri, maxsize=10, recycle=60):
        """Remember the Redis URI and forward sizing options to the base.

        :param uri: address handed to ``aioredis.create_redis`` for every
            new connection.
        :param maxsize: forwarded unchanged to ``PoolBase.__init__``.
        :param recycle: forwarded unchanged to ``PoolBase.__init__``.
        """
        super().__init__(maxsize=maxsize, recycle=recycle)
        self.uri = uri

    async def _create_instance(self):
        """Open and return a new Redis connection to ``self.uri``."""
        return await aioredis.create_redis(self.uri)

    async def _destroy_instance(self, instance: aioredis.Redis):
        """Close ``instance`` and wait until the close has completed."""
        instance.close()
        await instance.wait_closed()

    async def _check_instance(self, instance: aioredis.Redis):
        """Return ``True`` if ``instance`` answers a PING within 0.5 seconds.

        Any ordinary failure (timeout, connection error, ...) yields
        ``False``. Task cancellation and interpreter-level exits are
        deliberately *not* treated as a failed health check and propagate
        to the caller.
        """
        try:
            await asyncio.wait_for(instance.ping(1), timeout=0.5)
        except asyncio.CancelledError:
            # On Python 3.7 CancelledError still derives from Exception, so
            # re-raise it explicitly instead of reporting a dead connection.
            raise
        except Exception:
            # Deliberately broad: whatever went wrong, the connection is
            # unusable. BaseException (KeyboardInterrupt, SystemExit) is
            # not caught here.
            return False

        return True


async def main():
    """Write the key ``foo`` via one pooled connection, then read it back.

    A ``RedisPool`` pointing at ``redis://localhost`` is created, a
    connection is acquired to set ``foo`` to ``bar``, and a second
    acquisition fetches the value and prints it.
    """
    redis_pool = RedisPool("redis://localhost")

    # First acquisition: store the value.
    async with redis_pool.acquire() as writer:
        await writer.set("foo", "bar")

    # Second acquisition: fetch the value and print it.
    async with redis_pool.acquire() as reader:
        stored = await reader.get("foo")
        print(stored)


# Run the example only when executed as a script, so that importing this
# module does not start an event loop or touch Redis.
if __name__ == "__main__":
    asyncio.run(main())