Skip to content

Main entities

To know the entities that compose PyMAPE, and their relationships, you can see a shrink version of its metamodel.

Metamodel

PyMAPE metamodel

Entities

Explore the entities (used to develop your loops), starting from left to right:

  • mape package allows the configuration by init() method and also by a config file (default: mape.yml in your working directory) or a mix of both. init() has priority over the file. From mape you have direct access to app and config.
  • config dict exposes all your configuration (eg. mape.config['debug']), and you can also use it for your purpose, putting what you want in the configuration file.
  • App gives access to all declared MAPE loops, levels, and the global Knowledge
  • Loop is identified by a uid, contains its Element, has its Knowledge, and give access to the main app object (ie. loop.app) and its level (if exists).
  • Level is optional (there is an empty one by default), It can be used to describe a hierarchical control pattern. At each level with its uid can be associated more loops.
  • Element (ie. M, A, P, E) is the pillar of PyMAPE framework, the center of the metamodel. They have an uid, but must be unique only inside its loop. There are different kinds of elements (Monitor, Analyze, Plan, Execute) and relative behaviors (base, StartOnSubscribe, StartOnInit). It has two ports (in and out), that allow to put within a stream. The operators can be added in the ports itself, or between elements (in the operators pipe).
  • There are three different Knowledge, with different scopes (App, Level, and Loop). You can access each of them from its object (eg. loop.k). You can use it like a local storage object, or as centralized memory shared by the app deployed on a different machine (using redis).
  • operators are the ones defined in the ReactiveX standard enriched with some specific and useful to our purpose.
  • The classes Message and CallMethod can be used as items of the stream, enriched with additional information in the payload. This allows addressing our stream give at each item a destination or applying some kind of routing algorithm (Message), like the IP packets. The CallMethod item allows calling the remote methods (of elements) using the stream.
  • you have already seen the loop_decorator in the First loop section, used to register elements to a loop.
  • element_decorator provides a set of decorators to define the Element class starting from a simple function. These decorators (as the above) are syntactic sugar to speed up development.

Info

Please refer to the First loop section and the following sections, to see how and when these entities are used.

mape

mape.init(debug=False, asyncio_loop=None, redis_url=None, rest_host_port=None, config_file=None)

Initialize the PyMAPE framework, internal variables and services. Allow configuration by passed arguments and/or config_file, giving priority on the first one.

Call it before start using the framework.

Examples:

mape.init(debug=True,
        redis_url="redis://localhost:6379",
        rest_host_port="0.0.0.0:6060",
        config_file="custom.yml")

Parameters:

Name Type Description Default
debug bool

Enable more verbose mode.

False
asyncio_loop AbstractEventLoop | None

Provide your asyncio loop or leave PyMAPE to generate one for you.

None
redis_url string | None

Url of your Redis instance (eg. redis://localhost:6379)

None
rest_host_port string | None

Web server "host:port", where REST API endpoint will be provided (eg. 0.0.0.0:6060).

None
config_file string | None

Path (absolute or relative to working directory) to the config file (default mape.yml).

None
Source code in mape/__init__.py
def init(debug: bool = False,
         asyncio_loop: AbstractEventLoop | None = None,
         redis_url: string | None = None,
         rest_host_port: string | None = None,
         config_file: string | None = None
         ) -> None:
    """Initialize the PyMAPE framework, internal variables and services.
    Allow configuration by passed arguments and/or `config_file`, giving priority on the first one.

    Call it before start using the framework.

    Examples:
        ```python
        mape.init(debug=True,
                redis_url="redis://localhost:6379",
                rest_host_port="0.0.0.0:6060",
                config_file="custom.yml")
        ```

    Args:
        debug: Enable more verbose mode.
        asyncio_loop: Provide your asyncio loop or leave PyMAPE to generate one for you.
        redis_url: Url of your Redis instance (eg. `redis://localhost:6379`)
        rest_host_port: Web server "host:port", where REST API endpoint will be provided (eg. `0.0.0.0:6060`).
        config_file: Path (absolute or relative to working directory) to the config file (default `mape.yml`).
    """
    global config, aio_loop, rx_scheduler, redis, fastapi, app

    if debug:
        # Configure a base (root module ) logger (StreamHandler, Formatter, etc...),
        # if user hasn't do one but enable debug
        logging.basicConfig(level=logging.DEBUG)
        # Set debug level also for PyMAPE
        logging.getLogger(__name__).setLevel(logging.DEBUG)

    config_file = config_file or mape_config.default_config_file

    if config := mape_config.load(config_file):
        logger.info(f"{config_file} loaded")
    else:
        logger.warning(f"{config_file} not found, using default config")
        config = mape_config.default

    mape_config.set('debug', debug)
    mape_config.set('redis.url', redis_url)
    mape_config.set('rest.host_port', rest_host_port)

    logger.debug(f"Config: {config}")

    # loop = asyncio.new_event_loop()
    # asyncio.set_event_loop(loop)
    # loop = asyncio_loop or loop or asyncio.get_event_loop()

    # TODO: check comment
    # We don't create a new event loop by default, because we want to be
    # sure that when this is called multiple times, each call of `init()`
    # goes through the same event loop. This way, users can schedule
    # background-tasks that keep running across multiple prompts.
    try:
        aio_loop = asyncio_loop or asyncio.get_event_loop()
    except RuntimeError:
        # Possibly we are not running in the main thread, where no event
        # loop is set by default. Or somebody called `asyncio.run()`
        # before, which closes the existing event loop. We can create a new
        # loop.
        aio_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(aio_loop)

    rx_scheduler = rx_scheduler or AsyncIOScheduler(aio_loop)

    if mape_config.get('redis.url'):
        redis = aioredis.from_url(redis_url, db=0)

    app = App(redis)

    if mape_config.get('rest.host_port'):
        fastapi = rest_setup(app, __version__)
        _start_web_server(rest_host_port, aio_loop)

    set_influxdb(mape_config.get('influxdb'))
    set_debug(mape_config.get('debug'))