2.4. Services#

A service is an abstraction that helps organize many different tasks in one process. Each service must implement the start() method and may implement the stop() method.

Service instance should be passed to the entrypoint, and will be started after the event loop has been created.

Note

The current event loop is set before the start() method is called; it is set as the current loop for this thread.

Please avoid using asyncio.get_event_loop() explicitly inside start() method. Use self.loop instead:

import asyncio
from threading import Event
from aiomisc import entrypoint, Service

event = Event()

class MyService(Service):
    """Example service whose start() keeps running after signalling readiness."""

    async def start(self):
        # Tell the entrypoint that startup is done so it can continue running
        self.start_event.set()

        event.set()
        # The long-running work of the service
        await asyncio.sleep(3600)


with entrypoint(MyService()) as loop:
    assert event.is_set()

The start() method runs as a separate task that can run forever. In that case, self.start_event.set() must be called to notify the entrypoint that the service has started.

During graceful shutdown method stop() will be called first, and after that, all running tasks will be canceled (including start()).

This package contains some useful base classes for simple services writing.

2.4.1. TCPServer#

TCPServer - it’s a base class for writing TCP servers. Just implement handle_client(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer


log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    """TCP server that sends every received chunk back to the client."""

    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        # Echo data back in reads of at most 255 bytes until the client
        # side reaches EOF
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


async def echo_client(host, port):
    """Send two lines to the server at host:port and check both are echoed.

    The connection is closed once both round trips have been asserted.
    """
    reader, writer = await asyncio.open_connection(host=host, port=port)
    writer.write(b"hello\n")
    assert await reader.readline() == b"hello\n"

    writer.write(b"world\n")
    assert await reader.readline() == b"world\n"

    # Close the stream and wait until the underlying transport is released
    writer.close()
    await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(echo_client("localhost", 8901))

2.4.2. UDPServer#

UDPServer - it’s a base class for writing UDP servers. Just implement handle_datagram(data, addr) to use it.

from aiomisc import entrypoint
from aiomisc.service import UDPServer


class UDPPrinter(UDPServer):
    """UDP server that prints every datagram it receives."""

    async def handle_datagram(self, data: bytes, addr):
        # Print the sender address followed by the raw payload
        print(addr, '->', data)


with entrypoint(UDPPrinter(address='localhost', port=3000)) as loop:
    loop.run_forever()

2.4.3. TLSServer#

TLSServer - it’s a base class for writing TCP servers with TLS. Just implement handle_client(reader, writer) to use it.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import TLSServer


class SecureEchoServer(TLSServer):
    """TLS server that echoes each received line back to the client."""

    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        # Stop at EOF: once the peer disconnects, readline() returns b""
        # immediately, so an unconditional loop would spin forever.
        while not reader.at_eof():
            writer.write(await reader.readline())


service = SecureEchoServer(
    address='localhost',
    port=8900,
    ca='ca.pem',
    cert='cert.pem',
    key='key.pem',
    verify=False,
)

with entrypoint(service) as loop:
    loop.run_forever()

2.4.4. TCPClient#

TCPClient - it’s a base class for writing TCP clients. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, TCPClient

log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(TCPClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
    EchoClient(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.5. TLSClient#

TLSClient - it’s a base class for writing TLS clients. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
# The classes below subclass the TLS variants, so import those
from aiomisc.service import TLSServer, TLSClient

log = logging.getLogger(__name__)


class EchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(TLSClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='server.pem',
        key='server.key',
    ),
    EchoClient(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='client.pem',
        key='client.key',
    ),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.6. RobustTCPClient#

RobustTCPClient - it’s a base class for writing TCP clients with auto-reconnection when the connection is lost. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
from aiomisc.service import TCPServer, RobustTCPClient

log = logging.getLogger(__name__)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(RobustTCPClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(address='localhost', port=8901),
    EchoClient(address='localhost', port=8901),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.7. RobustTLSClient#

RobustTLSClient - it’s a base class for writing TLS clients with auto-reconnection when the connection is lost. Just implement handle_connection(reader, writer) to use it.

import asyncio
import logging
from aiomisc import entrypoint
# The classes below subclass the TLS variants, so import those
from aiomisc.service import TLSServer, RobustTLSClient

log = logging.getLogger(__name__)


class EchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while not reader.at_eof():
            writer.write(await reader.read(255))

        log.info("Client connection closed")


class EchoClient(RobustTLSClient):

    async def handle_connection(self, reader: asyncio.StreamReader,
                                writer: asyncio.StreamWriter) -> None:
        writer.write(b"hello\n")
        assert await reader.readline() == b"hello\n"

        writer.write(b"world\n")
        assert await reader.readline() == b"world\n"

        writer.write_eof()
        writer.close()
        await writer.wait_closed()


with entrypoint(
    EchoServer(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='server.pem',
        key='server.key',
    ),
    EchoClient(
        address='localhost', port=8901,
        ca='ca.pem',
        cert='client.pem',
        key='client.key',
    ),
) as loop:
    loop.run_until_complete(asyncio.sleep(0.1))

2.4.8. PeriodicService#

PeriodicService runs PeriodicCallback as a service and waits for the running callback to complete on the stop method. You need to use PeriodicService as a base class and override callback async coroutine method.

Service class accepts required interval argument - periodic interval in seconds and optional delay argument - periodic execution delay in seconds (0 by default).

import logging

from aiomisc import entrypoint
from aiomisc.service.periodic import PeriodicService

log = logging.getLogger(__name__)


class MyPeriodicService(PeriodicService):
    """Periodic service whose callback only logs a message."""

    async def callback(self):
        log.info('Running periodic callback')
        # ...

service = MyPeriodicService(interval=3600, delay=0)  # once per hour

with entrypoint(service) as loop:
    loop.run_forever()

2.4.9. DNS Server#

The DNS server described here uses the aiomisc library, which provides utilities for asynchronous I/O operations, and the dnslib library for handling DNS records and packets. This setup is ideal for high-performance, non-blocking DNS query handling.

2.4.9.1. Key Features#

  1. Asynchronous I/O: Utilizes asynchronous operations to handle multiple DNS queries concurrently, ensuring high performance and scalability.

  2. UDP and TCP Support: Supports DNS queries over both UDP and TCP protocols, making it versatile for various network configurations.

  3. EDNS0 Support: Implements Extension Mechanisms for DNS (EDNS0) to handle larger DNS messages and extended functionalities.

  4. Customizable DNS Records: Allows easy configuration of DNS zones and records, enabling you to define and manage your DNS entries efficiently.

2.4.9.2. Prerequisites#

Install the required libraries using pip:

pip install aiomisc[dns]

2.4.9.3. Setting Up the Server#

from aiomisc import entrypoint
from aiomisc.service.dns import (
    DNSStore, DNSZone, TCPDNSServer, UDPDNSServer, records,
)

zone = DNSZone("test.")
zone.add_record(records.A.create("test.", "10.10.10.10"))
zone.add_record(records.AAAA.create("test.", "fd10::10"))

store = DNSStore()
store.add_zone(zone)
services = [
    UDPDNSServer(
        store=store, address="::1", port=5053,
    ),
    TCPDNSServer(
        store=store, address="::1", port=5053,
    ),
]

if __name__ == "__main__":
    with entrypoint(*services, log_level="debug") as loop:
        loop.run_forever()

2.4.9.4. Testing the Server#

You can test the DNS server using tools like dig. For example, to query the A and AAAA records for test., use the following commands:

dig @::1 -p 5053 test. A
dig @::1 -p 5053 test. AAAA
dig @::1 -p 5053 +tcp test. A
dig @::1 -p 5053 +tcp test. AAAA

These commands should return the IP addresses 10.10.10.10 and fd10::10 respectively, confirming that the DNS server is working correctly.

2.4.9.5. Dynamic Store Management#

One of the powerful features of this DNS server setup is the ability to dynamically manage the DNS store. This allows you to add or remove zones and records at runtime, without needing to restart the server.

Managing DNS zones and records dynamically is essential for a flexible DNS server setup. This guide focuses on how to manipulate DNS zones and records using the DNSStore and DNSZone classes, providing practical examples for each operation.

2.4.9.6. Adding a Zone#

You can add a new zone to the DNSStore to manage its records. This operation ensures that the zone is available for DNS queries.

from aiomisc.service.dns import DNSStore, DNSZone

# Create a DNSStore instance
dns_store = DNSStore()

# Create a DNSZone instance
zone = DNSZone("example.com.")

# Add the zone to the store
dns_store.add_zone(zone)

# Verify the zone is added
assert dns_store.get_zone("example.com.") is zone

2.4.9.7. Removing a Zone#

Removing a zone from the DNSStore ensures it is no longer available for DNS queries.

# Add the zone to the store
dns_store.add_zone(zone)

# Remove the zone from the store
dns_store.remove_zone("example.com.")

# Verify the zone is removed
assert dns_store.get_zone("example.com.") is None

2.4.9.8. Adding a Record to a Zone#

To manage DNS entries, you can add records to a specific zone. This operation makes the record available for DNS resolution within that zone.

from aiomisc.service.dns.records import A, RecordType

# Create a DNSZone instance
zone = DNSZone("example.com.")

# Create an A record
record = A.create(name="www.example.com.", ip="192.0.2.1")

# Add the record to the zone
zone.add_record(record)

# Add the zone to the store
dns_store.add_zone(zone)

# Query the store for the record
records = dns_store.query("www.example.com.", RecordType.A)

# Verify the record is added
assert record in records

2.4.9.9. Querying Records#

You can query the DNSStore to retrieve records for a specific domain name. This is useful to verify if a record exists or to handle DNS queries.

# Query the store for a nonexistent record
records = dns_store.query("nonexistent.example.com.", RecordType.A)

# Verify no records are found
assert len(records) == 0

2.4.9.10. Handling Duplicate Zones#

Attempting to add a zone that already exists should raise an error, ensuring that each zone is unique within the DNSStore.

# Add the zone to the store
dns_store.add_zone(zone)

# Attempt to add the same zone again
try:
    dns_store.add_zone(zone)
except ValueError as e:
    print(f"Error: {e}")

2.4.9.11. Removing a Nonexistent Zone#

Removing a zone that does not exist should raise an error, indicating that the operation is invalid.

# Attempt to remove a nonexistent zone
try:
    dns_store.remove_zone("nonexistent.com.")
except ValueError as e:
    print(f"Error: {e}")

2.4.9.12. Querying Subdomains#

The DNSStore supports querying subdomains, allowing you to resolve records within subdomains of an existing zone.

# Create a DNSZone instance
zone = DNSZone("example.com.")

# Create an A record for a subdomain
record = A.create(name="sub.example.com.", ip="192.0.2.2")

# Add the record to the zone
zone.add_record(record)

# Add the zone to the store
dns_store.add_zone(zone)

# Query the store for the subdomain record
records = dns_store.query("sub.example.com.", RecordType.A)

# Verify the subdomain record is found
assert record in records

2.4.9.13. Retrieving a Zone#

You can retrieve a zone from the DNSStore to inspect or manipulate it further.

# Add the zone to the store
dns_store.add_zone(zone)

# Retrieve the zone from the store
retrieved_zone = dns_store.get_zone("example.com.")

# Verify the zone is retrieved
assert retrieved_zone is zone

2.4.9.14. Handling Nonexistent Zones#

Retrieving a zone that does not exist should return None.

# Attempt to retrieve a nonexistent zone
nonexistent_zone = dns_store.get_zone("nonexistent.com.")

# Verify no zone is retrieved
assert nonexistent_zone is None

2.4.9.15. Removing a Record from a Zone#

To remove a DNS record from a zone, you can use the remove_record method. This operation ensures the record is no longer available for DNS resolution.

# Create a DNSZone instance
zone = DNSZone("example.com.")

# Create an A record
record = A.create(name="www.example.com.", ip="192.0.2.1")

# Add the record to the zone
zone.add_record(record)

# Remove the record from the zone
zone.remove_record(record)

# Add the zone to the store
dns_store.add_zone(zone)

# Query the store for the record
records = dns_store.query("www.example.com.", RecordType.A)

# Verify the record is removed
assert len(records) == 0

2.4.9.16. Finding Zone by Prefix#

The DNSStore can find the appropriate zone for a given domain name, which is useful for handling queries with subdomains.

# Create a DNSZone instance
zone = DNSZone("example.com.")

# Add the zone to the store
dns_store.add_zone(zone)

# Find the zone for a subdomain
zone_prefix = dns_store.get_zone_for_name("sub.example.com.")

# Verify the correct zone is found
assert zone_prefix == ("com", "example")

2.4.10. CronService#

CronService runs CronCallback instances as a service and waits for running callbacks to complete when the stop method is called.

It’s based on croniter. You can register async coroutine method with spec argument - cron like format:

Warning

requires installed croniter:

pip install croniter

or using extras:

pip install aiomisc[cron]
import logging

from aiomisc import entrypoint
from aiomisc.service.cron import CronService

log = logging.getLogger(__name__)


async def callback():
    """Cron job body; here it only logs a message."""
    log.info('Running cron callback')
    # ...

service = CronService()
service.register(callback, spec="0 * * * *") # every hour at zero minutes

with entrypoint(service) as loop:
    loop.run_forever()

You can also inherit from CronService, but remember that callbacks must be registered before the service is started:

import logging

from aiomisc import entrypoint
from aiomisc.service.cron import CronService

log = logging.getLogger(__name__)


class MyCronService(CronService):
    """Cron service that registers its own callback when it starts."""

    async def callback(self):
        log.info('Running cron callback')
        # ...

    async def start(self):
        # Register before delegating to the base class start()
        self.register(self.callback, spec="0 * * * *")
        await super().start()

service = MyCronService()

with entrypoint(service) as loop:
    loop.run_forever()

2.4.11. Multiple services#

Pass several service instances to the entrypoint to run all of them. After exiting the entrypoint service instances will be gracefully shut down.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer
from aiomisc.service.periodic import PeriodicService


class LoggingService(PeriodicService):
    """Periodic service that prints a greeting containing its name."""

    async def callback(self):
        print('Hello from service', self.name)


class EchoServer(TCPServer):
    """TCP server that echoes each received line back to the client."""

    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        # Stop at EOF: once the peer disconnects, readline() returns b""
        # immediately, so an unconditional loop would spin forever.
        while not reader.at_eof():
            writer.write(await reader.readline())


class UDPPrinter(UDPServer):
    """UDP server that prints every datagram it receives."""

    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


services = (
    LoggingService(name='#1', interval=1),
    EchoServer(address='localhost', port=8901),
    UDPPrinter(address='localhost', port=3000),
)


with entrypoint(*services) as loop:
    loop.run_forever()

2.4.12. Configuration#

The Service metaclass accepts arbitrary kwargs and sets them on the instance as attributes.

import asyncio
from aiomisc import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    """Prints a greeting with ``name``, pausing ``delay`` seconds in between."""

    # kwargs that must be passed when the service is instantiated
    __required__ = frozenset({'name'})

    # default value, used when ``delay`` is not passed as a kwarg
    delay: int = 1

    async def start(self):
        # Signal the entrypoint before entering the endless loop below
        self.start_event.set()
        while True:
            # attribute ``name`` from kwargs
            # must be defined when instance initializes
            print('Hello from service', self.name)

            # attribute ``delay`` from kwargs
            await asyncio.sleep(self.delay)

services = (
    LoggingService(name='#1'),
    LoggingService(name='#2', delay=3),
)


with entrypoint(*services) as loop:
    loop.run_forever()

2.4.13. aiohttp service#

Warning

requires installed aiohttp:

pip install aiohttp

or using extras:

pip install aiomisc[aiohttp]

aiohttp application can be started as a service:

import aiohttp.web
import argparse
from aiomisc import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return aiohttp.web.Response(text=text)


class REST(AIOHTTPService):
    async def create_application(self):
        app = aiohttp.web.Application()

        app.add_routes([
            aiohttp.web.get('/', handle),
            aiohttp.web.get('/{name}', handle)
        ])

        return app

arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Class AIOHTTPSSLService is similar to AIOHTTPService but creates an HTTPS server. You must pass SSL-required options (see TLSServer class).

2.4.14. asgi service#

Warning

requires installed aiohttp-asgi:

pip install aiohttp-asgi

or using extras:

pip install aiomisc[asgi]

Any ASGI-like application can be started as a service:

import argparse

from fastapi import FastAPI

from aiomisc import entrypoint
from aiomisc.service.asgi import ASGIHTTPService, ASGIApplicationType

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--address", default="::",
                   help="Listen HTTP address")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}


class REST(ASGIHTTPService):
    async def create_asgi_app(self) -> ASGIApplicationType:
        return app


arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

Class ASGIHTTPSSLService is similar to ASGIHTTPService but creates HTTPS server. You must pass SSL-required options (see TLSServer class).

2.4.15. uvicorn service#

Warning

requires installed uvicorn:

pip install uvicorn

or using extras:

pip install aiomisc[uvicorn]

Any ASGI-like application can be started via uvicorn as a service:

import argparse

from fastapi import FastAPI

from aiomisc import entrypoint
from aiomisc.service.uvicorn import UvicornApplication, UvicornService

parser = argparse.ArgumentParser()
group = parser.add_argument_group('HTTP options')

group.add_argument("-l", "--host", default="::",
                   help="Listen HTTP host")
group.add_argument("-p", "--port", type=int, default=8080,
                   help="Listen HTTP port")


app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}


class REST(UvicornService):
    async def create_application(self) -> UvicornApplication:
        return app


arguments = parser.parse_args()
service = REST(host=arguments.host, port=arguments.port)

with entrypoint(service) as loop:
    loop.run_forever()

2.4.16. GRPC service#

This is an example of a GRPC service that is defined in a single file and loads a hello.proto file without code generation. It is one of the examples from grpcio; the other grpcio examples will work as expected.

Proto definition:

syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Service initialization example:

import grpc

import aiomisc
from aiomisc.service.grpc_server import GRPCService


protos, services = grpc.protos_and_services("hello.proto")


class Greeter(services.GreeterServicer):
    async def SayHello(self, request, context):
        return protos.HelloReply(message='Hello, %s!' % request.name)


def main():
    grpc_service = GRPCService(compression=grpc.Compression.Gzip)
    services.add_GreeterServicer_to_server(
        Greeter(), grpc_service,
    )
    grpc_service.add_insecure_port('[::]:0')
    grpc_service.add_insecure_port('[::1]:0')
    grpc_service.add_insecure_port('127.0.0.1:0')
    grpc_service.add_insecure_port('localhost:0')
    grpc_service.add_secure_port('localhost:0', grpc.local_server_credentials())
    grpc_service.add_secure_port('[::]:0', grpc.local_server_credentials())

    with aiomisc.entrypoint(grpc_service) as loop:
        loop.run_forever()


if __name__ == '__main__':
    main()

To enable reflection for the service, use the reflection flag:

GRPCService(reflection=True)

2.4.17. Memory Tracer#

Simple and useful service for logging large python objects allocated in memory.

import asyncio
import os
from aiomisc import entrypoint
from aiomisc.service import MemoryTracer


async def main():
    leaking = []

    while True:
        leaking.append(os.urandom(128))
        await asyncio.sleep(0)


with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
    loop.run_until_complete(main())

Output example:

[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
      12 |       12 |   1.9KiB |   1.9KiB | aiomisc/periodic.py:40
      12 |       12 |   1.8KiB |   1.8KiB | aiomisc/entrypoint.py:93
       6 |        6 |   1.1KiB |   1.1KiB | aiomisc/thread_pool.py:71
       2 |        2 |   976.0B |   976.0B | aiomisc/thread_pool.py:44
       5 |        5 |   712.0B |   712.0B | aiomisc/thread_pool.py:52

[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
   43999 |    43999 |   7.1MiB |   7.1MiB | scratches/scratch_8.py:11
      47 |       47 |   4.7KiB |   4.7KiB | env/bin/../lib/python3.7/abc.py:143
      33 |       33 |   2.8KiB |   2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
      44 |       44 |   2.4KiB |   2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
      14 |       14 |   2.4KiB |   2.4KiB | aiomisc/periodic.py:40

2.4.18. Profiler#

Simple service for profiling. An optional path argument can be provided to dump the complete profiling data, which can later be used by, for example, snakeviz. The ordering can also be changed with the order argument (“cumulative” by default).

import asyncio
import os
import time
from aiomisc import entrypoint
from aiomisc.service import Profiler


async def main():
    """Spend about a second in blocking sleeps so the profiler has data."""
    for _ in range(100):
        # Deliberately blocking: this call is what the sample output shows
        time.sleep(0.01)


with entrypoint(Profiler(interval=0.1, top_results=5)) as loop:
    loop.run_until_complete(main())

Output example:

108 function calls in 1.117 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   100    1.117    0.011    1.117    0.011 {built-in method time.sleep}
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:89(__init__)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:99(init)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/pstats.py:118(load_stats)
     1    0.000    0.000    0.000    0.000 <...>/lib/python3.7/cProfile.py:50(create_stats)

2.4.19. Raven service#

Simple service for sending unhandled exceptions to the sentry service instance.

Simple example:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    # NOTE(review): the DSN below looks mangled by e-mail obfuscation
    # ("[email protected]"); substitute your own project's Sentry DSN.
    sentry_dsn=(
        "https://[email protected]/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="example-from-aiomisc",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="simple_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

Full configuration:

import asyncio
import logging
import sys

from aiomisc import entrypoint
from aiomisc.version import __version__
from aiomisc.service.raven import RavenSender


async def main():
    while True:
        await asyncio.sleep(1)

        try:
            1 / 0
        except ZeroDivisionError:
            logging.exception("Exception")


raven_sender = RavenSender(
    # NOTE(review): the DSN below looks mangled by e-mail obfuscation
    # ("[email protected]"); substitute your own project's Sentry DSN.
    sentry_dsn=(
        "https://[email protected]/"
        "5530251"
    ),
    client_options=dict(
        # Got environment variable SENTRY_NAME by default
        name="",
        # Got environment variable SENTRY_ENVIRONMENT by default
        environment="full_example",
        # Got environment variable SENTRY_RELEASE by default
        release=__version__,

        # Default options values
        include_paths=set(),
        exclude_paths=set(),
        auto_log_stacks=True,
        capture_locals=True,
        string_max_length=400,
        list_max_length=50,
        site=None,
        include_versions=True,
        processors=(
            'raven.processors.SanitizePasswordsProcessor',
        ),
        sanitize_keys=None,
        # Snapshot of the command-line arguments attached to every event
        context={'sys.argv': getattr(sys, 'argv', [])[:]},
        tags={},
        sample_rate=1,
        ignore_exceptions=(),
    )
)


with entrypoint(raven_sender) as loop:
    loop.run_until_complete(main())

You will find the full specification of options in the Raven documentation.

2.4.20. SDWatchdogService#

A ready-to-use service: just add it to your entrypoint and it will notify the systemd service watchdog timer.

This can be safely added at any time, since if the service does not detect systemd-related environment variables, then its initialization is skipped.

Example of python file:

import logging
from time import sleep

from aiomisc import entrypoint
from aiomisc.service.sdwatchdog import SDWatchdogService


if __name__ == '__main__':
    with entrypoint(SDWatchdogService()) as loop:
        pass

Example of systemd service file:

[Service]
# Activating the notification mechanism
Type=notify

# Command which should be started
ExecStart=/home/mosquito/.venv/aiomisc/bin/python /home/mosquito/scratch.py

# The time for which the program must send a watchdog notification
WatchdogSec=5

# Kill the process if it has stopped responding to the watchdog timer
WatchdogSignal=SIGKILL

# The service should be restarted on failure
Restart=on-failure

# Try to kill the process instead of cgroup
KillMode=process

# Trying to stop service properly
KillSignal=SIGINT

# Trying to restart service properly
RestartKillSignal=SIGINT

# Send SIGKILL when timeouts are exceeded
FinalKillSignal=SIGKILL
SendSIGKILL=yes

2.4.21. ProcessService#

A base class for launching a function in a separate system process and terminating it when the parent process is stopped.

from typing import Dict, Any

import aiomisc.service

# Fictional miner implementation
from .my_miner import Miner


class MiningService(aiomisc.service.ProcessService):
    """Runs a miner for the first enabled coin inside a separate process."""

    # Coin switches; enabled through constructor kwargs
    bitcoin: bool = False
    monero: bool = False
    dogiecoin: bool = False

    def in_process(self) -> Any:
        # Checked in priority order; only the first enabled coin is mined
        for coin in ("bitcoin", "monero", "dogiecoin"):
            if getattr(self, coin):
                Miner(kind=coin).do_mining()
                return
        # Nothing to do when no coin is enabled


services = [
    MiningService(bitcoin=True),
    MiningService(monero=True),
    MiningService(dogiecoin=True),
]

if __name__ == '__main__':
    with aiomisc.entrypoint(*services) as loop:
        loop.run_forever()

2.4.22. RespawningProcessService#

A base class for launching a function in a separate system process and terminating it when the parent process is stopped. It is much like ProcessService, with one difference: when the process exits unexpectedly, it is respawned.

import logging
from typing import Any

import aiomisc

from time import sleep


class SuicideService(aiomisc.service.RespawningProcessService):
    """Example process that exits with code 42 after ten seconds."""

    def in_process(self) -> Any:
        # ``exit`` is injected by the ``site`` module for interactive use and
        # is not guaranteed to exist; ``sys.exit`` is the supported call.
        import sys

        sleep(10)
        logging.warning("Goodbye mad world")
        sys.exit(42)


if __name__ == '__main__':
    with aiomisc.entrypoint(SuicideService()) as loop:
        loop.run_forever()