First loop
Let's write your first MAPE loop. You'll write a simplified loop of an ambulance that switch on/off the siren and increase speed in case of an emergency detected.
Ambulance loop¶
You'll implement this simple ambulance
loop, composed of 3 connected elements:
- Monitor
detect
- Planner
custom_policy
- Executer
exec
In the next sections this graphical notation will be more clear.
import mape
mape.init() # (1)
# Ambulance loop definition, named "ambulance"
loop = mape.Loop(uid='ambulance') # (2)
# Monitor element of ambulance loop
@loop.monitor # (3)
def detect(emergency, on_next): # (4)
on_next(emergency)
- mape.init accept different params to setup PyMAPE, allowing configuration by a config file and/or directly in the source code.
uid
is an unique identification for the defined loop. If you don't pass anuid
, system provide a random one for you.- Decorator create and register the new element monitor to the ambulance
loop
. - The function name
detect
is used asuid
of our element inside the ambulance loop.
A this point, you have a loop (ambulance
) made up of a single monitor element called detect
.
Loop and Element uid
Loop uid ambulance
must be unique in the whole app, instead detect
(monitor uid) must be so only within the loop to which it belongs. This allow to address an element by its path (eg. ambulance.detect
).
detect()
function is the monitor element. It accepts at least two positional params:
- stream item (
emergency
) - is the input value of the element. The elements talk with each other by streams following the ReactiveX philosophy. - on_next - is the function to call with the value to pass to the next linked elements (
subscribed()
). It can be called 0 or N times, you haven't to confused with the functionreturn
concept.
type of detect
detect
is no more a simple function but an object, instance of class Monitor
.
Go ahead with a plan
for add policies in case of an emergency detected.
@loop.plan(uid='custom_policy') # (1)
def policy(emergency, on_next, self):
# Emergency plans
if emergency is True:
on_next({'speed': self.emergency_speed}) # (2)
on_next({'siren': True})
else:
on_next({'speed': 80})
on_next({'siren': False})
policy.emergency_speed = 180
- Differently from the monitor, here we choose the element
uid
(custom_policy
instead ofpolicy
). - Use the object property
emergency_speed
set externally.
In this case, policy()
has a third param self
. When you add it to the function signature, you can directly access to the Element
object behind the scene.
This time, respect previous detect()
, you pass a dict
to the next linked element.
The last element defined and registered is the execute element, exec
applies the plan on the managed system (ie. ambulance), in our case simply print an output.
@loop.execute
def exec(item, on_next):
if 'speed' in item:
print(f"Speed: {item['speed']} Km/h")
if 'siren' in item:
print(f"Siren is {'ON' if item['siren'] else 'OFF'}")
The last step to complete ambulance loop is to connect (subscribe()
) the three elements:
from mape import operators as ops # (1)
# Alternative way
detect.pipe(
ops.through(policy),
ops.through(exec)
).subscribe()
ops
are all the RxPY operators plus some more extras.
You are finally ready to try your ambulance emergency system, sending a detected emergency to the monitor element detect
:
- Just call the
detect()
function and pass the item as param.
As you can see, nothing happens!
This is the wanted behaviour, because you have to start element detect
whenever you are ready to receive the stream.
Different elements behaviour
Elements can have 3 different behaviors with respect to the stream passing through them:
- manual start (monitor): the element cannot send or receive items until the
.start()
method invocation. This is often related to internal resource lifetime (eg. DB connection, socket, ...). It allows to connect all loops and elements before start receiving the stream. - start on subscribe (analyzer, planner): the element will be started when someone is interested to it (ie.
subscribed()
). - start on init (executer): the element is ready from its initialization.
So, we start the monitor element and simulate an emergency:
Get the results:
Speed: 180
Siren is ON
Again, ending the emergency (ie. False
):
Get the results:
Speed: 80
Siren is OFF
Operators¶
Inspired by the functional programming, there are plenty operators available in RxPY and directly usable in PyMAPE. They can be chained in a pipeline, allowing transformations, combinations, filtering, grouping, error managing, applying conditions, making math operations, and so on.
There are more than 140 operators (about 400 variants), and we advise and encourage the creation of your custom operators following your need.
Filters¶
In the ambulance example there is an issue. If the monitor collect an emergency multiple time, all the following element (planner and executer) are still invoked.
Same on emergency end
To solve the problem, we can put a filter
at the input of planner (or the output of monitor):
@loop.plan(uid='custom_policy', ops_in=ops.distinct_until_changed()) # (1)
def policy(emergency, on_next, self):
# Emergency plans
if emergency is True:
on_next({'speed': self.emergency_speed}) # (2)
on_next({'siren': True})
else:
on_next({'speed': 80})
on_next({'siren': False})
- Simply use the param
ops_out
to put the operator on output.
distinct_until_changed()
allows sequence that contains only distinct contiguous item.
A better example
Ambulance can be not the best example to understand the use of distinct_until_changed()
, but try to image a monitor that use polling to read state of managed system. The values read can be the same until the system change the state (eg. motion detection, ambient temprature, ...).
Loop and Element access¶
PyMAPE provide a simplified access to his main entities. For example, you can iterate:
# Iterate over all loops and elements defined in the app
for loop in mape.app:
print(f"* {loop.uid}")
for element in loop:
print(f" - {element.uid}")
Returns:
* ambulance
- detect
- custom_policy
- exec
...or use python list comprehension to retrieve particular elements:
# Get all execute elements
[element.uid for element in mape.app.ambulance if isinstance(element, mape.Execute)]
Returns:
['exec']
You can chain the loop and element uid
(dot-notation path) to have direct access to the elements:
Same result is obtained with:
- Allowing use of variable as dot-notation path.
Debug¶
Some tips for debug your loops and elements crossed by the streams.
Internal library debug
To enable PyMAPE debug (ie. verbose information), remember to configure your logger. At least:
import logging
logging.basicConfig(level=logging.INFO) # (1)
logging.getLogger('mape').setLevel(logging.DEBUG)
- Set level for "root" module logger
- Set level for PyMAPE internal logger
or enable PyMAPE debug, simply:
- Use only if you want try our internal logger settings.
Element¶
You can print the in and/or out items stream of an element:
Sample debug output
detect(True)
return:
# detect(True)
Speed: 180
(9373) on next: {'speed': 180} | in > [ambulance.exec]
(2472) on next: {'speed': 180} | [ambulance.custom_policy] > out
Siren is ON
(9373) on next: {'siren': True} | in > [ambulance.exec]
(2472) on next: {'siren': True} | [ambulance.custom_policy] > out
(2073) on next: True | in > [ambulance.custom_policy]
(2388) on next: True | strem out of detect()
(2616) on next: True | [ambulance.detect] > out
The history order is due the recursive call, on which ReactiveX is based. If necessary this beahviour can be "fixed" in the next realease.
It use PyMAPE internal logger.
LogObserver¶
You can create a fake element (LogObserver
) and attach it to real element. The LogObserver
doesn't modify the stream but simply print it.
If you want print all the stream output from the montiro, you can modify the loop subscription of ambulance like this:
Sample debug output
detect(True)
return:
It use "root" module logger or print()
if not exist.