BF-MT - Multitasking

[Figure: MLPro-BF-MT class diagram]

Ver. 1.9.1 (2024-01-05)

This module provides classes for multitasking with optional interprocess communication (IPC) based on shared objects. Multitasking in MLPro combines multithreading and multiprocessing and simplifies parallel programming.

Note on multitasking: The standard Python package multiprocessing uses pickle for serialization, which causes problems with more complex objects. For this reason, MLPro uses the more flexible package multiprocess instead. It is a fork of multiprocessing that uses dill for serialization.

See also: https://stackoverflow.com/questions/40234771/replace-pickle-in-python-multiprocessing-lib
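The limitation can be reproduced with the standard library alone. pickle stores functions by reference (module plus qualified name). A built-in or module-level function can therefore be pickled, but a lambda cannot, because it has no importable name. The helper is_picklable is hypothetical and only illustrates this point. dill, the serializer behind multiprocess, can also handle objects such as lambdas, but it is deliberately not imported here so that the sketch stays self-contained.

```python
import math
import pickle


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type depends on the object and the Python version
        return False
    return True


# A built-in function is pickled by reference (module name + function name)
print(is_picklable(math.sqrt))
# A lambda has no importable name, so pickle cannot reference it
print(is_picklable(lambda x: x * 2))
```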

class mlpro.bf.mt.Range(p_range: int = 2)

Bases: object

Property class that defines the possible ranges of asynchronous execution supported by MLPro.

Parameters:

p_range (int) – Range of asynchronicity

C_RANGE_NONE

Synchronous execution only.

Type:

int

C_RANGE_THREAD

Asynchronous execution as separate thread within the current process.

Type:

int

C_RANGE_PROCESS

Asynchronous execution as separate process within the current machine.

Type:

int

C_VALID_RANGES

List of valid ranges.

Type:

list

C_RANGE_NONE = 0
C_RANGE_THREAD = 1
C_RANGE_PROCESS = 2
C_VALID_RANGES = [0, 1, 2]
get_range() → int

Returns the range of asynchronicity.
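The hypothetical class RangeSketch below is a minimal sketch of how the range constants can be used. It mirrors the documented constants and get_range(). It is not the MLPro source. In particular, rejecting an invalid range with ValueError is an assumption.

```python
class RangeSketch:
    """Hypothetical stand-in for mlpro.bf.mt.Range, not the MLPro source."""

    C_RANGE_NONE = 0      # synchronous execution only
    C_RANGE_THREAD = 1    # separate thread within the current process
    C_RANGE_PROCESS = 2   # separate process on the current machine
    C_VALID_RANGES = [C_RANGE_NONE, C_RANGE_THREAD, C_RANGE_PROCESS]

    def __init__(self, p_range: int = C_RANGE_PROCESS):
        # Assumption: invalid ranges are rejected with a ValueError
        if p_range not in self.C_VALID_RANGES:
            raise ValueError(f"Invalid range of asynchronicity: {p_range!r}")
        self._range = p_range

    def get_range(self) -> int:
        return self._range


r = RangeSketch(RangeSketch.C_RANGE_THREAD)
print(r.get_range())
```

The constants are ordered from least to most asynchronous, so min() of two ranges yields the more restrictive one. This is one way to read the rule "the minimum range of both is taken" that the p_range parameters in this module follow.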
class mlpro.bf.mt.Shared(p_range: int = 2)

Bases: Range

Template class for shared objects. It is ready to use and the default class for IPC. It provides elementary mechanisms for access control and messaging.

It is also possible to inherit from this class and extend the child class for special needs. Beware, however, that at least in multiprocessing mode (p_range=Range.C_RANGE_PROCESS), direct access to attributes is not possible. Child classes should therefore provide suitable methods for accessing their attributes.

Parameters:

p_range (int) – Range of asynchronicity

C_MSG_TYPE_DATA = 0
C_MSG_TYPE_TERM = 1
lock(p_tid=None, p_timeout: float = None) → bool

Locks the shared object for a specific process.

Parameters:
  • p_tid – Unique task id. If None, the internal locking mechanism is disabled.

  • p_timeout (float) – Optional timeout in seconds. If None, the timeout is infinite.

Returns:

True if the shared object was locked successfully, False otherwise.

Return type:

bool
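The lock()/unlock() contract can be sketched with threading.Lock, assuming thread range only. The class SharedLockSketch is hypothetical. The sketch treats p_tid=None as "locking disabled, report success", which is one reading of the parameter description. MLPro's implementation, especially for process range, may differ.

```python
import threading


class SharedLockSketch:
    """Hypothetical sketch of the lock()/unlock() contract of class Shared."""

    def __init__(self):
        self._lock = threading.Lock()
        self._locked = False   # True while the internal lock is held

    def lock(self, p_tid=None, p_timeout: float = None) -> bool:
        if p_tid is None:
            # Assumption: with locking disabled, report success without blocking
            return True
        # threading.Lock uses timeout=-1 for "wait forever"
        t_timeout = -1 if p_timeout is None else p_timeout
        if not self._lock.acquire(timeout=t_timeout):
            return False
        self._locked = True
        return True

    def unlock(self):
        if self._locked:
            self._locked = False
            self._lock.release()


so = SharedLockSketch()
print(so.lock(p_tid=1))                  # acquires the lock
print(so.lock(p_tid=2, p_timeout=0.1))   # still held by task 1, so this times out
so.unlock()
print(so.lock(p_tid=2))                  # free again
```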

unlock()

Unlocks the shared object.

checkin(p_tid)

Registers a task.

Parameters:

p_tid – Task id.

checkout(p_tid)

Unregisters a task.

Parameters:

p_tid – Task id.

add_result(p_tid, p_result)

Adds a result for a task.

Parameters:
  • p_tid – Task id.

  • p_result – Any kind of result data.

get_result(p_tid)

Returns the result data of a task.

Parameters:

p_tid – Task id.

Returns:

Result data of a task.

Return type:

task_results

get_results()

Returns a reference to the internal dictionary of results.

Returns:

results – Dictionary of results

Return type:

dict

clear_results()

Clears the internal dictionary of results.
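The following is a minimal single-process sketch of the task registry and result store described above. It assumes a plain set and dict as internal containers. SharedResultsSketch is hypothetical: it does no locking and does not reproduce how MLPro shares these containers across processes. It does show what "returns a reference" implies for get_results(): later changes to the store are visible through the returned dictionary.

```python
class SharedResultsSketch:
    """Hypothetical sketch of the task registry and result store of class Shared."""

    def __init__(self):
        self._active_tasks = set()   # ids of checked-in tasks
        self._results = {}           # task id -> result data

    def checkin(self, p_tid):
        self._active_tasks.add(p_tid)

    def checkout(self, p_tid):
        self._active_tasks.discard(p_tid)

    def add_result(self, p_tid, p_result):
        self._results[p_tid] = p_result

    def get_result(self, p_tid):
        # Assumption: an unknown task id raises KeyError
        return self._results[p_tid]

    def get_results(self) -> dict:
        # Returns the internal dictionary itself, not a copy
        return self._results

    def clear_results(self):
        self._results.clear()


so = SharedResultsSketch()
so.checkin('t1')
so.add_result('t1', {'score': 0.9})
so.checkout('t1')
print(so.get_result('t1'))
results = so.get_results()   # reference to the internal dictionary
so.clear_results()
print(results)               # the reference now shows the cleared dictionary
```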

class mlpro.bf.mt.Async(p_range_max: int = 2, p_class_shared=None, p_logging=True)

Bases: Range, Log

Property class that enables child classes to run sub-tasks asynchronously. Depending on the given range, a task can be executed as a separate thread in the same process or as a separate process on the same machine.

Parameters:
  • p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_class_shared – Optional class for a shared object (class Shared or a child class of Shared)

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

_create_so(p_range: int, p_class_shared) → Shared

Internal use. Creates a suitable shared object for the given range.

Parameters:
  • p_range (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_class_shared – Class for a shared object (class Shared or a child class of Shared)

Returns:

so – A new shared object

Return type:

Shared

get_so() → Shared

Returns the associated shared object.

Returns:

so – Shared object of type Shared (or inherited)

Return type:

Shared

assign_so(p_so: Shared)

Assigns an existing shared object to the task. If the range of asynchronicity of the shared object is smaller than the current range of the task, the task adopts it.

Parameters:

p_so (Shared) – Shared object.

_start_async(p_target, p_range: int = None, **p_kwargs) → int

Starts a method or a new instance of a given class asynchronously. If neither a method nor a class is specified, a new instance of the current class is created asynchronously.

Parameters:
  • p_target – A class, method or function to be executed (a)synchronously depending on the actual range

  • p_range (int) – Optional deviating range of asynchronicity. See class Range. Default is None, which means that the maximum range defined during instantiation is used. Otherwise, the minimum of both ranges is used.

  • p_kwargs (dictionary) – Parameters to be handed over to the asynchronous method/instance

Returns:

range – Actual range of asynchronicity

Return type:

int

wait_async_tasks()

Waits until all internal asynchronous tasks are finished.
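The range resolution and dispatch of _start_async() and wait_async_tasks() can be sketched as follows. AsyncSketch is hypothetical and simplified in several ways. It uses the standard multiprocessing module instead of multiprocess. It does not create an instance of the current class when no target is given. Its usage example only exercises the synchronous and thread ranges, because under the spawn and forkserver start methods a process target must be importable.

```python
import threading
from multiprocessing import Process

C_RANGE_NONE, C_RANGE_THREAD, C_RANGE_PROCESS = 0, 1, 2


class AsyncSketch:
    """Hypothetical, simplified stand-in for mlpro.bf.mt.Async."""

    def __init__(self, p_range_max: int = C_RANGE_PROCESS):
        self._range_max = p_range_max
        self._async_tasks = []   # started threads/processes

    def _start_async(self, p_target, p_range: int = None, **p_kwargs) -> int:
        # None -> use the maximum range; otherwise the minimum of both
        if p_range is None:
            t_range = self._range_max
        else:
            t_range = min(p_range, self._range_max)

        if t_range == C_RANGE_NONE:
            p_target(**p_kwargs)   # plain synchronous call (a class is instantiated)
        elif t_range == C_RANGE_THREAD:
            t = threading.Thread(target=p_target, kwargs=p_kwargs)
            t.start()
            self._async_tasks.append(t)
        else:
            # Under 'spawn'/'forkserver', p_target must be picklable and importable
            t = Process(target=p_target, kwargs=p_kwargs)
            t.start()
            self._async_tasks.append(t)
        return t_range

    def wait_async_tasks(self):
        for t in self._async_tasks:
            t.join()
        self._async_tasks.clear()


collected = []


def work(p_value):
    collected.append(p_value)


a = AsyncSketch(p_range_max=C_RANGE_THREAD)
print(a._start_async(work, p_value=1))                           # thread
print(a._start_async(work, p_range=C_RANGE_PROCESS, p_value=2))  # capped to thread
print(a._start_async(work, p_range=C_RANGE_NONE, p_value=3))     # synchronous
a.wait_async_tasks()
print(sorted(collected))
```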

class mlpro.bf.mt.Task(p_id=None, p_name: str = None, p_range_max: int = 1, p_autorun=0, p_class_shared=None, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: Async, EventManager, Plottable, Persistent

Template class for a task that can run things, including itself, asynchronously in a thread or process. Tasks can run standalone or as part of a workflow (see class Workflow). The integrated event manager allows callbacks on specific events, but only within the same process.

Parameters:
  • p_id – Optional external id

  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.

  • p_autorun (int) – On value C_AUTORUN_RUN, method run() is called immediately during instantiation. On value C_AUTORUN_LOOP, method run_loop() is called. Value C_AUTORUN_NONE (default) only instantiates the object without starting further actions.

  • p_class_shared – Optional class for a shared object (class Shared or a child class of Shared)

  • p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'Task'
C_AUTORUN_NONE = 0
C_AUTURUN_RUN = 1
C_AUTORUN_LOOP = 2
C_EVENT_FINISHED = 'FINISHED'
_get_custom_run_method()
get_tid()

Returns unique task id.

_autorun(p_autorun, **p_kwargs)

Internal method to automate a single or looped run.

Parameters:
  • p_autorun (int) – On value C_AUTORUN_RUN, method run() is called immediately during instantiation. On value C_AUTORUN_LOOP, method run_loop() is called. Value C_AUTORUN_NONE (default) only instantiates the object without starting further actions.

  • p_kwargs (dict) – Further parameters handed over to method run().
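The dispatch that _autorun() performs can be sketched as follows, assuming it is called at the end of the constructor. AutorunSketch and its spelling of the constants are hypothetical. Value 1 corresponds to the constant listed above as C_AUTURUN_RUN.

```python
class AutorunSketch:
    """Hypothetical sketch of the autorun dispatch of class Task."""

    C_AUTORUN_NONE = 0
    C_AUTORUN_RUN = 1    # listed in the class reference as C_AUTURUN_RUN
    C_AUTORUN_LOOP = 2

    def __init__(self, p_autorun: int = C_AUTORUN_NONE, **p_kwargs):
        self.calls = []   # records which method was started, with its parameters
        self._autorun(p_autorun, **p_kwargs)

    def _autorun(self, p_autorun, **p_kwargs):
        if p_autorun == self.C_AUTORUN_RUN:
            self.run(**p_kwargs)
        elif p_autorun == self.C_AUTORUN_LOOP:
            self.run_loop(**p_kwargs)
        # C_AUTORUN_NONE: nothing is started

    def run(self, **p_kwargs):
        self.calls.append(('run', p_kwargs))

    def run_loop(self, **p_kwargs):
        self.calls.append(('run_loop', p_kwargs))


task = AutorunSketch(AutorunSketch.C_AUTORUN_RUN, p_steps=3)
print(task.calls)
```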

run(p_range: int = None, p_wait: bool = False, **p_kwargs)

Executes the task-specific actions implemented in the custom method _run(). At the end, event C_EVENT_FINISHED is raised to trigger subsequent actions (p_wait=True).

Parameters:
  • p_range (int) – Optional deviating range of asynchronicity. See class Range. Default is None, which means that the maximum range defined during instantiation is used. Otherwise, the minimum of both ranges is used.

  • p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.

  • p_kwargs (dict) – Further parameters handed over to custom method _run().

_run_async(**p_kwargs)

Internally used by method run(). It runs the custom method _run() and raises event C_EVENT_FINISHED.

Parameters:

p_kwargs (dict) – Custom parameters.

_run(**p_kwargs)

Custom method that is called (asynchronously) by method run().

Parameters:

p_kwargs (dict) – Custom parameters.
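The split between run() and the custom method _run() follows the template-method pattern. The synchronous sketch below uses the hypothetical names TaskSketch and Greeter. It shows a child class implementing _run() and a callback that is notified through the FINISHED event afterwards. The event methods are illustrative stand-ins for the EventManager base class, whose actual signatures may differ, and the handler receives the task instead of an Event object.

```python
from typing import Callable, Dict, List


class TaskSketch:
    """Hypothetical, synchronous-only sketch of the Task.run()/_run() pattern."""

    C_EVENT_FINISHED = 'FINISHED'

    def __init__(self, p_name: str):
        self.name = p_name
        self._handlers: Dict[str, List[Callable]] = {}

    def register_event_handler(self, p_event_id: str, p_handler: Callable):
        self._handlers.setdefault(p_event_id, []).append(p_handler)

    def _raise_event(self, p_event_id: str):
        # Handlers are plain in-process callbacks
        for handler in self._handlers.get(p_event_id, []):
            handler(p_event_id, self)

    def run(self, **p_kwargs):
        self._run(**p_kwargs)                      # custom, task-specific work
        self._raise_event(self.C_EVENT_FINISHED)   # notify subsequent actions

    def _run(self, **p_kwargs):
        raise NotImplementedError   # to be implemented by child classes


class Greeter(TaskSketch):
    def _run(self, p_who: str = 'world'):
        print(f'{self.name}: hello, {p_who}')


log = []
g = Greeter('T1')
g.register_event_handler(TaskSketch.C_EVENT_FINISHED,
                         lambda p_event_id, p_task: log.append((p_event_id, p_task.name)))
g.run(p_who='MLPro')
print(log)
```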

run_loop(**p_kwargs)

Executes method run() in a loop, until a message of type Shared.C_MSG_TYPE_TERM is sent to the task.

Parameters:

p_kwargs (dict) – Parameters for method run()
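run_loop() can be pictured as repeating run() until a termination message arrives. In the sketch below, a queue.Queue stands in for the messaging of the shared object. LoopingTaskSketch is hypothetical, the (type, data) message tuples are an assumption, and data messages are simply discarded.

```python
import queue
import threading
import time

C_MSG_TYPE_DATA = 0   # mirrors Shared.C_MSG_TYPE_DATA
C_MSG_TYPE_TERM = 1   # mirrors Shared.C_MSG_TYPE_TERM


class LoopingTaskSketch:
    """Hypothetical sketch: run() is repeated until a termination message arrives."""

    def __init__(self):
        # A thread-safe queue stands in for the messaging of a shared object
        self.inbox = queue.Queue()
        self.cycles = 0

    def run(self, **p_kwargs):
        self.cycles += 1   # placeholder for the task-specific work

    def _term_received(self) -> bool:
        # Drain all pending messages; only a termination message ends the loop
        while True:
            try:
                t_msg_type, _ = self.inbox.get_nowait()
            except queue.Empty:
                return False
            if t_msg_type == C_MSG_TYPE_TERM:
                return True

    def run_loop(self, **p_kwargs):
        while not self._term_received():
            self.run(**p_kwargs)


task = LoopingTaskSketch()
worker = threading.Thread(target=task.run_loop)
worker.start()
time.sleep(0.01)                              # let the loop run for a moment
task.inbox.put((C_MSG_TYPE_DATA, 'ignored'))  # data messages do not stop the loop
task.inbox.put((C_MSG_TYPE_TERM, None))       # request termination
worker.join()
print(f'terminated after {task.cycles} cycles')
```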

get_predecessors() → list
set_predecessors(p_predecessor_tasks: list)

Used by class Workflow to inform a task about the number of its predecessor tasks. See method run_on_event().

Parameters:

p_predecessor_ids (list) – List of ids of predecessor tasks in a workflow.

run_on_event(p_event_id, p_event_object: Event)

Can be used as an event handler, in particular by other tasks in a workflow in combination with event C_EVENT_FINISHED. Method self.run() is called once the last predecessor task in a workflow has raised event C_EVENT_FINISHED.

Parameters:
  • p_event_id – Event id.

  • p_event_object (Event) – Event object with further context information.
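One plausible way to implement the behavior of set_predecessors() and run_on_event() is to count FINISHED events and to run once the count reaches the number of predecessors. ChainedTaskSketch below is hypothetical and is not MLPro's implementation. Plain strings stand in for predecessor tasks and event objects.

```python
class ChainedTaskSketch:
    """Hypothetical sketch of set_predecessors()/run_on_event(), not MLPro code."""

    def __init__(self, p_name: str, p_trace: list):
        self.name = p_name
        self._trace = p_trace          # shared list recording executed tasks
        self._predecessors = []
        self._n_finished_preds = 0

    def set_predecessors(self, p_predecessor_tasks: list):
        self._predecessors = list(p_predecessor_tasks)

    def run_on_event(self, p_event_id, p_event_object):
        # Count FINISHED events; run only after the last predecessor has finished
        self._n_finished_preds += 1
        if self._n_finished_preds >= len(self._predecessors):
            self._n_finished_preds = 0     # reset for the next workflow cycle
            self.run()

    def run(self):
        self._trace.append(self.name)


trace = []
task_c = ChainedTaskSketch('C', trace)
task_c.set_predecessors(['A', 'B'])          # strings stand in for task objects
task_c.run_on_event('FINISHED', 'event from A')
print(trace)                                 # C still waits for B
task_c.run_on_event('FINISHED', 'event from B')
print(trace)
```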

init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)

Initializes the plot functionalities of the class.

Parameters:
  • p_figure (matplotlib.figure.Figure, optional) – Optional Matplotlib host figure in which the plot is embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

update_plot(**p_kwargs)

Updates the plot.

Parameters:

**p_kwargs – Implementation-specific plot data and/or parameters.

class mlpro.bf.mt.Workflow(p_name: str = None, p_range_max=1, p_class_shared=None, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: Task

Ready-to-use container class for task groups. Objects of type Task (or inherited) can be added and chained to sequences or hierarchies of tasks.

Parameters:
  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.

  • p_class_shared – Optional class for a shared object (class Shared or a child class of Shared)

  • p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters handed over to every task within.

C_TYPE = 'Workflow'
C_NAME = ''
C_PLOT_ACTIVE: bool = True
switch_logging(p_logging)

Sets log level for the workflow and all tasks inside.

Parameters:

p_logging – Log level (see constants of class Log).

add_task(p_task: Task, p_pred_tasks: list = None)

Adds a task to the workflow.

Parameters:
  • p_task (Task) – Task object to be added.

  • p_pred_tasks (list) – Optional list of predecessor task objects

_get_plot_host_task(p_task: Task) → Task
init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)

Initializes the plot of a workflow. If no external host figure is specified, the method creates a host figure for all tasks. The sub-plots of the tasks are automatically arranged within the host figure.

See method init_plot() of class mlpro.bf.plot.Plottable for further details.

Parameters:
  • p_figure (matplotlib.figure.Figure, optional) – Optional Matplotlib host figure in which the plot is embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

run(p_range: int = None, p_wait: bool = False, **p_kwargs)

Executes all tasks of the workflow. At the end, event C_EVENT_FINISHED is raised to trigger subsequent actions (p_wait=True).

Parameters:
  • p_range (int) – Optional deviating range of asynchronicity. See class Range. Default is None, which means that the maximum range defined during instantiation is used. Otherwise, the minimum of both ranges is used.

  • p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.

  • p_kwargs (dict) – Further parameters handed over to custom method _run().

event_forwarder(p_event_id, p_event_object: Event)

Internally used to raise event C_EVENT_FINISHED at workflow level once all final tasks have finished.

Parameters:
  • p_event_id – Event id.

  • p_event_object (Event) – Event object with further context information.

wait_async_tasks()

Waits until all tasks are finished.
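The workflow mechanics described above can be pictured with the following sketch, which assumes synchronous, single-threaded execution. Tasks are chained by registering each successor's run_on_event() as a callback on its predecessors. Tasks without predecessors are entry tasks started by run(). Tasks without successors are final tasks, and their completion is forwarded to the workflow, mirroring event_forwarder(). All class and method names except add_task, run, run_on_event and event_forwarder are hypothetical. Plain callbacks replace MLPro's Event objects, and the sketch is not MLPro's implementation.

```python
class WfTaskSketch:
    """Hypothetical minimal task: records its run and notifies its listeners."""

    def __init__(self, p_name: str, p_trace: list):
        self.name = p_name
        self._trace = p_trace          # shared list recording the execution order
        self._listeners = []           # callbacks standing in for C_EVENT_FINISHED
        self._n_preds = 0
        self._n_finished_preds = 0

    def add_listener(self, p_callback):
        self._listeners.append(p_callback)

    def set_predecessor_count(self, p_count: int):
        self._n_preds = p_count

    def run(self):
        self._trace.append(self.name)
        for callback in self._listeners:
            callback(self)

    def run_on_event(self, p_task):
        # Run only after the last predecessor has finished
        self._n_finished_preds += 1
        if self._n_finished_preds == self._n_preds:
            self._n_finished_preds = 0
            self.run()


class WorkflowSketch:
    """Hypothetical, synchronous sketch of Workflow.add_task()/run()/event_forwarder()."""

    def __init__(self):
        self._tasks = []
        self._entry_tasks = []
        self._tasks_with_successors = set()
        self._final_tasks = None
        self._n_finished_finals = 0
        self.finished = False          # stands in for raising C_EVENT_FINISHED

    def add_task(self, p_task: WfTaskSketch, p_pred_tasks: list = None):
        self._tasks.append(p_task)
        if not p_pred_tasks:
            self._entry_tasks.append(p_task)   # no predecessors: started by run()
            return
        p_task.set_predecessor_count(len(p_pred_tasks))
        for pred in p_pred_tasks:
            pred.add_listener(p_task.run_on_event)
            self._tasks_with_successors.add(id(pred))

    def run(self):
        if self._final_tasks is None:
            # Final tasks have no successors; they report to the workflow itself
            self._final_tasks = [t for t in self._tasks
                                 if id(t) not in self._tasks_with_successors]
            for t in self._final_tasks:
                t.add_listener(self.event_forwarder)
        self.finished = False
        for t in self._entry_tasks:
            t.run()

    def event_forwarder(self, p_task):
        self._n_finished_finals += 1
        if self._n_finished_finals == len(self._final_tasks):
            self._n_finished_finals = 0
            self.finished = True


trace = []
a, b, c, d = (WfTaskSketch(n, trace) for n in 'ABCD')
wf = WorkflowSketch()
wf.add_task(a)
wf.add_task(b, [a])
wf.add_task(c, [a])
wf.add_task(d, [b, c])    # d waits for both b and c
wf.run()
print(trace, wf.finished)
```

Because the callbacks run synchronously, each task starts inside the call chain of its last predecessor. The counters are reset after every cycle, so the same workflow can be run again.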