2.1. entrypoint
In the generic case, the entrypoint helper creates an event loop and cancels already running coroutines on exit.
import asyncio
import aiomisc

async def main():
    await asyncio.sleep(1)

with aiomisc.entrypoint() as loop:
    loop.run_until_complete(main())
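To make "cancels already running coroutines on exit" concrete, here is a minimal stdlib-only sketch of that behaviour. It is not how aiomisc implements entrypoint; simple_entrypoint and background are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import contextmanager


@contextmanager
def simple_entrypoint():
    """Hypothetical stand-in: yields a fresh loop, cancels leftovers on exit."""
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        # Cancel every task that is still pending on this loop.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # Give the cancelled tasks a chance to handle CancelledError.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.close()


async def background():
    await asyncio.sleep(3600)  # would outlive main() if it were not cancelled


async def main():
    task = asyncio.get_running_loop().create_task(background())
    await asyncio.sleep(0.01)
    return task


with simple_entrypoint() as loop:
    leftover = loop.run_until_complete(main())

print(leftover.cancelled())
```

The background task is still pending when main() returns, and the context manager cancels it before closing the loop.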
Complete example:
import asyncio
import aiomisc
import logging
import signal

async def main():
    await asyncio.sleep(1)
    logging.info("Hello there")

with aiomisc.entrypoint(
    pool_size=2,
    log_level='info',
    log_format='color',                             # default when "rich" absent
    log_buffer_size=1024,                           # default
    log_flush_interval=0.2,                         # default
    log_config=True,                                # default
    policy=asyncio.DefaultEventLoopPolicy(),        # default
    debug=False,                                    # default
    catch_signals=(signal.SIGINT, signal.SIGTERM),  # default
    shutdown_timeout=60,                            # default
) as loop:
    loop.run_until_complete(main())
Running entrypoint from async code
import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)

class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')
        # ...

async def main():
    service = MyPeriodicService(interval=1, delay=0)  # once per second

    # Returns an entrypoint instance because the event loop is
    # already running and can be obtained via asyncio.get_event_loop()
    async with aiomisc.entrypoint(service) as ep:
        try:
            await asyncio.wait_for(ep.closing(), timeout=1)
        except asyncio.TimeoutError:
            pass

asyncio.run(main())
2.1.1. Dynamic running of services
Sometimes it is not enough to add services to the entrypoint at the start,
or it is not possible to get the service parameters before the event loop
starts. In this case, services can be started after the event loop has
started. This feature is available from version 17.
import asyncio
import aiomisc
import logging
from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)

class MyPeriodicService(PeriodicService):
    async def callback(self):
        log.info('Running periodic callback')

async def add_services():
    entrypoint = aiomisc.entrypoint.get_current()
    services = [
        MyPeriodicService(interval=2, delay=1),
        MyPeriodicService(interval=2, delay=0),
    ]
    await entrypoint.start_services(*services)
    await asyncio.sleep(10)
    await entrypoint.stop_services(*services)

with aiomisc.entrypoint() as loop:
    loop.create_task(add_services())
    loop.run_forever()
2.1.2. Configuration from environment
The module supports configuration through environment variables:
AIOMISC_LOG_LEVEL - default logging level
AIOMISC_LOG_FORMAT - default log format
AIOMISC_LOG_DATE_FORMAT - default logging date format
AIOMISC_LOG_CONFIG - whether logging should be configured
AIOMISC_LOG_FLUSH - interval between flushes of buffered logs
AIOMISC_LOG_BUFFERING - whether logging should be buffered
AIOMISC_LOG_BUFFER_SIZE - maximum log buffer size
AIOMISC_POOL_SIZE - thread pool size
AIOMISC_USE_UVLOOP - whether to use uvloop when it is available; set to 0 to disable
AIOMISC_SHUTDOWN_TIMEOUT - if the program does not terminate within this timeout after receiving a signal, a forced exit occurs
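When debugging a deployment, it can help to check which of these variables a process actually sees. The helper below is a hypothetical sketch and not part of aiomisc: it only filters the environment by the AIOMISC_ prefix and does not interpret or validate the values.

```python
import os


def read_aiomisc_env(environ=os.environ):
    """Return the AIOMISC_* variables that are set, keyed by lowercase suffix."""
    prefix = "AIOMISC_"
    return {
        name[len(prefix):].lower(): value
        for name, value in environ.items()
        if name.startswith(prefix)
    }


# A sample mapping stands in for the real environment here.
sample = {
    "AIOMISC_LOG_LEVEL": "debug",
    "AIOMISC_USE_UVLOOP": "0",
    "HOME": "/root",
}
overrides = read_aiomisc_env(sample)
print(overrides)
```

Calling read_aiomisc_env() without arguments inspects the real process environment.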
2.2. run() shortcut
aiomisc.run() is a short way to create and destroy an aiomisc.entrypoint.
It is very similar to asyncio.run(), but it also handles Service instances
and the other entrypoint keyword arguments.
import asyncio
import aiomisc

async def main():
    loop = asyncio.get_event_loop()
    now = loop.time()
    await asyncio.sleep(0.1)
    assert now < loop.time()

aiomisc.run(main())
2.3. Logging configuration
entrypoint accepts a log_format argument that selects one of a fixed set of
formats in which logs are written to stderr:
stream - Python’s default logging handler
color - logging with the colorlog module
json - one JSON structure per line
syslog - logging using the stdlib logging.handlers.SysLogHandler
plain - just log messages, without date or level info
journald - available only when the logging-journald module is installed
rich/rich_tb - available only when the rich module is installed; rich_tb is the same as rich but with fully expanded tracebacks
Additionally, you can specify the log level using the log_level argument and
the date format using the log_date_format argument.
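As an illustration of what "one JSON structure per line" means, here is a stdlib-only sketch. JsonLineFormatter and its field names (time, level, name, message) are assumptions made for this example; the fields that aiomisc's json format actually emits may differ.

```python
import io
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object."""

    def format(self, record):
        payload = {
            "time": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(payload)


stream = io.StringIO()  # stands in for stderr so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonLineFormatter(datefmt="%Y-%m-%dT%H:%M:%S"))

log = logging.getLogger("json_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

log.info("Hello %s", "there")
log.debug("dropped: below the configured level")

lines = stream.getvalue().splitlines()
print(lines)
```

Each emitted line parses on its own with json.loads, which is what makes this kind of format convenient for log collectors.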
An entrypoint will call the aiomisc.log.basic_config function implicitly,
using the passed log_* parameters. Alternatively, you can call
aiomisc.log.basic_config manually, passing it an already created event loop.
However, you can configure logging earlier using aiomisc_log.basic_config,
but you will lose log buffering and flushing in a separate thread.
This function is what is actually called during logging configuration;
the entrypoint passes it a wrapper for the handler so that flushing happens
in a separate thread.
import logging
from aiomisc_log import basic_config
basic_config(log_format="color")
logging.info("Hello")
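The idea of handing records to a handler that runs in a separate thread can be pictured with the standard library's QueueHandler and QueueListener. This is only an analogue under that assumption, not aiomisc's implementation; ThreadRecordingHandler is a hypothetical handler that records which thread emitted each message.

```python
import logging
import logging.handlers
import queue
import threading


class ThreadRecordingHandler(logging.Handler):
    """Hypothetical target handler: remembers which thread emitted each record."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def emit(self, record):
        self.seen.append((record.getMessage(), threading.current_thread().name))


target = ThreadRecordingHandler()
records = queue.Queue()

# The listener drains the queue and calls target.emit() in its own thread.
listener = logging.handlers.QueueListener(records, target)
listener.start()

log = logging.getLogger("threaded_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(logging.handlers.QueueHandler(records))

log.info("Hello")  # only enqueues; the caller does not wait for the handler
listener.stop()    # processes what is queued, then joins the worker thread

print(target.seen)
```

The logging call returns as soon as the record is queued, so a slow handler does not stall the caller.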
If you want to configure logging before the entrypoint is started,
for example after parsing the arguments, it is safe to configure it twice
(or more).
import logging
import aiomisc
from aiomisc_log import basic_config

basic_config(log_format="color")
logging.info("Hello from usual python")

async def main():
    logging.info("Hello from async python")

with aiomisc.entrypoint(log_format="color") as loop:
    loop.run_until_complete(main())
Sometimes you want to configure logging manually. The following example demonstrates how to do this:
import os
import logging
from logging.handlers import RotatingFileHandler
from gzip import GzipFile

import aiomisc

class GzipLogFile(GzipFile):
    def write(self, data) -> int:
        # GzipFile expects bytes, while logging handlers write str
        if isinstance(data, str):
            data = data.encode()
        return super().write(data)

class RotatingGzipFileHandler(RotatingFileHandler):
    """Added purely as an example; test it properly before real use."""

    def shouldRollover(self, record):
        if not os.path.isfile(self.baseFilename):
            return False
        if self.stream is None:
            self.stream = self._open()
        return 0 < self.maxBytes < os.stat(self.baseFilename).st_size

    def _open(self):
        return GzipLogFile(filename=self.baseFilename, mode=self.mode)

async def main():
    for _ in range(1_000):
        logging.info("Hello world")

with aiomisc.entrypoint(log_config=False) as loop:
    gzip_handler = RotatingGzipFileHandler(
        "app.log.gz",
        # At most 100 files of 10 megabytes each
        maxBytes=10 * 2 ** 20, backupCount=100
    )
    stream_handler = logging.StreamHandler()

    formatter = logging.Formatter(
        "[%(asctime)s] <%(levelname)s> "
        "%(filename)s:%(lineno)d (%(threadName)s): %(message)s"
    )

    gzip_handler.setFormatter(formatter)
    stream_handler.setFormatter(formatter)

    logging.basicConfig(
        level=logging.INFO,
        # Wrapping each handler so that it runs in a separate thread
        # keeps the event loop from blocking, even if gzip takes a
        # long time to open the file.
        handlers=map(
            aiomisc.log.wrap_logging_handler,
            (gzip_handler, stream_handler)
        )
    )
    loop.run_until_complete(main())