OA-STREAMS - Online-adaptive Stream Processing

content/99_appendices/appendix2/sub/core/mlpro_oa/images/01_environments/MLPro-OA-Proc_class_diagram.drawio.png

Ver. 0.8.0 (2023-12-20)

Core classes for online adaptive stream processing.

class mlpro.oa.streams.basics.OAShared(p_range: int = 2)

Bases: StreamShared

Template class for shared objects in the context of online adaptive stream processing.

class mlpro.oa.streams.basics.OATask(p_name: str = None, p_range_max=1, p_ada: bool = True, p_buffer_size: int = 0, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamTask, Model

Template class for online adaptive ML tasks.

Parameters:
  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Maximum range of asynchonicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_ada (bool) – Boolean switch for adaptivitiy. Default = True.

  • p_buffer_size (int) – Initial size of internal data buffer. Defaut = 0 (no buffering).

  • p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'OA-Task'
C_PLOT_ACTIVE: bool = True
C_PLOT_STANDALONE: bool = True
C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
C_PLOT_DEFAULT_VIEW: str = 'ND'
adapt(p_inst_new: List[Instance], p_inst_del: List[Instance]) bool

Adapts the model by calling the custom method _adapt().

Parameters:

p_kwargs (dict) – All parameters that are needed for the adaption. Depends on the specific higher context.

Returns:

adapted – True, if something has been adapted. False otherwise.

Return type:

bool

_adapt(p_inst_new: List[Instance]) bool

Obligatory custom method for adaptations during regular operation.

Parameters:

p_inst_new (list) – List of new stream instances to be processed.

Returns:

adapted – True, if something has been adapted. False otherwise.

Return type:

bool

_adapt_reverse(p_inst_del: List[Instance]) bool

Optional custom method for reverse adaptations during regular operation.

Parameters:

p_inst_del (list) – List of obsolete stream instances to be removed.

Returns:

adapted – True, if something has been adapted. False otherwise.

Return type:

bool

_renormalize(p_normalizer: Normalizer)

Custom method to renormalize internally buffered data using the given normalizer object. This is necessary after an adaptation of a related predecessor normalizer task. See method renormalize_on_event() for further details.

Parameters:

p_normalizer (Normalizer) – Normalizer object to be applied on task-specific

renormalize_on_event(p_event_id: str, p_event_object: Event)

Event handler method to be registered on event Model.C_EVENT_ADAPTED of an online adaptive normalizer task. It carries out the task-specific renormalization of internally buffered data by calling the custom method _renormalize().

Parameters:
  • p_event_id (str) – Unique event id

  • p_event_object (Event) – Event object with further context informations

class mlpro.oa.streams.basics.OAWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.oa.streams.basics.OAShared'>, p_ada: bool = True, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamWorkflow, AWorkflow

Online adaptive workflow based on a stream-workflow and an adaptive workflow.

Parameters:
  • p_name (str) – Optional name of the workflow. Default is None.

  • p_range_max (int) – Maximum range of asynchonicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_class_shared – Optional class for a shared object (class OAShared or a child class of OAShared)

  • p_ada (bool) – Boolean switch for adaptivitiy. Default = True.

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'OA-Workflow'
add_task(p_task: StreamTask, p_pred_tasks: list = None)

Adds a task to the workflow.

Parameters:
  • p_task (Task) – Task object to be added.

  • p_pred_tasks (list) – Optional list of predecessor task objects

class mlpro.oa.streams.basics.OAScenario(p_mode=0, p_ada: bool = True, p_cycle_limit=0, p_visualize: bool = False, p_logging=True)

Bases: StreamScenario

Template class for stream based scenarios with online adaptive workflows.

Parameters:
  • p_mode – Operation mode. See bf.ops.Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.

  • p_ada (bool) – Boolean switch for adaptivitiy. Default = True.

  • p_cycle_limit (int) – Maximum number of cycles (0=no limit, -1=get from env). Default = 0.

  • p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.

  • p_logging – Log level (see constants of class mlpro.bf.various.Log). Default = Log.C_LOG_WE.

C_TYPE = 'OA-Scenario'
setup()

Specialized method to set up an oa stream scenario. It is automatically called by the constructor and calls in turn the custom method _setup().

_setup(p_mode, p_ada: bool, p_visualize: bool, p_logging)

Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.

Parameters:
  • p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.

  • p_ada (bool) – Boolean switch for adaptivitiy. Default = True.

  • p_visualize (bool) – Boolean switch for visualisation.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

Returns:

  • stream (Stream) – A stream object.

  • workflow (OAWorkflow) – An online adaptive stream workflow object.

class mlpro.oa.streams.basics.OATrainingResults(p_scenario: Scenario, p_run, p_cycle_id, p_logging='W')

Bases: TrainingResults

class mlpro.oa.streams.basics.OATraining(**p_kwargs)

Bases: Training

C_NAME = 'OA'