Interface and implementations of various task runners.
Task Runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.
Example
>>> from prefect import flow, task
>>> from prefect.task_runners import SequentialTaskRunner
>>> from typing import List
>>>
>>> @task
>>> def say_hello(name):
...     print(f"hello {name}")
>>>
>>> @task
>>> def say_goodbye(name):
...     print(f"goodbye {name}")
>>>
>>> @flow(task_runner=SequentialTaskRunner())
>>> def greetings(names: List[str]):
...     for name in names:
...         say_hello(name)
...         say_goodbye(name)
>>>
>>> greetings(["arthur", "trillian", "ford", "marvin"])
hello arthur
goodbye arthur
hello trillian
goodbye trillian
hello ford
goodbye ford
hello marvin
goodbye marvin
Switching to a DaskTaskRunner:
>>> from prefect_dask.task_runners import DaskTaskRunner
>>> flow.task_runner = DaskTaskRunner()
>>> greetings(["arthur", "trillian", "ford", "marvin"])
hello arthur
goodbye arthur
hello trillian
hello ford
goodbye marvin
hello marvin
goodbye ford
goodbye trillian
For usage details, see the Task Runners documentation.
class BaseTaskRunner(metaclass=abc.ABCMeta):
    def __init__(self) -> None:
        self.logger = get_logger(f"task_runner.{self.name}")
        self._started: bool = False

    @property
    @abc.abstractmethod
    def concurrency_type(self) -> TaskConcurrencyType:
        pass  # noqa

    @property
    def name(self):
        return type(self).__name__.lower().replace("taskrunner", "")

    def duplicate(self):
        """
        Return a new task runner instance with the same options.
        """
        # The base class returns `NotImplemented` to indicate that this is not yet
        # implemented by a given task runner.
        return NotImplemented

    def __eq__(self, other: object) -> bool:
        """
        Returns true if the task runners use the same options.
        """
        if type(other) == type(self) and (
            # Compare public attributes for naive equality check
            # Subclasses should implement this method with a check init option equality
            {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
            == {k: v for k, v in other.__dict__.items() if not k.startswith("_")}
        ):
            return True
        else:
            return NotImplemented

    @abc.abstractmethod
    async def submit(
        self,
        key: UUID,
        call: Callable[..., Awaitable[State[R]]],
    ) -> None:
        """
        Submit a call for execution and return a `PrefectFuture` that can be used to
        get the call result.

        Args:
            task_run: The task run being submitted.
            task_key: A unique key for this orchestration run of the task. Can be used
                for caching.
            call: The function to be executed
            run_kwargs: A dict of keyword arguments to pass to `call`

        Returns:
            A future representing the result of `call` execution
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def wait(self, key: UUID, timeout: float = None) -> Optional[State]:
        """
        Given a `PrefectFuture`, wait for its return state up to `timeout` seconds.

        If it is not finished after the timeout expires, `None` should be returned.

        Implementers should be careful to ensure that this function never returns or
        raises an exception.
        """
        raise NotImplementedError()

    @asynccontextmanager
    async def start(
        self: T,
    ) -> AsyncIterator[T]:
        """
        Start the task runner, preparing any resources necessary for task submission.

        Children should implement `_start` to prepare and clean up resources.

        Yields:
            The prepared task runner
        """
        if self._started:
            raise RuntimeError("The task runner is already started!")

        async with AsyncExitStack() as exit_stack:
            self.logger.debug("Starting task runner...")
            try:
                await self._start(exit_stack)
                self._started = True
                yield self
            finally:
                self.logger.debug("Shutting down task runner...")
                self._started = False

    async def _start(self, exit_stack: AsyncExitStack) -> None:
        """
        Create any resources required for this task runner to submit work.

        Cleanup of resources should be submitted to the `exit_stack`.
        """
        pass  # noqa

    def __str__(self) -> str:
        return type(self).__name__
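The abstract interface above boils down to one property and two coroutines. As an illustration only (not part of the Prefect API), a minimal subclass that runs every call eagerly, in the spirit of the SequentialTaskRunner documented further down, might look roughly like this; the class name is hypothetical and the import paths assume a recent Prefect 2 release:

from typing import Any, Awaitable, Callable, Dict, Optional
from uuid import UUID

from prefect.task_runners import BaseTaskRunner, TaskConcurrencyType


class EagerTaskRunner(BaseTaskRunner):
    """Hypothetical runner: executes each call as soon as it is submitted."""

    def __init__(self) -> None:
        super().__init__()
        self._results: Dict[UUID, Any] = {}  # key -> terminal state

    @property
    def concurrency_type(self) -> TaskConcurrencyType:
        # No parallelism at all, so advertise sequential concurrency
        return TaskConcurrencyType.SEQUENTIAL

    def duplicate(self):
        # A fresh instance with the same (empty) options
        return type(self)()

    async def submit(self, key: UUID, call: Callable[..., Awaitable[Any]]) -> None:
        # Run the orchestration call immediately and remember its final state
        self._results[key] = await call()

    async def wait(self, key: UUID, timeout: float = None) -> Optional[Any]:
        # Everything already finished during submit(), so the state is always here
        return self._results.get(key)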
Return a new task runner instance with the same options.
Source code in src/prefect/task_runners.py
def duplicate(self):
    """
    Return a new task runner instance with the same options.
    """
    # The base class returns `NotImplemented` to indicate that this is not yet
    # implemented by a given task runner.
    return NotImplemented
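The concrete runners below override duplicate to return a fresh instance with the same options, which then compares equal to the original under __eq__. A small hedged illustration:

from prefect.task_runners import SequentialTaskRunner

runner = SequentialTaskRunner()
copy = runner.duplicate()

assert copy is not runner  # a brand new instance...
assert copy == runner      # ...that compares equal because its public options match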
@asynccontextmanager
async def start(
    self: T,
) -> AsyncIterator[T]:
    """
    Start the task runner, preparing any resources necessary for task submission.

    Children should implement `_start` to prepare and clean up resources.

    Yields:
        The prepared task runner
    """
    if self._started:
        raise RuntimeError("The task runner is already started!")

    async with AsyncExitStack() as exit_stack:
        self.logger.debug("Starting task runner...")
        try:
            await self._start(exit_stack)
            self._started = True
            yield self
        finally:
            self.logger.debug("Shutting down task runner...")
            self._started = False
@abc.abstractmethod
async def submit(
    self,
    key: UUID,
    call: Callable[..., Awaitable[State[R]]],
) -> None:
    """
    Submit a call for execution and return a `PrefectFuture` that can be used to
    get the call result.

    Args:
        task_run: The task run being submitted.
        task_key: A unique key for this orchestration run of the task. Can be used
            for caching.
        call: The function to be executed
        run_kwargs: A dict of keyword arguments to pass to `call`

    Returns:
        A future representing the result of `call` execution
    """
    raise NotImplementedError()
Given a PrefectFuture, wait for its return state up to timeout seconds.
If it is not finished after the timeout expires, None should be returned.
Implementers should be careful to ensure that this function never returns or
raises an exception.
Source code in src/prefect/task_runners.py
@abc.abstractmethod
async def wait(self, key: UUID, timeout: float = None) -> Optional[State]:
    """
    Given a `PrefectFuture`, wait for its return state up to `timeout` seconds.

    If it is not finished after the timeout expires, `None` should be returned.

    Implementers should be careful to ensure that this function never returns or
    raises an exception.
    """
    raise NotImplementedError()
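From the caller's side this contract means a timeout is signalled by a None return value rather than an exception. A hedged sketch of a helper (the function name is illustrative) that waits in bounded slices until a state arrives:

async def wait_for_state(runner, key):
    # `wait` never raises on timeout, it just returns None, so the caller
    # can simply ask again until the submitted call has finished.
    while True:
        state = await runner.wait(key, timeout=10)
        if state is not None:
            return state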
A concurrent task runner that allows tasks to switch when blocking on IO.
Synchronous tasks will be submitted to a thread pool maintained by anyio.
Example
Using a thread for concurrency:
>>> from prefect import flow
>>> from prefect.task_runners import ConcurrentTaskRunner
>>> @flow(task_runner=ConcurrentTaskRunner)
>>> def my_flow():
>>> ...
class ConcurrentTaskRunner(BaseTaskRunner):
    """
    A concurrent task runner that allows tasks to switch when blocking on IO.
    Synchronous tasks will be submitted to a thread pool maintained by `anyio`.

    Example:
        ```
        Using a thread for concurrency:
        >>> from prefect import flow
        >>> from prefect.task_runners import ConcurrentTaskRunner
        >>> @flow(task_runner=ConcurrentTaskRunner)
        >>> def my_flow():
        >>>     ...
        ```
    """

    def __init__(self):
        # TODO: Consider adding `max_workers` support using anyio capacity limiters

        # Runtime attributes
        self._task_group: anyio.abc.TaskGroup = None
        self._result_events: Dict[UUID, Event] = {}
        self._results: Dict[UUID, Any] = {}
        self._keys: Set[UUID] = set()

        super().__init__()

    @property
    def concurrency_type(self) -> TaskConcurrencyType:
        return TaskConcurrencyType.CONCURRENT

    def duplicate(self):
        return type(self)()

    async def submit(
        self,
        key: UUID,
        call: Callable[[], Awaitable[State[R]]],
    ) -> None:
        if not self._started:
            raise RuntimeError(
                "The task runner must be started before submitting work."
            )

        if not self._task_group:
            raise RuntimeError(
                "The concurrent task runner cannot be used to submit work after "
                "serialization."
            )

        # Create an event to set on completion
        self._result_events[key] = Event()

        # Rely on the event loop for concurrency
        self._task_group.start_soon(self._run_and_store_result, key, call)

    async def wait(
        self,
        key: UUID,
        timeout: float = None,
    ) -> Optional[State]:
        if not self._task_group:
            raise RuntimeError(
                "The concurrent task runner cannot be used to wait for work after "
                "serialization."
            )

        return await self._get_run_result(key, timeout)

    async def _run_and_store_result(
        self, key: UUID, call: Callable[[], Awaitable[State[R]]]
    ):
        """
        Simple utility to store the orchestration result in memory on completion

        Since this run is occurring on the main thread, we capture exceptions to
        prevent task crashes from crashing the flow run.
        """
        try:
            result = await call()
        except BaseException as exc:
            result = await exception_to_crashed_state(exc)

        self._results[key] = result
        self._result_events[key].set()

    async def _get_run_result(
        self, key: UUID, timeout: float = None
    ) -> Optional[State]:
        """
        Block until the run result has been populated.
        """
        result = None  # retval on timeout

        # Note we do not use `asyncio.wrap_future` and instead use an `Event` to avoid
        # stdlib behavior where the wrapped future is cancelled if the parent future is
        # cancelled (as it would be during a timeout here)
        with anyio.move_on_after(timeout):
            await self._result_events[key].wait()
            result = self._results[key]

        return result  # timeout reached

    async def _start(self, exit_stack: AsyncExitStack):
        """
        Start the process pool
        """
        self._task_group = await exit_stack.enter_async_context(
            anyio.create_task_group()
        )

    def __getstate__(self):
        """
        Allow the `ConcurrentTaskRunner` to be serialized by dropping the task group.
        """
        data = self.__dict__.copy()
        data.update({k: None for k in {"_task_group"}})
        return data

    def __setstate__(self, data: dict):
        """
        When deserialized, we will no longer have a reference to the task group.
        """
        self.__dict__.update(data)
        self._task_group = None
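The comment in _get_run_result explains why an anyio Event is used rather than a wrapped future: a timeout should cancel only the waiter, never the submitted work. A standalone sketch of that pattern outside Prefect (all names here are illustrative):

import anyio


async def main():
    results = {}
    done = anyio.Event()

    async def run_and_store(key):
        await anyio.sleep(0.1)  # stand-in for the orchestration call
        results[key] = "completed"
        done.set()              # signal that a result is available

    async with anyio.create_task_group() as tg:
        tg.start_soon(run_and_store, "my-key")

        # Equivalent of `_get_run_result`: wait up to one second, else keep None
        result = None
        with anyio.move_on_after(1):
            await done.wait()
            result = results["my-key"]

        # Only the wait above sits inside the cancel scope, so a timeout would
        # not cancel run_and_store itself.
        print(result)


anyio.run(main)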
A simple task runner that executes calls as they are submitted.
If writing synchronous tasks, this runner will always execute tasks sequentially.
If writing async tasks, this runner will execute tasks sequentially unless grouped
using anyio.create_task_group or asyncio.gather.
class SequentialTaskRunner(BaseTaskRunner):
    """
    A simple task runner that executes calls as they are submitted.

    If writing synchronous tasks, this runner will always execute tasks sequentially.
    If writing async tasks, this runner will execute tasks sequentially unless grouped
    using `anyio.create_task_group` or `asyncio.gather`.
    """

    def __init__(self) -> None:
        super().__init__()
        self._results: Dict[str, State] = {}

    @property
    def concurrency_type(self) -> TaskConcurrencyType:
        return TaskConcurrencyType.SEQUENTIAL

    def duplicate(self):
        return type(self)()

    async def submit(
        self,
        key: UUID,
        call: Callable[..., Awaitable[State[R]]],
    ) -> None:
        # Run the function immediately and store the result in memory
        try:
            result = await call()
        except BaseException as exc:
            result = await exception_to_crashed_state(exc)

        self._results[key] = result

    async def wait(self, key: UUID, timeout: float = None) -> Optional[State]:
        return self._results[key]
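Because SequentialTaskRunner.submit awaits the call inline, it is also the easiest runner to drive by hand. A hedged, self-contained sketch; the Completed state constructor, its import path, and its data argument are assumptions about this Prefect release:

import asyncio
from uuid import uuid4

from prefect.states import Completed  # assumed import path for this Prefect release
from prefect.task_runners import SequentialTaskRunner


async def main():
    runner = SequentialTaskRunner()
    key = uuid4()

    async def make_state():
        # Stand-in for the orchestration call Prefect would normally pass to submit()
        return Completed(data=42)

    async with runner.start() as started:
        # The sequential runner executes the call right here, during submit()...
        await started.submit(key, make_state)
        # ...so wait() simply returns the stored terminal state
        state = await started.wait(key, timeout=5)
        print(state)


asyncio.run(main())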