Parallel Prefect: A bit about Dask, real fast

George Coyne
SlateCo
Jul 8, 2021


The problem: I heard Prefect can run Python in parallel, what is the simplest way to achieve this?

The solution: Executors

TL;DR:

Back in the early days, in 2019, Chris White wrote a great article about getting up and running with Prefect and Dask using an external Dask cluster. In this brief post I want to give very clear instructions for achieving parallelism, and therefore speed, in Prefect.

Executors specify how the tasks within a Prefect flow are to be executed. This article focuses on the LocalDaskExecutor; we'll get to the DaskExecutor later. As of publication there are three executors:

LocalExecutor: the default, no frills executor. All tasks are executed in a single thread, parallelism is not supported.

LocalDaskExecutor: an executor that runs on Dask primitives using either threads or processes.

DaskExecutor: the most feature-rich of the executors, this executor runs on dask.distributed and has support for distributed execution.

Under the hood, Prefect uses Dask primitives, which handle the events, locking, coordination, and waiting for task completion that are required to run parallel tasks on a single node.
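To make that concrete, here is a minimal sketch of those Dask primitives on their own, without Prefect (function names and sleep times are illustrative; Prefect wires this up for you):

import time

import dask


@dask.delayed
def slow_double(x):
    time.sleep(1)  # stand-in for real work (API call, query, etc.)
    return x * 2


if __name__ == "__main__":
    # dask.delayed builds the task graph lazily; the local "threads"
    # scheduler then runs all four tasks in parallel on one machine.
    results = [slow_double(i) for i in range(4)]
    print(dask.compute(*results, scheduler="threads"))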

Applying an executor is incredibly simple: set the flow's executor attribute to one of the three executors above and let it rip. Below is an example with the functional API:
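A minimal sketch of such a flow, assuming Prefect 1.x (the version current when this was written); the task body and flow name are illustrative:

import time

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def wait_and_return(n):
    time.sleep(1)  # stand-in for real work (API call, query, etc.)
    return n


with Flow("parallel-example") as flow:
    # Mapping fans out one task run per input element.
    results = wait_and_return.map(list(range(8)))

# Threads by default; see the note on schedulers below.
flow.executor = LocalDaskExecutor()

if __name__ == "__main__":
    flow.run()  # the eight mapped task runs execute in parallel, not one at a time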

A note from Prefect about selecting parameters:

Selecting a scheduler and num_workers

You should use scheduler="threads" if:

Your tasks are often IO bound (e.g. API requests, uploading/downloading data, database calls, etc…). Tasks like these can sometimes benefit from having more threads than cores, but usually not more than by a factor of 2–4 (e.g. if you have 4 cores available, set num_workers=16 at most).

Your tasks make use of separate processes (e.g. ShellTask).

Your tasks make use of libraries like numpy, pandas, or scikit-learn that release the global interpreter lock (GIL). The default value for num_workers is likely sufficient - tasks like these are CPU bound and won't benefit from multiple threads per core.

You should use scheduler="processes" in most other cases. These tasks are also usually CPU bound, so the default value of num_workers should be sufficient.
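Putting those guidelines together, here is a hedged sketch of how the options are passed to LocalDaskExecutor (the worker counts are examples, not recommendations):

from prefect.executors import LocalDaskExecutor

# IO-bound tasks: threads, optionally oversubscribed relative to core count.
io_executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# CPU-bound pure-Python tasks: processes sidestep the GIL.
cpu_executor = LocalDaskExecutor(scheduler="processes")

# flow.executor = io_executor  # attach whichever fits your workload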

Stay tuned for the second installment, Prefect x Kubernetes x Dask

SLATE is a multi-disciplined engineering firm with a focus on modern approaches to data architecture and software development. SLATE delivers custom solutions that allow organizations to gain insights to drive effective decision making.
