Basics
Ver. 1.4.1 (2025-06-15)
Core classes for online-adaptive data stream processing (OADSP).
Bases: StreamShared
Template class for shared objects in the context of online-adaptive data stream processing.
- class mlpro.oa.streams.basics.OAStreamAdaptationType(value)
Bases: StrEnum
Extended types of adaptation, specific to online-adaptive data stream processing.
- NONE = ''
- FORWARD = 'Forward'
- EVENT = 'Event'
- REVERSE = 'Reverse'
- RENORM = 'Re-Norm'
- _generate_next_value_(start, count, last_values)
Return the lower-cased version of the member name.
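The following stand-alone sketch mirrors the members listed above using Python's built-in enum module rather than importing MLPro. The class name AdaptationTypeSketch is an assumption made for illustration only. It suggests how a string-valued enumeration of this kind can be compared with plain strings and looked up by value; explicitly assigned values such as 'Re-Norm' are kept exactly as written.

```python
from enum import Enum


class AdaptationTypeSketch(str, Enum):
    """Stand-in for illustration, not the MLPro class itself."""
    NONE = ''
    FORWARD = 'Forward'
    EVENT = 'Event'
    REVERSE = 'Reverse'
    RENORM = 'Re-Norm'


# Members behave like strings, so they compare equal to their values
is_renorm = AdaptationTypeSketch.RENORM == 'Re-Norm'

# Lookup by value returns the corresponding member
looked_up = AdaptationTypeSketch('Event')
```

A string mixin like this keeps event subtypes readable in logs while still allowing identity checks such as `looked_up is AdaptationTypeSketch.EVENT`.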
- class mlpro.oa.streams.basics.OAStreamAdaptation(p_raising_object, p_subtype: OAStreamAdaptationType, p_tstamp=None, p_num_inst=1, **p_kwargs)
Bases: Adaptation
Class for extended adaptation events raised by the OAStreamTask class.
- Parameters:
p_raising_object – Raising object.
p_subtype (OAStreamAdaptationType) – Type of adaptation.
p_tstamp (TStampType = None) – Time stamp of adaptation.
**p_kwargs – Further keyword arguments to be transported by the event.
- class mlpro.oa.streams.basics.OAStreamHelper(p_visualize: bool = False)
Bases: Plottable
Template class for custom helpers to be added to an OA stream workflow.
- C_PLOT_ACTIVE: bool = False
- class mlpro.oa.streams.basics.OAStreamTask(p_name: str = None, p_range_max=1, p_ada: bool = True, p_buffer_size: int = 0, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases: StreamTask, Model
Template class for online-adaptive ML tasks.
- Parameters:
p_name (str) – Optional name of the task. Default is None.
p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_buffer_size (int) – Initial size of internal data buffer. Default = 0 (no buffering).
p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
**p_kwargs – Further optional keyword arguments.
- C_TYPE = 'OA Stream-Task'
- C_EVENT_CLS
alias of OAStreamAdaptation
- C_PLOT_ACTIVE: bool = True
- C_PLOT_STANDALONE: bool = True
- C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
- C_PLOT_DEFAULT_VIEW: str = 'ND'
- _set_adapted(p_adapted: bool, p_subtype: OAStreamAdaptationType = OAStreamAdaptationType.NONE, p_tstamp: datetime | timedelta | float | int = None, p_num_inst=1, **p_kwargs)
- adapt(p_instances: Dict[int, Tuple[str, Instance]]) → bool
Adapts the model by calling the custom method _adapt().
- Parameters:
p_instances (Dict[int, Tuple[str, Instance]]) – Instances needed for the adaptation. The details depend on the specific higher context.
- Returns:
adapted – True, if something has been adapted. False otherwise.
- Return type:
bool
- _adapt_pre() → OAStreamAdaptationType
Optional custom method for preprocessing steps of adaptation.
- Returns:
Type of adaptation carried out.
- Return type:
OAStreamAdaptationType
- _adapt(p_instance_new: Instance) → bool
Obligatory custom method for adaptations on a new instance during regular operation.
- Parameters:
p_instance_new (Instance) – New stream instance to be processed.
- Returns:
True, if something has been adapted. False otherwise.
- Return type:
bool
- _adapt_reverse(p_instance_del: Instance) → bool
Optional custom method for reverse adaptation on an obsolete instance during regular operation.
- Parameters:
p_instance_del (Instance) – Obsolete stream instance to be removed.
- Returns:
adapted – True, if something has been adapted. False otherwise.
- Return type:
bool
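To illustrate the idea behind the forward method _adapt() and the reverse method _adapt_reverse(), here is a minimal stand-alone sketch that does not import MLPro. The class RunningMeanSketch, its method process() and the use of plain floats in place of Instance objects are assumptions for illustration; how MLPro itself schedules these calls is not shown here.

```python
from collections import deque


class RunningMeanSketch:
    """Stand-in, not an MLPro class: mean over the last `window` values."""

    def __init__(self, window: int):
        self._window = window
        self._buffer = deque()
        self._sum = 0.0

    def _adapt(self, p_instance_new: float) -> bool:
        # Forward adaptation: include the new value in the model state
        self._sum += p_instance_new
        self._buffer.append(p_instance_new)
        return True

    def _adapt_reverse(self, p_instance_del: float) -> bool:
        # Reverse adaptation: undo the contribution of an obsolete value
        self._sum -= p_instance_del
        return True

    def process(self, value: float) -> float:
        # Hypothetical driver: adapt forward, then drop what left the window
        self._adapt(value)
        if len(self._buffer) > self._window:
            self._adapt_reverse(self._buffer.popleft())
        return self._sum / len(self._buffer)


model = RunningMeanSketch(window=2)
means = [model.process(v) for v in (1.0, 3.0, 5.0)]
```

With a window of two, the third call first adds 5.0 and then reverses the contribution of 1.0, so the model state always reflects only the values still in the window.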
- _adapt_post() → OAStreamAdaptationType
Optional custom method for postprocessing steps of adaptation.
- Returns:
Type of adaptation carried out.
- Return type:
OAStreamAdaptationType
- _renormalize(p_normalizer: Normalizer)
Custom method to renormalize internally buffered data using the given normalizer object. This is necessary after an adaptation of a related predecessor normalizer task. See method renormalize_on_event() for further details.
- Parameters:
p_normalizer (Normalizer) – Normalizer object to be applied to the task-specific, internally buffered data.
- renormalize_on_event(p_event_id: str, p_event_object: Event)
Event handler method to be registered on event Model.C_EVENT_ADAPTED of an online adaptive normalizer task. It carries out the task-specific renormalization of internally buffered data by calling the custom method _renormalize().
- Parameters:
p_event_id (str) – Unique event id
p_event_object (Event) – Event object with further context information
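The interplay between an adapting normalizer and a buffering successor task can be sketched without MLPro as follows. All names here (NormalizerSketch, BufferingTaskSketch, register_handler, the dictionary used as event object and the extra old_bounds argument) are assumptions for illustration and do not reflect MLPro's actual event or normalizer API. The sketch only shows the pattern: the normalizer announces a change of its parameters, and the handler remaps the values that were buffered under the old parameters.

```python
class NormalizerSketch:
    """Stand-in min-max normalizer that notifies handlers when its bounds change."""

    def __init__(self):
        self.lo = None
        self.hi = None
        self._handlers = []

    def register_handler(self, handler):
        self._handlers.append(handler)

    def normalize(self, x):
        return 0.0 if self.hi == self.lo else (x - self.lo) / (self.hi - self.lo)

    @staticmethod
    def denormalize(y, bounds):
        lo, hi = bounds
        return lo + y * (hi - lo)

    def adapt(self, x) -> bool:
        old = (self.lo, self.hi)
        self.lo = x if self.lo is None else min(self.lo, x)
        self.hi = x if self.hi is None else max(self.hi, x)
        adapted = old != (self.lo, self.hi)
        # Notify only if earlier bounds existed that buffered data may rely on
        if adapted and old[0] is not None:
            for handler in self._handlers:
                handler('ADAPTED', {'normalizer': self, 'old_bounds': old})
        return adapted


class BufferingTaskSketch:
    """Stand-in successor task that buffers normalized values."""

    def __init__(self):
        self.buffer = []

    def _renormalize(self, p_normalizer, old_bounds):
        # Map buffered values back to raw scale, then apply the new bounds
        self.buffer = [p_normalizer.normalize(p_normalizer.denormalize(y, old_bounds))
                       for y in self.buffer]

    def renormalize_on_event(self, p_event_id, p_event_object):
        self._renormalize(p_event_object['normalizer'], p_event_object['old_bounds'])


norm = NormalizerSketch()
task = BufferingTaskSketch()
norm.register_handler(task.renormalize_on_event)
for raw in (0.0, 10.0, 20.0):
    norm.adapt(raw)
    task.buffer.append(norm.normalize(raw))
```

When the value 20.0 widens the bounds from (0, 10) to (0, 20), the buffered value for 10.0 is remapped from 1.0 to 0.5 before the new value is appended.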
- class mlpro.oa.streams.basics.OAStreamWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.oa.streams.basics.OAStreamShared'>, p_ada: bool = True, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases: StreamWorkflow, AWorkflow
Online-adaptive workflow based on a stream workflow and an adaptive workflow.
- Parameters:
p_name (str) – Optional name of the workflow. Default is None.
p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.
p_class_shared – Optional class for a shared object (class OAStreamShared or a child class of OAStreamShared)
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
p_kwargs (dict) – Further optional named parameters.
- C_TYPE = 'OA Stream-Workflow'
- add_task(p_task: StreamTask, p_pred_tasks: list = None)
Adds a task to the workflow.
- Parameters:
p_task (Task) – Task object to be added.
p_pred_tasks (list) – Optional list of predecessor task objects
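How predecessor lists shape the data flow of a workflow can be sketched in plain Python. WorkflowSketch, its run() method and the representation of a task as a (name, function) tuple are assumptions for illustration, not MLPro classes; only the parameter names p_task and p_pred_tasks are taken from the signature above.

```python
class WorkflowSketch:
    """Stand-in workflow: tasks run in insertion order and read predecessor results."""

    def __init__(self):
        self._tasks = []  # entries: (name, function, predecessor names)

    def add_task(self, p_task, p_pred_tasks=None):
        name, func = p_task
        preds = [pred[0] for pred in (p_pred_tasks or [])]
        known = {entry[0] for entry in self._tasks}
        missing = [p for p in preds if p not in known]
        if missing:
            raise ValueError(f'Predecessors not yet added: {missing}')
        self._tasks.append((name, func, preds))

    def run(self, p_input):
        results = {}
        for name, func, preds in self._tasks:
            # Tasks without predecessors receive the workflow input
            inputs = [results[p] for p in preds] if preds else [p_input]
            results[name] = func(*inputs)
        return results


double = ('double', lambda x: 2 * x)
increment = ('increment', lambda x: x + 1)
total = ('total', lambda a, b: a + b)

workflow = WorkflowSketch()
workflow.add_task(double)
workflow.add_task(increment, p_pred_tasks=[double])
workflow.add_task(total, p_pred_tasks=[double, increment])
results = workflow.run(3)
```

Requiring predecessors to be added first keeps the insertion order a valid execution order, which is the simplifying design choice of this sketch.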
- add_helper(p_helper: OAStreamHelper)
- init_plot(p_figure=None, p_plot_settings=None)
Initializes the plot of a workflow. The method creates a host figure for all tasks if no external host figure is parameterized. The sub-plots of the tasks are automatically arranged within the host figure.
See method init_plot() of class mlpro.bf.plot.Plottable for further details.
- Parameters:
p_figure (matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.
p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).
p_window_title (str) – Optional window title.
- remove_plot(p_refresh=True)
Removes the plot and optionally refreshes the display.
- Parameters:
p_refresh (bool = True) – On True the display is refreshed after removal
- class mlpro.oa.streams.basics.OAStreamScenario(p_mode=0, p_ada: bool = True, p_cycle_limit=0, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases: StreamScenario
Template class for stream-based scenarios with online-adaptive workflows.
- Parameters:
p_mode – Operation mode. See bf.ops.Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_cycle_limit (int) – Maximum number of cycles (0=no limit, -1=get from env). Default = 0.
p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.
p_logging – Log level (see constants of class mlpro.bf.various.Log). Default = Log.C_LOG_WE.
**p_kwargs – Custom keyword arguments handed over to the custom method setup().
- C_TYPE = 'OA Stream-Scenario'
- setup(**p_kwargs)
Specialized method to set up an OA stream scenario. It is called automatically by the constructor and in turn calls the custom method _setup().
- Parameters:
p_kwargs (dict) – Custom keyword parameters
- _setup(p_mode, p_ada: bool, p_visualize: bool, p_logging, **p_kwargs)
Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.
- Parameters:
p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_visualize (bool) – Boolean switch for visualisation.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Custom keyword parameters
- Returns:
stream (Stream) – A stream object.
workflow (OAStreamWorkflow) – An online-adaptive stream workflow object.
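The division of labour between the constructor, setup() and the custom method _setup() follows a template-method pattern, which the stand-alone sketch below imitates without importing MLPro. ScenarioSketch, DoublingScenario, run() and the keyword p_num_inst are assumptions for illustration; a range stands in for the stream and a simple function for the workflow.

```python
class ScenarioSketch:
    """Stand-in base class: the constructor calls setup(), which calls _setup()."""

    def __init__(self, p_mode=0, p_ada=True, p_visualize=False, p_logging=True, **p_kwargs):
        self._mode = p_mode
        self._ada = p_ada
        self._visualize = p_visualize
        self._logging = p_logging
        self.setup(**p_kwargs)

    def setup(self, **p_kwargs):
        # Hand the stored settings and custom keywords over to the custom method
        self._stream, self._workflow = self._setup(p_mode=self._mode,
                                                   p_ada=self._ada,
                                                   p_visualize=self._visualize,
                                                   p_logging=self._logging,
                                                   **p_kwargs)

    def _setup(self, p_mode, p_ada, p_visualize, p_logging, **p_kwargs):
        raise NotImplementedError

    def run(self):
        # Hypothetical driver: push every stream element through the workflow
        return [self._workflow(inst) for inst in self._stream]


class DoublingScenario(ScenarioSketch):
    """Custom scenario that only implements _setup()."""

    def _setup(self, p_mode, p_ada, p_visualize, p_logging, **p_kwargs):
        stream = range(p_kwargs.get('p_num_inst', 3))  # stand-in for a stream
        workflow = lambda inst: 2 * inst               # stand-in for a workflow
        return stream, workflow


outputs = DoublingScenario(p_num_inst=4).run()
```

Custom keyword arguments given to the constructor reach _setup() unchanged, which is how the hypothetical p_num_inst controls the stream length here.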