Basics

[Figure: class diagram of MLPro OA stream processing]

Ver. 1.4.3 (2025-07-16)

Core classes for online-adaptive data stream processing (OADSP).

class mlpro.oa.streams.basics.OAStreamShared(p_range: int = 2)

Bases: StreamShared

Template class for shared objects in the context of online-adaptive data stream processing.

class mlpro.oa.streams.basics.OAStreamAdaptationType

Bases: AdaptationType

Extended types of adaptation, specific to online-adaptive data stream processing.

REVERSE = 'Reverse'
RENORM = 'Renorm'
class mlpro.oa.streams.basics.OAStreamAdaptation(p_raising_object, p_subtype: OAStreamAdaptationType, p_tstamp=None, p_num_inst=1, **p_kwargs)

Bases: Adaptation

Class for extended adaptation events raised by the OAStreamTask class.

Parameters:
  • p_raising_object – Raising object.

  • p_subtype (OAStreamAdaptationType) – Type of adaptation.

  • p_tstamp (TStampType = None) – Time stamp of adaptation.

  • **p_kwargs – Further keyword arguments to be transported by the event.
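
As a rough illustration of how such an event might carry its subtype and additional data, the following sketch uses simplified stand-in classes rather than the MLPro classes themselves. The names AdaptationTypeSketch, AdaptationEventSketch and make_event, as well as the base member FORWARD, are assumptions made for this example; only REVERSE and RENORM are taken from the documentation above.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


class AdaptationTypeSketch:
    """Stand-in for the adaptation type constants (not the MLPro class)."""
    FORWARD = 'Forward'   # hypothetical base type, assumed for illustration
    REVERSE = 'Reverse'   # documented above
    RENORM = 'Renorm'     # documented above


@dataclass
class AdaptationEventSketch:
    """Stand-in for an adaptation event transporting a subtype and extra data."""
    raising_object: Any
    subtype: str
    tstamp: Optional[datetime] = None
    num_inst: int = 1
    data: Dict[str, Any] = field(default_factory=dict)


def make_event(raising_object, subtype, tstamp=None, num_inst=1, **kwargs):
    # Further keyword arguments are transported unchanged by the event
    return AdaptationEventSketch(raising_object, subtype, tstamp, num_inst, dict(kwargs))


event = make_event('my task', AdaptationTypeSketch.RENORM,
                   num_inst=3, reason='boundaries changed')
print(event.subtype, event.num_inst, event.data)
```

A handler receiving such an event could branch on the subtype, for example to treat a renormalization differently from a reverse adaptation.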

class mlpro.oa.streams.basics.OAStreamTask(p_name: str = None, p_range_max=1, p_ada: bool = True, p_buffer_size: int = 0, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamTask, Model

Template class for online adaptive ML tasks.

Parameters:
  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_ada (bool) – Boolean switch for adaptivity. Default = True.

  • p_buffer_size (int) – Initial size of internal data buffer. Default = 0 (no buffering).

  • p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • **p_kwargs – Further optional keyword arguments.

C_TYPE = 'OA Stream-Task'
C_EVENT_CLS

alias of OAStreamAdaptation

C_PLOT_ACTIVE: bool = True
C_PLOT_STANDALONE: bool = True
C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
C_PLOT_DEFAULT_VIEW: str = 'ND'
_set_adapted(p_adapted: bool, p_subtype: OAStreamAdaptationType = '', p_tstamp: datetime | timedelta | float | int = None, p_num_inst=1, **p_kwargs)
adapt(p_instances: Dict[int, Tuple[str, Instance]]) bool

Adapts the model by calling the custom method _adapt().

Parameters:

p_instances (Dict[int, Tuple[str, Instance]]) – Stream instances to be used for the adaptation.

Returns:

adapted – True, if something has been adapted. False otherwise.

Return type:

bool

_adapt_pre() OAStreamAdaptationType

Optional custom method for preprocessing steps of adaptation.

Returns:

Type of adaptation carried out.

Return type:

OAStreamAdaptationType

_adapt(p_instance_new: Instance) bool

Obligatory custom method for adaptations on a new instance during regular operation.

Parameters:

p_instance_new (Instance) – New stream instance to be processed.

Returns:

True, if something has been adapted. False otherwise.

Return type:

bool

_adapt_reverse(p_instance_del: Instance) bool

Optional custom method for reverse adaptation on an obsolete instance during regular operation.

Parameters:

p_instance_del (Instance) – Obsolete stream instance to be removed.

Returns:

adapted – True, if something has been adapted. False otherwise.

Return type:

bool

_adapt_post() OAStreamAdaptationType

Optional custom method for postprocessing steps of adaptation.

Returns:

Type of adaptation carried out.

Return type:

OAStreamAdaptationType
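
The interplay of forward and reverse adaptation can be sketched with a running mean that includes new values and takes obsolete values out again. This is a simplified stand-in and not the MLPro implementation: the class RunningMeanSketch, the markers INST_NEW and INST_DEL, and the use of plain floats instead of Instance objects are assumptions made for this example.

```python
from typing import Dict, Tuple

# Hypothetical markers for new and obsolete instances (assumed for this sketch)
INST_NEW = '+'
INST_DEL = '-'


class RunningMeanSketch:
    """Stand-in task that keeps the mean of all currently valid values."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    @property
    def mean(self):
        return self.total / self.count if self.count else None

    def _adapt(self, p_instance_new: float) -> bool:
        # Forward adaptation: include the new value
        self.total += p_instance_new
        self.count += 1
        return True

    def _adapt_reverse(self, p_instance_del: float) -> bool:
        # Reverse adaptation: take an obsolete value out again
        if self.count == 0:
            return False
        self.total -= p_instance_del
        self.count -= 1
        return True

    def adapt(self, p_instances: Dict[int, Tuple[str, float]]) -> bool:
        # Dispatch each instance to the matching custom method
        adapted = False
        for inst_id in sorted(p_instances):
            inst_type, inst = p_instances[inst_id]
            if inst_type == INST_NEW:
                adapted = self._adapt(inst) or adapted
            elif inst_type == INST_DEL:
                adapted = self._adapt_reverse(inst) or adapted
        return adapted


task = RunningMeanSketch()
task.adapt({0: (INST_NEW, 2.0), 1: (INST_NEW, 4.0), 2: (INST_NEW, 6.0)})
print(task.mean)   # mean of 2, 4 and 6
task.adapt({3: (INST_DEL, 2.0)})
print(task.mean)   # mean of 4 and 6
```

Keeping only the count and the sum lets the reverse step undo a single value without re-reading older data, which is the main reason to implement a reverse adaptation at all.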

_renormalize(p_normalizer: Normalizer)

Custom method to renormalize internally buffered data using the given normalizer object. This is necessary after an adaptation of a related predecessor normalizer task. See method renormalize_on_event() for further details.

Parameters:

p_normalizer (Normalizer) – Normalizer object to be applied to the task-specific, internally buffered data.

renormalize_on_event(p_event_id: str, p_event_object: Event)

Event handler method to be registered on event Model.C_EVENT_ADAPTED of an online adaptive normalizer task. It carries out the task-specific renormalization of internally buffered data by calling the custom method _renormalize().

Parameters:
  • p_event_id (str) – Unique event id

  • p_event_object (Event) – Event object with further context information
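
The event-driven renormalization described above might look roughly like the following sketch. It does not use MLPro: the classes MinMaxNormalizerSketch and BufferingTaskSketch, the method register_event_handler(), and the convention that the event object is the normalizer itself are all assumptions made for illustration.

```python
from typing import Callable, Dict, List

C_EVENT_ADAPTED = 'ADAPTED'   # event id assumed for this sketch


class MinMaxNormalizerSketch:
    """Stand-in for an adaptive normalizer that notifies registered handlers."""

    def __init__(self, lower: float, upper: float):
        self.lower, self.upper = lower, upper
        self._old = (lower, upper)
        self._handlers: Dict[str, List[Callable]] = {}

    def register_event_handler(self, p_event_id: str, p_handler: Callable):
        self._handlers.setdefault(p_event_id, []).append(p_handler)

    def normalize(self, x: float) -> float:
        return (x - self.lower) / (self.upper - self.lower)

    def renormalize(self, x_norm: float) -> float:
        # Map a value normalized with the previous boundaries to the current ones
        old_lower, old_upper = self._old
        raw = x_norm * (old_upper - old_lower) + old_lower
        return self.normalize(raw)

    def update_boundaries(self, lower: float, upper: float):
        # Remember the previous boundaries, adapt, then notify all handlers
        self._old = (self.lower, self.upper)
        self.lower, self.upper = lower, upper
        for handler in self._handlers.get(C_EVENT_ADAPTED, []):
            handler(C_EVENT_ADAPTED, self)


class BufferingTaskSketch:
    """Stand-in for a successor task that buffers normalized values."""

    def __init__(self):
        self.buffer: List[float] = []

    def _renormalize(self, p_normalizer: MinMaxNormalizerSketch):
        self.buffer = [p_normalizer.renormalize(x) for x in self.buffer]

    def renormalize_on_event(self, p_event_id: str, p_event_object):
        # In this sketch the event object is the normalizer itself
        self._renormalize(p_event_object)


normalizer = MinMaxNormalizerSketch(0.0, 10.0)
task = BufferingTaskSketch()
normalizer.register_event_handler(C_EVENT_ADAPTED, task.renormalize_on_event)

task.buffer = [normalizer.normalize(v) for v in (0.0, 5.0, 10.0)]
normalizer.update_boundaries(0.0, 20.0)
print(task.buffer)
```

After the boundaries change, the buffered values are expressed in the new scale without the task having to keep the raw data.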

class mlpro.oa.streams.basics.OAStreamWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.oa.streams.basics.OAStreamShared'>, p_ada: bool = True, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamWorkflow, AWorkflow

Online adaptive workflow based on a stream-workflow and an adaptive workflow.

Parameters:
  • p_name (str) – Optional name of the workflow. Default is None.

  • p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.

  • p_class_shared – Optional class for a shared object (class OAStreamShared or a child class of OAStreamShared)

  • p_ada (bool) – Boolean switch for adaptivity. Default = True.

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'OA Stream-Workflow'
add_task(p_task: StreamTask, p_pred_tasks: list = None)

Adds a task to the workflow.

Parameters:
  • p_task (StreamTask) – Task object to be added.

  • p_pred_tasks (list) – Optional list of predecessor task objects
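
How predecessor relations shape the order of processing can be sketched with a small stand-in workflow. The classes TaskSketch and WorkflowSketch and the rule that a task receives the sum of its predecessors' results are assumptions chosen to keep the example short; they do not reflect how MLPro passes instances between tasks.

```python
from typing import Callable, Dict, List, Optional


class TaskSketch:
    """Stand-in task that applies a function to a single number."""

    def __init__(self, name: str, func: Callable[[float], float]):
        self.name = name
        self.func = func


class WorkflowSketch:
    """Stand-in workflow that runs tasks in the order they were added."""

    def __init__(self):
        self._tasks: List[TaskSketch] = []
        self._preds: Dict[str, List[TaskSketch]] = {}

    def add_task(self, p_task: TaskSketch, p_pred_tasks: Optional[list] = None):
        preds = list(p_pred_tasks or [])
        for pred in preds:
            if pred not in self._tasks:
                raise ValueError(f'Predecessor "{pred.name}" must be added first')
        self._tasks.append(p_task)
        self._preds[p_task.name] = preds

    def run(self, p_input: float) -> Dict[str, float]:
        results: Dict[str, float] = {}
        for task in self._tasks:
            preds = self._preds[task.name]
            # Tasks without predecessors receive the workflow input
            value = sum(results[p.name] for p in preds) if preds else p_input
            results[task.name] = task.func(value)
        return results


t_scale = TaskSketch('scale', lambda x: x * 2)
t_shift = TaskSketch('shift', lambda x: x + 1)
t_merge = TaskSketch('merge', lambda x: x)

workflow = WorkflowSketch()
workflow.add_task(t_scale)
workflow.add_task(t_shift, p_pred_tasks=[t_scale])
workflow.add_task(t_merge, p_pred_tasks=[t_scale, t_shift])

results = workflow.run(3.0)
print(results)
```

Requiring predecessors to be added first keeps the task list in a valid execution order, so the sketch needs no separate topological sort.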

init_plot(p_figure=None, p_plot_settings=None)

Initializes the plot of a workflow. The method creates a host figure for all tasks if no external host figure is parameterized. The sub-plots of the tasks are automatically arranged within the host figure.

See method init_plot() of class mlpro.bf.plot.Plottable for further details.

Parameters:
  • p_figure (matplotlib.figure.Figure, optional) – Optional Matplotlib host figure in which the plot shall be embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

  • p_window_title (str) – Optional window title.

remove_plot(p_refresh=True)

Removes the plot and optionally refreshes the display.

Parameters:

p_refresh (bool = True) – On True the display is refreshed after removal

class mlpro.oa.streams.basics.OAStreamScenario(p_mode=0, p_ada: bool = True, p_cycle_limit=0, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamScenario

Template class for stream based scenarios with online adaptive workflows.

Parameters:
  • p_mode – Operation mode. See bf.ops.Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.

  • p_ada (bool) – Boolean switch for adaptivity. Default = True.

  • p_cycle_limit (int) – Maximum number of cycles (0=no limit, -1=get from env). Default = 0.

  • p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.

  • p_logging – Log level (see constants of class mlpro.bf.various.Log). Default = Log.C_LOG_WE.

  • **p_kwargs – Custom keyword arguments handed over to the custom method setup().

C_TYPE = 'OA Stream-Scenario'
setup(**p_kwargs)

Specialized method to set up an OA stream scenario. It is called automatically by the constructor and in turn calls the custom method _setup().

Parameters:

p_kwargs (dict) – Custom keyword parameters

_setup(p_mode, p_ada: bool, p_visualize: bool, p_logging, **p_kwargs)

Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.

Parameters:
  • p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.

  • p_ada (bool) – Boolean switch for adaptivity. Default = True.

  • p_visualize (bool) – Boolean switch for visualisation.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Custom keyword parameters

Returns:

  • stream (Stream) – A stream object.

  • workflow (OAStreamWorkflow) – An online adaptive stream workflow object.
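
The template pattern behind setup() and _setup() can be sketched as follows. The classes ScenarioSketch and MyScenarioSketch, the keyword p_num_instances, and the use of a plain iterator and a dictionary in place of real stream and workflow objects are assumptions made for this example, not MLPro code.

```python
class ScenarioSketch:
    """Stand-in base class: the constructor calls setup(), which calls _setup()."""

    def __init__(self, p_mode=0, p_ada=True, p_visualize=False, p_logging=True, **p_kwargs):
        self._mode = p_mode
        self._ada = p_ada
        self._visualize = p_visualize
        self._logging = p_logging
        self._stream = None
        self._workflow = None
        self.setup(**p_kwargs)

    def setup(self, **p_kwargs):
        # Hand the stored settings and the custom keyword arguments to _setup()
        self._stream, self._workflow = self._setup(p_mode=self._mode,
                                                   p_ada=self._ada,
                                                   p_visualize=self._visualize,
                                                   p_logging=self._logging,
                                                   **p_kwargs)

    def _setup(self, p_mode, p_ada, p_visualize, p_logging, **p_kwargs):
        raise NotImplementedError('Custom method, to be implemented in a child class')


class MyScenarioSketch(ScenarioSketch):
    """Hypothetical child class that provides a stream and a workflow."""

    def _setup(self, p_mode, p_ada, p_visualize, p_logging, **p_kwargs):
        # Placeholder objects; a real scenario would create stream and workflow objects
        stream = iter(range(p_kwargs.get('p_num_instances', 5)))
        workflow = {'adaptive': p_ada, 'tasks': []}
        return stream, workflow


scenario = MyScenarioSketch(p_ada=False, p_num_instances=3)
print(scenario._workflow)
```

Because the custom keyword arguments pass through the constructor unchanged, a child class can be parameterized without overriding the constructor itself.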