Parallel Python

Part 2: Asynchronous Functions and Futures

The ProcessPoolExecutor.map function allows you to map a single function across an entire list of data. But what if you want to apply lots of different functions? The solution is to tell individual workers to run different functions, by submitting each function to the pool separately.

The ProcessPoolExecutor class comes with the function submit. This is used to tell one process in the worker pool to run a specified function. For example, create a new script called pool_submit.py and type into it:

pool_submit.py
import os
import time
from concurrent.futures import ProcessPoolExecutor

def slow_function(nsecs):
    """
    Function that sleeps for 'nsecs' seconds, returning
    the number of seconds that it slept
    """

    print(f"Process {os.getpid()} going to sleep for {nsecs} second(s)")

    # use the time.sleep function to sleep for nsecs seconds
    time.sleep(nsecs)

    print(f"Process {os.getpid()} waking up")

    return nsecs

if __name__ == "__main__":
    print(f"Master process is PID {os.getpid()}")

    with ProcessPoolExecutor() as pool:
        r = pool.submit(slow_function, 5)

    print(f"Result is {r.result()}")

Run this script:

$
python pool_submit.py
Master process is PID 28612
Process 28613 going to sleep for 5 second(s)
Process 28613 waking up
Result is 5

You should see something like this printed to the screen (with a delay of five seconds while the worker process sleeps).

The key line in this script is:

r = pool.submit(slow_function, 5)

The pool.submit function requests that one of the workers in the pool run the passed function (in this case slow_function), with the arguments given after it in the call to submit. The value returned by pool.submit is a special kind of object, and to get the return value of the function call out of it we have to call its result() method. The reasons for this will be covered shortly.
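
If you want to convince yourself that submit does not return the result directly, here is a minimal sketch (separate from pool_submit.py, using a trivial function double invented just for illustration) that checks the type of the returned object:

from concurrent.futures import Future, ProcessPoolExecutor

def double(x):
    """Trivial function used only to illustrate what submit returns"""
    return 2 * x

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        r = pool.submit(double, 21)

        # r is not the number 42 - it is a Future that will hold the result
        print(isinstance(r, Future))   # prints True
        print(r.result())              # prints 42, blocking until it is ready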

The call to submit can take multiple arguments, as long as the submitted function takes the same number of arguments. For example, edit your pool_submit.py script to read:

pool_submit.py
import os
import time
from concurrent.futures import ProcessPoolExecutor

def slow_add(nsecs, x, y):
    """
    Function that sleeps for 'nsecs' seconds, and
    then returns the sum of x and y
    """
    print(f"Process {os.getpid()} going to sleep for {nsecs} second(s)")

    time.sleep(nsecs)

    print(f"Process {os.getpid()} waking up")

    return x + y

if __name__ == "__main__":
    print(f"Master process is PID {os.getpid()}")

    with ProcessPoolExecutor() as pool:
        r = pool.submit(slow_add, 1, 6, 7)

    print(f"Result is {r.result()}")

Here we have replaced slow_function with slow_add, which accepts three arguments. These three arguments are passed as the extra arguments in pool.submit(slow_add, 1, 6, 7).

Running this script should give output similar to:

$
python pool_submit.py
Master process is PID 28633
Process 28634 going to sleep for 1 second(s)
Process 28634 waking up
Result is 13

Asynchronous Functions

The reason for the explicit call to r.result() is that it allows us to submit multiple functions to run in parallel and to then request their results once they have finished:

applyasync.py
import os
import time
from concurrent.futures import ProcessPoolExecutor

def slow_add(nsecs, x, y):
    """
    Function that sleeps for 'nsecs' seconds, and
    then returns the sum of x and y
    """
    print(f"Process {os.getpid()} going to sleep for {nsecs} second(s)")

    time.sleep(nsecs)

    print(f"Process {os.getpid()} waking up")

    return x + y

if __name__ == "__main__":
    print(f"Master process is PID {os.getpid()}")

    with ProcessPoolExecutor() as pool:
        r1 = pool.submit(slow_add, 1, 6, 7)
        r2 = pool.submit(slow_add, 1, 2, 3)

        print(f"Result one is {r1.result()}")
        print(f"Result two is {r2.result()}")

Running this script should give output similar to:

$
python applyasync.py
Master process is PID 28667
Process 28668 going to sleep for 1 second(s)
Process 28669 going to sleep for 1 second(s)
Process 28669 waking up
Process 28668 waking up
Result one is 13
Result two is 5

The key lines of this script are:

r1 = pool.submit(slow_add, 1, 6, 7)
r2 = pool.submit(slow_add, 1, 2, 3)

The key thing to notice here is that, although the first call to submit hands over a function that takes a second to run, Python does not wait for that function to finish before moving on to the second submit call. Both functions are submitted at almost the same time. It is not until the call to r1.result() that the program waits for the first slow_add call to finish.

Most noticeably, even though each function call took one second to run, the whole program did not take two seconds. Because the two calls ran in parallel, the program finished in just over one second.
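
If you want to verify this timing yourself, here is a minimal sketch (assuming your pool has at least two worker processes) that wraps the two submit calls with time.perf_counter:

import time
from concurrent.futures import ProcessPoolExecutor

def slow_add(nsecs, x, y):
    """Sleep for 'nsecs' seconds, then return the sum of x and y"""
    time.sleep(nsecs)
    return x + y

if __name__ == "__main__":
    start = time.perf_counter()

    with ProcessPoolExecutor() as pool:
        r1 = pool.submit(slow_add, 1, 6, 7)
        r2 = pool.submit(slow_add, 1, 2, 3)

        print(f"Results are {r1.result()} and {r2.result()}")

    elapsed = time.perf_counter() - start

    # with two or more workers this prints roughly 1 second, not 2
    print(f"Total time was {elapsed:.2f} seconds")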

Futures

An issue with running a function asynchronously is that the return value of the function is not available immediately. This means that, when running an asynchronous function, you don’t get the return value directly. Instead, submit returns a placeholder for the return value. This placeholder is called a “future”, and is a variable that in the future will contain the result of the function.

Futures are a very common variable type in parallel programming across many languages. Futures provide several common functions:

  • Block (wait) until the result is available. In concurrent.futures, this happens when you call the .result() function, e.g. r1.result() in the above script. There is also an implicit wait when the context manager (the with block) closes, to make sure all running processes are finished.
  • Retrieve the result when it is available (blocking until it is available). This is also done with the .result() function, e.g. r1.result().
  • Test whether or not the result is available. This is the .done() function, which returns True when the asynchronous function has finished and the result is available via .result().
  • Test whether or not the function was a success, e.g. whether or not an exception was raised when running the function. This is the .exception() function, which returns None if the asynchronous function completed without raising an exception, and returns the exception object if there was an error.

In the above example, r1 and r2 were both futures for the results of the two asynchronous calls of slow_add. The two slow_add calls were processed by two worker processes. The master process was then blocked using r1.result() to wait for the result of the first call, and then blocked using r2.result() to wait for the result of the second call.

We can explore this more using the following example. Create a script called future.py and copy into it:

future.py
import time
from concurrent.futures import ProcessPoolExecutor

def slow_add(nsecs, x, y):
    """
    Function that sleeps for 'nsecs' seconds, and
    then returns the sum of x and y
    """
    time.sleep(nsecs)
    return x + y

def slow_diff(nsecs, x, y):
    """
    Function that sleeps for 'nsecs' seconds, and
    then returns the difference of x and y
    """
    time.sleep(nsecs)
    return x - y

def broken_function(nsecs):
    """Function that deliberately raises an AssertationError"""
    time.sleep(nsecs)
    raise ValueError("Called broken function")

if __name__ == "__main__":
    futures = []

    with ProcessPoolExecutor() as pool:
        futures.append(pool.submit(slow_add, 3.1, 6, 7))
        futures.append(pool.submit(slow_diff, 2.1, 5, 2))
        futures.append(pool.submit(slow_add, 1.1, 8, 1))
        futures.append(pool.submit(slow_diff, 5.1, 9, 2))
        futures.append(pool.submit(broken_function, 4.1))

        while True:
            all_finished = True

            print("\nHave the workers finished?")

            for i, future in enumerate(futures):
                if future.done():
                    print(f"Task {i} has finished")
                else:
                    all_finished = False
                    print(f"Task {i} is running...")

            if all_finished:
                break

            time.sleep(1)

        print("\nHere are the results.")

        for i, future in enumerate(futures):
            if future.exception() is None:
                print(f"Task {i} was successful. Result is {future.result()}")
            else:
                print(f"Task {i} failed!")
                e = future.exception()
                print(f"    Error = {type(e)} : {e}")

Running this script should give output similar to:

$
python future.py
Have the workers finished?
Task 0 is running...
Task 1 is running...
Task 2 is running...
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 is running...
Task 1 is running...
Task 2 is running...
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 is running...
Task 1 is running...
Task 2 has finished
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 is running...
Task 1 has finished
Task 2 has finished
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 has finished
Task 1 has finished
Task 2 has finished
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 has finished
Task 1 has finished
Task 2 has finished
Task 3 is running...
Task 4 is running...

Have the workers finished?
Task 0 has finished
Task 1 has finished
Task 2 has finished
Task 3 has finished
Task 4 has finished

Here are the results.
Task 0 was successful. Result is 13
Task 1 was successful. Result is 3
Task 2 was successful. Result is 9
Task 3 was successful. Result is 7
Task 4 failed!
    Error = <class 'ValueError'> : Called broken function

Is this the output you expected? Note that the exception raised by broken_function is held safely in its associated future. This is indicated by .exception() not returning None (if you call .result() on a future that contains an exception, then that exception is raised).
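
This last point is worth seeing in action. Here is a minimal sketch (reusing the broken_function idea from above) showing that calling .result() re-raises the stored exception, which you can catch with an ordinary try/except block:

from concurrent.futures import ProcessPoolExecutor

def broken_function():
    """Function that deliberately raises a ValueError"""
    raise ValueError("Called broken function")

if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        r = pool.submit(broken_function)

        try:
            # .result() re-raises the exception held in the future
            r.result()
        except ValueError as e:
            print(f"Caught the exception: {e}")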

Exercise

Edit the future.py script so that you can control the number of workers in the pool using a command line argument (e.g. using ProcessPoolExecutor(max_workers=int(sys.argv[1])) rather than ProcessPoolExecutor()).
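
One possible way to do this is sketched below (this is a cut-down script rather than the full future.py, and the variable name nworkers is just illustrative):

import sys
import time
from concurrent.futures import ProcessPoolExecutor

def slow_add(nsecs, x, y):
    """Sleep for 'nsecs' seconds, then return the sum of x and y"""
    time.sleep(nsecs)
    return x + y

if __name__ == "__main__":
    # read the number of workers from the command line, falling back to
    # None, which lets Python choose based on the number of CPU cores
    nworkers = int(sys.argv[1]) if len(sys.argv) > 1 else None

    with ProcessPoolExecutor(max_workers=nworkers) as pool:
        futures = [pool.submit(slow_add, 1, i, i) for i in range(10)]

        print([future.result() for future in futures])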

Edit the script to add calls to more asynchronous functions.

Then experiment with running the script with different numbers of processes in the pool and with different numbers of asynchronous function calls.

How are the asynchronous function calls distributed across the pool of worker processes?