Howto BF-STREAMS-111: Rearranger (2D)
Prerequisites
Please install the following packages to run this example properly:
Executable code
## -------------------------------------------------------------------------------------------------
## -- Project : MLPro - A Synoptic Framework for Standardized Machine Learning Tasks
## -- Package : mlpro.bf.examples
## -- Module : howto_bf_streams_111_stream_task_rearranger_2d.py
## -------------------------------------------------------------------------------------------------
## -- History :
## -- yyyy-mm-dd Ver. Auth. Description
## -- 2022-10-27 0.0.0 DA Creation
## -- 2022-12-14 1.0.0 DA First implementation
## -------------------------------------------------------------------------------------------------
"""
Ver. 1.0.0 (2022-12-14)
This module demonstrates the principles of stream processing with MLPro. To this regard, a stream of
a stream provider is combined with a stream workflow to a stream scenario. The workflow consists of
a standard task Rearranger and a custom task. The stream scenario is used to process some instances.
You will learn:
1) How to implement an own custom stream task.
2) How to set up a stream workflow based on stream tasks.
3) How to set up a stream scenario based on a stream and a processing stream workflow.
4) How to run a stream scenario dark or with visualization.
"""
from mlpro.bf.streams import *
from mlpro.bf.streams.streams import *
from mlpro.bf.streams.tasks import Rearranger
## -------------------------------------------------------------------------------------------------
## -------------------------------------------------------------------------------------------------
class MyTask (StreamTask):
"""
Demo implementation of a stream task with custom method _run().
"""
# needed for proper logging (see class mlpro.bf.various.Log)
C_NAME = 'Custom'
## -------------------------------------------------------------------------------------------------
def _run(self, p_inst_new: list, p_inst_del: list):
pass
## -------------------------------------------------------------------------------------------------
## -------------------------------------------------------------------------------------------------
class MyScenario (StreamScenario):
"""
Example of a custom stream scenario including a stream and a stream workflow. See class
mlpro.bf.streams.models.StreamScenario for further details and explanations.
"""
C_NAME = 'Demo Rearranger'
## -------------------------------------------------------------------------------------------------
def _setup(self, p_mode, p_visualize: bool, p_logging):
# 1 Import a native stream from MLPro
provider_mlpro = StreamProviderMLPro(p_logging=p_logging)
stream = provider_mlpro.get_stream('Rnd10Dx1000', p_mode=p_mode, p_logging=p_logging)
# 2 Set up a stream workflow
workflow = StreamWorkflow( p_name='wf1',
p_range_max=Task.C_RANGE_NONE,
p_visualize=p_visualize,
p_logging=logging )
# 2.1 Set up and add a rearranger task to reduce the feature and label space
features = stream.get_feature_space().get_dims()
features_new = [ ( 'F', features[1:3] ) ]
task_rearranger = Rearranger( p_name='t1',
p_range_max=Task.C_RANGE_THREAD,
p_visualize=p_visualize,
p_logging=p_logging,
p_features_new=features_new )
workflow.add_task( p_task=task_rearranger )
# 2.2 Set up and add an own custom task
task_custom = MyTask( p_name='t2', p_visualize=p_visualize, p_logging=logging )
workflow.add_task( p_task=task_custom, p_pred_tasks=[task_rearranger] )
# 3 Return stream and workflow
return stream, workflow
# 1 Preparation of demo/unit test mode
if __name__ == '__main__':
# 1.1 Parameters for demo mode
cycle_limit = 200
logging = Log.C_LOG_ALL
visualize = True
else:
# 1.2 Parameters for internal unit test
cycle_limit = 2
logging = Log.C_LOG_NOTHING
visualize = False
# 2 Instantiate the stream scenario
myscenario = MyScenario( p_mode=Mode.C_MODE_SIM,
p_cycle_limit=cycle_limit,
p_visualize=visualize,
p_logging=logging )
# 3 Reset and run own stream scenario
myscenario.reset()
if __name__ == '__main__':
myscenario.init_plot( p_plot_settings=PlotSettings( p_view = PlotSettings.C_VIEW_ND,
p_step_rate = 2 ) )
input('Press ENTER to start stream processing...')
myscenario.run()
if __name__ == '__main__':
input('Press ENTER to exit...')
Results
Cross Reference