Parallel Python

Part 2: Pool

One of the core concurrent.futures features is concurrent.futures.ProcessPoolExecutor. This provides a pool of workers that can be used to parallelise a map.

For example, we have been working with examples like the following which run using the serial map function:

def square(x):
    """Function to return the square of the argument"""
    return x * x

r = [1, 2, 3, 4, 5]
result = map(square, r)
print(list(result))
[1, 4, 9, 16, 25]

To convert this code to be able to run the function across multiple processors, we need to change it to look like:

pool.py
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Function to return the square of the argument"""
    return x * x

if __name__ == "__main__":
    r = [1, 2, 3, 4, 5]
    
    # create a pool of workers
    with ProcessPoolExecutor() as pool:
        result = pool.map(square, r)

    print(list(result))

which, when run, gives us the same result:

$ python pool.py
[1, 4, 9, 16, 25]

The core logic of the code has remained the same but we have had to make some changes in order to make it ready to support running in parallel:

  1. We moved the code that calls the map into an if __name__ == "__main__" block. This is to ensure that only the main process creates the pool of worker processes.
  2. We have created a pool of workers using with ProcessPoolExecutor() as pool. Inside this block we can access the worker pool with the pool variable and once we leave the block, the workers will be automatically cleaned up.
  3. We call pool.map instead of just map to make the mapping be performed by the workers.

The parallel work is conducted on the line

result = pool.map(square, r)

This performs a map of the function square over the list of items in r. The map is divided up over all of the workers in the pool. This means that, if you have 10 workers (e.g. if you have 10 cores), then each worker will perform roughly one tenth of the work. If you have 2 workers, then each worker will perform roughly half of the work.
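As a rough check of how many workers you are likely to get by default, the sketch below prints the number of CPUs that Python reports. It assumes that ProcessPoolExecutor() with no max_workers argument sizes the pool from the machine's processor count; the exact default can vary between Python versions and platforms. The helper name default_worker_estimate is made up for this illustration and is not part of concurrent.futures.

```python
import os

def default_worker_estimate():
    """Estimate the default pool size (hypothetical helper for illustration)"""
    # os.cpu_count() can return None if the count cannot be determined
    return os.cpu_count() or 1

if __name__ == "__main__":
    n = default_worker_estimate()
    print(f"This machine reports {n} CPUs")
    print(f"With {n} workers, each would handle roughly 1/{n} of a large map")
```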

You can verify that the square function is divided between your workers by using an os.getpid call, which will return the process ID (PID) of the worker. We can also manually set the number of worker processes that should be created by passing max_workers= to the ProcessPoolExecutor constructor. Edit your pool.py script and set the contents equal to:

pool.py
import os
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

def square(x):
    """Function to return the square of the argument"""
    print(f"Worker {os.getpid()} calculating square of {x}")
    return x * x

if __name__ == "__main__":
    # create a pool of workers
    with ProcessPoolExecutor(max_workers=2) as pool:
        # create a range of the 20 integers from 1 to 20
        r = range(1, 21)

        result = pool.map(square, r)

    total = reduce(lambda x, y: x + y, result)

    print(f"The sum of the square of the first 20 integers is {total}")

Run this script using

$ python pool.py
Worker 7116 calculating square of 1
Worker 7117 calculating square of 2
Worker 7116 calculating square of 3
Worker 7116 calculating square of 4
Worker 7117 calculating square of 5
Worker 7117 calculating square of 6
Worker 7117 calculating square of 7
Worker 7117 calculating square of 8
Worker 7117 calculating square of 9
Worker 7117 calculating square of 10
Worker 7116 calculating square of 11
Worker 7117 calculating square of 12
Worker 7116 calculating square of 13
Worker 7117 calculating square of 14
Worker 7116 calculating square of 15
Worker 7117 calculating square of 16
Worker 7116 calculating square of 17
Worker 7116 calculating square of 18
Worker 7116 calculating square of 19
Worker 7117 calculating square of 20
The sum of the square of the first 20 integers is 2870

(The exact PIDs of the workers, and the order in which they print, will be different on your machine.)

You can see in the output that there are two workers, signified by the two different worker PIDs. The work has been divided roughly evenly between them: in the run shown here, one worker calculated 9 of the squares and the other calculated 11.
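Rather than counting lines of output by hand, you could have each call return the PID of the worker that ran it and tally the results in the main process. The following is a minimal sketch of that idea; the names square_with_pid and tally are hypothetical helpers introduced for this illustration.

```python
import os
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def square_with_pid(x):
    """Return the PID of the process that ran this call, and the square of x"""
    return os.getpid(), x * x

def tally(pairs):
    """Count how many results each PID produced (hypothetical helper)"""
    return Counter(pid for pid, _ in pairs)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        pairs = list(pool.map(square_with_pid, range(1, 21)))

    # report how many of the 20 calculations each worker performed
    for pid, count in tally(pairs).items():
        print(f"Worker {pid} calculated {count} squares")

    total = sum(square for _, square in pairs)
    print(f"The sum of the square of the first 20 integers is {total}")
```

The per-worker counts will vary from run to run, and this may be a convenient starting point for the exercise below.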

Exercise

Edit pool.py and change the value of max_workers. How is the work divided as you change the number of workers?

Using multiple pools in a single script

You can use more than one ProcessPoolExecutor in your script, but you should ensure that you use them one after another. On platforms where worker processes are started by forking (historically the default on Linux), ProcessPoolExecutor forks your script into the team of workers. Each worker contains a complete copy of all of the functions and variables that exist at the time of the fork. This means that anything defined or changed in the main process after the fork will not be visible to the workers.

If you made a Python script called broken_pool.py with the contents:

broken_pool.py
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Return the square of the argument"""
    return x * x

if __name__ == "__main__":

    r = [1, 2, 3, 4, 5]

    with ProcessPoolExecutor() as pool:
        result = pool.map(square, r)

        print(f"Square result: {list(result)}")

        def cube(x):
            """Return the cube of the argument"""
            return x * x * x

        result = pool.map(cube, r)

        print(f"Cube result: {list(result)}")

and ran it you would see an error like:

AttributeError: Can't get attribute 'cube' on <module '__main__' from 'broken_pool.py'>

The problem is that pool was created before the cube function. The worker copies of the script were thus created before cube was defined, and so don’t contain a copy of this function. This is one of the reasons why you should always define your functions above the if __name__ == "__main__" block.
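A minimal sketch of that recommendation, reusing the functions from broken_pool.py, moves cube to the top of the script so that it already exists when the workers are created. A single pool can then serve both maps:

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Return the square of the argument"""
    return x * x

def cube(x):
    """Return the cube of the argument"""
    return x * x * x

if __name__ == "__main__":
    r = [1, 2, 3, 4, 5]

    # both functions are defined before any worker is created,
    # so every worker has a copy of each of them
    with ProcessPoolExecutor() as pool:
        squares = list(pool.map(square, r))
        cubes = list(pool.map(cube, r))

    print(f"Square result: {squares}")
    print(f"Cube result: {cubes}")
```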

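Alternatively, if you have to define the function in the __main__ block, then ensure that you create the pool after the definition. Note that this only works where the workers are created by forking; where workers are started as fresh interpreters (the spawn start method, which is the default on Windows and macOS), the __main__ block is not run in the workers, so the function must be defined at the top level of the script. For example, one fix here is to create a second pool for the second map: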

pool.py
from concurrent.futures import ProcessPoolExecutor

def square(x):
    """Return the square of the argument"""
    return x * x

if __name__ == "__main__":

    r = [1, 2, 3, 4, 5]

    with ProcessPoolExecutor() as pool:
        result = pool.map(square, r)

        print(f"Square result: {list(result)}")

    def cube(x):
        """Return the cube of the argument"""
        return x * x * x

    with ProcessPoolExecutor() as pool:
        result = pool.map(cube, r)

        print(f"Cube result: {list(result)}")

Running this should print out

$ python pool.py
Square result: [1, 4, 9, 16, 25]
Cube result: [1, 8, 27, 64, 125]