Parallel Processing

This recipe shows a few strategies to parallelize “embarrassingly parallel” spool processing workflows.

Processes and Threads

dascore.Spool.map is the easiest way to process patches in a spool in parallel. Here is an example using the Python standard library module concurrent.futures:

from concurrent.futures import ProcessPoolExecutor

import dascore as dc


def my_patch_processing_function(patch):
    """A custom function for processing patches."""
    ...


# The __main__ guard is needed on platforms where ProcessPoolExecutor
# spawns workers by re-importing this module (e.g., Windows and macOS).
if __name__ == "__main__":
    spool = dc.get_example_spool("random_das")

    # A context manager ensures the worker processes are shut down.
    with ProcessPoolExecutor() as executor:
        spool.map(my_patch_processing_function, client=executor)

The ThreadPoolExecutor from the same module also works, but due to Python’s global interpreter lock (GIL) it may not provide much of a speed-up for CPU-bound functions.
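For workflows dominated by I/O (e.g., functions that mostly read files from disk), threads can still help. A minimal sketch, identical to the example above except for the executor (threads do not require the __main__ guard):

from concurrent.futures import ThreadPoolExecutor

import dascore as dc


def my_patch_processing_function(patch):
    """A custom function for processing patches."""
    ...


spool = dc.get_example_spool("random_das")

# Threads share memory, so patches are not pickled between workers,
# but CPU-bound work is still serialized by the GIL.
with ThreadPoolExecutor() as executor:
    spool.map(my_patch_processing_function, client=executor)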

There are two downsides to executor-based mapping. First, if the patches aren’t chunked appropriately, processing can exhaust the available memory; the sketch below shows one way to bound patch sizes. Second, it only works on a single machine. The next section presents a more scalable option.
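To control memory usage, the spool can be re-chunked before mapping so each patch spans a bounded time window. A minimal sketch using Spool.chunk (the 10-second window is an arbitrary choice):

from concurrent.futures import ProcessPoolExecutor

import dascore as dc


def my_patch_processing_function(patch):
    """A custom function for processing patches."""
    ...


if __name__ == "__main__":
    spool = dc.get_example_spool("random_das")

    # Split the spool into contiguous 10 second patches so no single
    # patch (or worker) holds too much data at once.
    chunked = spool.chunk(time=10)

    with ProcessPoolExecutor() as executor:
        chunked.map(my_patch_processing_function, client=executor)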

MPI4Py

This section shows how to use the “mpi4py” library to parallelize dascore code.

Installation

First, make sure you have installed DASCore on your machine (see DASCore Installation). Second, install the mpi4py library. On a cluster this typically requires loading an Open MPI module first (e.g., on Linux: module load to/mpi/openmpi/gcc/compiler/path) and then installing mpi4py. It may be easier to install both from conda-forge:

conda install -c conda-forge mpi4py openmpi

Please note that this procedure was tested with Python 3.11 and Open MPI 3.1.3 built with GCC.
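To verify that mpi4py can find a working MPI runtime, its built-in benchmark module provides a quick smoke test:

mpiexec -n 2 python -m mpi4py.bench helloworld

Each of the two launched processes should print a greeting that includes its rank.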

Parallel script

Here is an example that distributes the patches of a spool across MPI processes:

mpi_spool.py

import sys

import dascore as dc

from mpi4py import MPI


# Load the spool
spool = dc.get_example_spool("random_das")

# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Abort if the spool is empty: rank 0 raises a descriptive error,
# all other ranks exit quietly.
if len(spool) < 1:
    if rank == 0:
        raise ValueError("No patch of data found within the spool.")
    sys.exit(1)

# Round-robin distribution: rank r handles patches r, r + size, r + 2*size, ...
for i in range(rank, len(spool), size):
    patch = spool[i]
    print(f"rank: {rank}, patch number: {i}, patch: {patch}")
    ...

# Wait until every rank has finished before exiting.
comm.barrier()
sys.exit(0)

Run the script

To run the mpi_spool.py script with n = 4 processes (each process runs the full script; the rank-based loop above divides the patches among them), use:

mpiexec -n 4 python mpi_spool.py

or

mpirun -n 4 python mpi_spool.py
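The script above prints each patch but discards any results. If per-rank outputs need to end up in one place, mpi4py’s comm.gather can collect them on rank 0. A minimal sketch, where collect_patch_stats is a hypothetical stand-in for real processing:

import dascore as dc
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

spool = dc.get_example_spool("random_das")


def collect_patch_stats(patch):
    """Hypothetical processing: return the mean of the patch data."""
    return float(patch.data.mean())


# Each rank processes its round-robin share of the spool.
local_results = [
    collect_patch_stats(spool[i]) for i in range(rank, len(spool), size)
]

# Gather the per-rank lists on rank 0; all other ranks receive None.
all_results = comm.gather(local_results, root=0)

if rank == 0:
    # Flatten the per-rank lists into a single list of results.
    flat = [result for sub in all_results for result in sub]
    print(f"Collected {len(flat)} results from {size} ranks.")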