Howto BF-MT-001: Multitasking - Parallel Algorithms

Prerequisites

Please install the following packages to run this howto

Executable code

## -------------------------------------------------------------------------------------------------
## -- Project : MLPro - A Synoptic Framework for Standardized Machine Learning Tasks
## -- Package : mlpro.bf.examples
## -- Module  : howto_bf_mt_001_parallel_algorithms.py
## -------------------------------------------------------------------------------------------------
## -- History :
## -- yyyy-mm-dd  Ver.      Auth.    Description
## -- 2022-10-03  1.0.0     DA       Creation/release
## -- 2022-10-09  1.1.0     DA       Simplification
## -- 2022-10-12  1.2.0     DA       Restructuring of demo steps
## -- 2022-10-13  1.3.0     DA       Restructuring of demo steps
## -------------------------------------------------------------------------------------------------

"""
Ver. 1.3.0 (2022-10-13)

This module demonstrates the use of classes Async and Shared as part of MLPro's multitasking concept.
Both classes are used to implement a simple parallel algorithm class MyParallelAlgorithm with a
method _async_subtask() for asynchronous execution collecting results in a shared object.

In three runs, method _async_subtask() is executed several times a) synchronously, b) as threads, and
c) as processes. Depending on the number of cores per CPU, the operating system, and further factors,
multiprocessing outperforms multithreading more or less drastically. Method MyParallelAlgorithm.execute()
determines and logs the speed factor of multithreading and multiprocessing in comparison to serial/synchronous
computation. Open the perfmeter of your system and play with the number of tasks and their duration to observe
the behavior.

All sub-tasks store dummy results in a shared object. Because the sub-tasks run concurrently, the order of
result entries in multithreading and multiprocessing mode does not necessarily match the order of sub-task starts.

You will learn:

1) The meaning and basic properties of the classes Async and Shared.

2) How to set up your own class with sub-tasks running in parallel.

3) How to collect results of the parallel sub-functions in a shared object.

"""


from time import sleep
from mlpro.bf.various import Log
import multiprocess as mp
import mlpro.bf.mt as mt
from datetime import datetime, timedelta
from math import pi, sin, cos, tan
import random



## -------------------------------------------------------------------------------------------------
## -------------------------------------------------------------------------------------------------
class MyParallelAlgorithm (mt.Async):
    """
    This class demonstrates how to run methods asynchronously and to collect results in a shared
    object.
    """

    # needed for proper logging (see class mlpro.bf.various.Log)
    C_TYPE      = 'Demo'
    C_NAME      = 'Parallel Algorithm'

## -------------------------------------------------------------------------------------------------
    def __init__( self, 
                  p_num_tasks:int,
                  p_duration:timedelta,
                  p_range_max=mt.Async.C_RANGE_PROCESS, 
                  p_class_shared=mt.Shared,
                  p_logging=Log.C_LOG_ALL ):

        super().__init__( p_range_max=p_range_max, 
                          p_class_shared=p_class_shared, 
                          p_logging=p_logging )

        self._num_tasks    = p_num_tasks
        self._duration     = p_duration
        self._duration_sec = self._duration.total_seconds()


## -------------------------------------------------------------------------------------------------
    def execute(self):
        # Log at the beginning of a run
        if self._range == self.C_RANGE_NONE:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'synchronous tasks started')
        elif self._range == self.C_RANGE_THREAD:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'asynchronous tasks as threads started')
        else:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'asynchronous tasks as processes started')

        # Start number of tasks (a)synchronously
        time_start = datetime.now()
        for t in range(self._num_tasks):
            self._start_async( p_target=self._async_subtask, p_tid=t)

        self.wait_async_tasks()
        time_end   = datetime.now()

        # Determination of speed factor (no parallelism = 1)
        duration_real       = time_end - time_start 
        duration_real_sec   = duration_real.total_seconds()
        speed_factor        = round( self._num_tasks * self._duration_sec / duration_real_sec, 2)

        # Log of speed factor 
        if self._range == self.C_RANGE_NONE:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'synchronous tasks ended (speed factor =', speed_factor, ')')
        elif self._range == self.C_RANGE_THREAD:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'asynchronous tasks as threads ended (speed factor =', speed_factor, ')')
        else:
            self.log(Log.C_LOG_TYPE_S, 'Execution of', self._num_tasks, 'asynchronous tasks as processes ended (speed factor =', speed_factor, ')')

        # Log of results collected in the shared object
        self.log(Log.C_LOG_TYPE_I, 'Results in shared object are:\n',self._so.get_results())


## -------------------------------------------------------------------------------------------------
    def _async_subtask(self, p_tid):

        self.log(Log.C_LOG_TYPE_I, 'Task', p_tid, 'started')

        # 1 Sub-task needs to check in on shared object        
        self._so.checkin( p_tid=p_tid )

        # 2 Dummy implementation to simulate a busy sub-task
        time_start = datetime.now()
        result     = 0

        while True:
            # do something meaningful
            for i in range(300): 
                result += sin(random.random()*pi) * cos(random.random()*pi) * tan(random.random()*pi)

            time_current = datetime.now()
            time_diff    = time_current - time_start
            if time_diff >= self._duration: break

        # 3 Sub-task can optionally store results in the shared object.
        self._so.add_result(p_tid=p_tid, p_result=result)

        # 4 Sub-task needs to check out from shared object
        self._so.checkout( p_tid=p_tid )

        self.log(Log.C_LOG_TYPE_I, 'Task', p_tid, 'stopped')





if __name__ == "__main__":

    # 1 Preparation of execution

    # https://docs.python.org/3/library/multiprocessing.html?highlight=freeze_support#multiprocessing.freeze_support
    mp.freeze_support()

    num_tasks   = 20
    duration    = timedelta(0,1,0)
    pause_sec   = 5
    logging     = Log.C_LOG_ALL



    # 2 Execution of demo class (synchronously)
    a = MyParallelAlgorithm( p_num_tasks = num_tasks, 
                             p_duration = duration, 
                             p_range_max = mt.Async.C_RANGE_NONE, 
                             p_logging = logging )
                                
    a.execute()                     



    # 3 Execution of demo class (asynchronously, multithreading)
    a.log(Log.C_LOG_TYPE_W, 'Short break for better observation of CPU load in perfmeter')
    sleep(pause_sec)

    a = MyParallelAlgorithm( p_num_tasks = num_tasks, 
                             p_duration = duration, 
                             p_range_max = mt.Async.C_RANGE_THREAD, 
                             p_logging = logging )

    a.execute()



    # 4 Execution of demo class (asynchronously, multiprocessing)
    a.log(Log.C_LOG_TYPE_W, 'Short break for better observation of CPU load in perfmeter')
    sleep(pause_sec)

    a = MyParallelAlgorithm( p_num_tasks = num_tasks, 
                             p_duration = duration, 
                             p_range_max = mt.Async.C_RANGE_PROCESS, 
                             p_logging = logging )
                            
    a.execute()                     

Results

The howto example logs details of the three runs, in particular the speed factors of multithreading and multiprocessing in comparison to serial/synchronous execution. On a PC with an AMD Ryzen 7 CPU (8 physical/16 logical cores) running Linux, the system monitor shows a speedup of approx. 5x with multithreading and approx. 18x with multiprocessing.
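
The speed factor logged by execute() is the ratio of the nominal serial time (number of tasks times the per-task duration) to the measured wall-clock time, so a purely serial run yields about 1. The following is a minimal sketch of that arithmetic only. The helper name speed_factor and the elapsed times are hypothetical; they are chosen for illustration and are not measurements from the run described above.

```python
from datetime import timedelta


def speed_factor(num_tasks: int, task_duration: timedelta, elapsed: timedelta) -> float:
    """Ratio of nominal serial time to measured wall-clock time (serial run = 1.0)."""
    return round(num_tasks * task_duration.total_seconds() / elapsed.total_seconds(), 2)


# Illustrative values only: 20 tasks of 1 s each, as configured in the howto
tasks, duration = 20, timedelta(seconds=1)

# Hypothetical wall-clock times for a serial and a parallel run
serial = speed_factor(tasks, duration, timedelta(seconds=20))
parallel = speed_factor(tasks, duration, timedelta(seconds=4))

print('serial:', serial, 'parallel:', parallel)
```

Because each sub-task in the howto runs until a fixed wall-clock duration has elapsed, the factor reflects how many sub-tasks overlap in time rather than the amount of arithmetic completed.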

[Figure: howto_bf_mt_001_parallel_algorithms.png]
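
Why the order of result entries can differ from the order of sub-task starts can be seen in a small standard-library sketch. It does not use MLPro: the class SharedResults is a hypothetical stand-in that only mimics the add_result()/get_results() idea with a lock, and the sleep times are arbitrary assumptions.

```python
import threading
import time


class SharedResults:
    """Hypothetical stand-in for a shared object: a lock-protected result list."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results = []

    def add_result(self, p_tid, p_result):
        # The lock serializes concurrent appends from several threads
        with self._lock:
            self._results.append((p_tid, p_result))

    def get_results(self):
        with self._lock:
            return list(self._results)


def subtask(so, p_tid, p_delay):
    time.sleep(p_delay)                    # simulate work of different length
    so.add_result(p_tid, p_tid * p_tid)    # store a dummy result


so = SharedResults()

# Tasks started earlier get a longer delay, so they tend to finish later
threads = [threading.Thread(target=subtask, args=(so, tid, 0.05 * (3 - tid)))
           for tid in range(4)]

for t in threads:
    t.start()
for t in threads:
    t.join()

results = so.get_results()
print(results)  # entries appear in completion order, which need not match the start order
```

All four results are always present after the joins; only their position in the list depends on scheduling.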

Cross Reference