Infrastructure blocks are part of the agent-based deployment model. Work pools and workers simplify the specification of each flow's infrastructure and runtime environment. If you have existing agents, you can upgrade from agents to workers to significantly enhance the experience of deploying flows.
Users may specify an infrastructure block when creating a deployment. This block will be used to specify infrastructure for flow runs created by the deployment at runtime.
Infrastructure can only be used with a deployment. When you run a flow directly by calling the flow yourself, you are responsible for the environment in which the flow executes.
Prefect uses infrastructure to create the environment for a user's flow to execute.
Infrastructure is attached to a deployment and is propagated to flow runs created for that deployment. Infrastructure is deserialized by the agent and it has two jobs:
Create execution environment infrastructure for the flow run.
Run a Python command to start the prefect.engine in the infrastructure, which retrieves the flow from storage and executes the flow.
The engine acquires and calls the flow. Infrastructure doesn't know anything about how the flow is stored; it just passes a flow run ID to the engine.
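The two jobs described above can be sketched with the standard library alone. This is a minimal illustration, not Prefect's implementation: the FLOW_RUN_ID variable name and the inline stand-in script are assumptions made so the example runs without Prefect installed, whereas real infrastructure starts prefect.engine.

```python
import os
import subprocess
import sys


def run_flow_in_new_process(flow_run_id: str) -> str:
    """Start a child process that is only told which flow run to execute."""
    # Job 1: create the execution environment. Here that is just the current
    # environment plus the run ID (FLOW_RUN_ID is a hypothetical name).
    env = {**os.environ, "FLOW_RUN_ID": flow_run_id}

    # Job 2: run a Python command inside that environment. Real infrastructure
    # would start prefect.engine; this stand-in only echoes the ID it received.
    stand_in_engine = "import os; print(os.environ['FLOW_RUN_ID'])"
    command = [sys.executable, "-c", stand_in_engine]

    result = subprocess.run(
        command, env=env, capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(run_flow_in_new_process("example-flow-run-id"))
```

Note that nothing about flow storage appears in the sketch: the child process receives an identifier and is expected to look everything else up itself.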
Infrastructure is specific to the environments in which flows will run. The infrastructure types described here are Process, DockerContainer, KubernetesJob, and ECSTask.
You may create customized infrastructure blocks on the Blocks page of the Prefect UI or Prefect Cloud, or create them in code and save them to the API using the block's .save() method.
Once created, there are two distinct ways to use infrastructure in a deployment:
Start with Prefect defaults: pass the -i or --infra flag and an infrastructure type when building deployment files.
Pre-configure infrastructure settings as a block and base your deployment infrastructure on those settings: pass -ib or --infra-block and a block slug when building deployment files.
For example, when creating your deployment files, the supported Prefect infrastructure types are:
In this example we specify the DockerContainer infrastructure in addition to a preconfigured AWS S3 bucket storage block.
The generated deployment YAML file may be edited as needed to add an infrastructure type or infrastructure settings.
    ###
    ### A complete description of a Prefect Deployment for flow 'my-flow'
    ###
    name: my-flow-deployment
    description: null
    version: e29de5d01b06d61b4e321d40f34a480c
    # The work queue that will handle this deployment's runs
    work_queue_name: default
    work_pool_name: default-agent-pool
    tags:
    - test
    parameters: {}
    schedules: []
    paused: true
    infra_overrides:
      env.EXTRA_PIP_PACKAGES: s3fs
    infrastructure:
      type: docker-container
      env: {}
      labels: {}
      name: null
      command:
      - python
      - -m
      - prefect.engine
      image: prefecthq/prefect:2-latest
      image_pull_policy: null
      networks: []
      network_mode: null
      auto_remove: false
      volumes: []
      stream_output: true
      memswap_limit: null
      mem_limit: null
      privileged: false
      block_type_slug: docker-container
      _block_type_slug: docker-container

    ###
    ### DO NOT EDIT BELOW THIS LINE
    ###
    flow_name: my-flow
    manifest_path: my_flow-manifest.json
    storage:
      bucket_path: bucket-full-of-sunshine
      aws_access_key_id: '**********'
      aws_secret_access_key: '**********'
      _is_anonymous: true
      _block_document_name: anonymous-xxxxxxxx-f1ff-4265-b55c-6353a6d65333
      _block_document_id: xxxxxxxx-06c2-4c3c-a505-4a8db0147011
      block_type_slug: s3
      _block_type_slug: s3
    path: ''
    entrypoint: my_flow.py:my-flow
    parameter_openapi_schema:
      title: Parameters
      type: object
      properties: {}
      required: null
      definitions: null
    timestamp: '2023-02-08T23:00:14.974642+00:00'
Editing deployment YAML
Note the big DO NOT EDIT comment in the deployment YAML: In practice, anything above this block can be freely edited before running prefect deployment apply to create the deployment on the API.
Once the deployment exists, any flow runs that this deployment starts will use DockerContainer infrastructure.
You can also create custom infrastructure blocks, either in the Prefect UI or in code via the API, and use the settings in the block to configure your infrastructure. For example, here we specify settings for Kubernetes infrastructure in a block named k8sdev.
Now we can apply the infrastructure type and settings in the block by specifying the block slug kubernetes-job/k8sdev as the infrastructure type when building a deployment:
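As a rough sketch of the two options, the helper below assembles the command line for either a default infrastructure type (--infra) or a pre-configured block (--infra-block). It only builds and prints the arguments; it does not run Prefect. The helper name, the entrypoint ./my_flow.py:my_flow, and the deployment name are illustrative assumptions, and the prefect deployment build subcommand with its -n name flag reflects the Prefect 2 CLI as best understood here, so check it against your installed version.

```python
import shlex


def deployment_build_command(entrypoint, name, infra_type=None, infra_block=None):
    """Return argv for building deployment files with one infrastructure option."""
    # Exactly one of the two options should be given.
    if (infra_type is None) == (infra_block is None):
        raise ValueError("pass exactly one of infra_type or infra_block")

    argv = ["prefect", "deployment", "build", entrypoint, "-n", name]
    if infra_type is not None:
        # Option 1: start from Prefect defaults for the given type.
        argv += ["--infra", infra_type]
    else:
        # Option 2: base the infrastructure on a saved block, by slug.
        argv += ["--infra-block", infra_block]
    return argv


if __name__ == "__main__":
    for kwargs in (
        {"infra_type": "docker-container"},
        {"infra_block": "kubernetes-job/k8sdev"},
    ):
        argv = deployment_build_command(
            "./my_flow.py:my_flow", "my-flow-deployment", **kwargs
        )
        print(shlex.join(argv))
```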
Process infrastructure runs a command in a new process.
Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.
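The override rule can be pictured as a plain dictionary merge. The sketch below is an illustration of that rule under stated assumptions, not Prefect's code; build_process_env is a hypothetical helper name.

```python
import os
from typing import Optional


def build_process_env(configured: dict, current: Optional[dict] = None) -> dict:
    """Merge environments so configured values win over inherited ones."""
    base = dict(os.environ) if current is None else dict(current)
    # In a dict merge the right-hand side wins, so configured values override.
    return {**base, **configured}


inherited = {"PATH": "/usr/bin", "LOG_LEVEL": "INFO"}
merged = build_process_env({"LOG_LEVEL": "DEBUG", "EXTRA": "1"}, inherited)
print(merged)
```

The inherited mapping is copied rather than modified, so the parent process keeps its own values.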
Process supports the following settings:
| Attributes | Description |
| --- | --- |
| command | A list of strings specifying the command to start the flow run. In most cases you should not override this. |
| env | Environment variables to set for the new process. |
| labels | Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself. |
| name | A name for the process. For display purposes only. |
DockerContainer infrastructure executes flow runs in a container.
Requirements for DockerContainer:
Docker Engine must be available.
You must configure remote Storage. Local storage is not supported for Docker.
The API must be available from within the flow run container. To facilitate connections to locally hosted APIs, localhost and 127.0.0.1 will be replaced with host.docker.internal.
The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
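The localhost replacement mentioned in the requirements can be sketched as a small URL rewrite. This is a simplified illustration using only the standard library, not Prefect's implementation; api_url_for_container is a hypothetical name, and the sketch ignores any user info embedded in the URL.

```python
from urllib.parse import urlsplit, urlunsplit

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def api_url_for_container(api_url: str) -> str:
    """Point a locally hosted API URL at the Docker host instead."""
    parts = urlsplit(api_url)
    if parts.hostname not in LOCAL_HOSTS:
        # Remote APIs are reachable from the container as they are.
        return api_url

    # Keep the port, swap only the host name.
    netloc = "host.docker.internal"
    if parts.port is not None:
        netloc = f"{netloc}:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


if __name__ == "__main__":
    print(api_url_for_container("http://127.0.0.1:4200/api"))
```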
DockerContainer supports the following settings:
| Attributes | Description |
| --- | --- |
| auto_remove | Bool indicating whether the container will be removed on completion. If False, the container will remain after exit for inspection. |
| command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. |
| env | Environment variables to set for the container. |
| image | An optional string specifying the name of a Docker image to use. Defaults to the Prefect image. If the image is stored anywhere other than a public Docker Hub registry, use a corresponding registry block, e.g. DockerRegistry or ensure otherwise that your execution layer is authenticated to pull the image from the image registry. |
| image_pull_policy | Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'. |
| image_registry | A DockerRegistry block containing credentials to use if image is stored in a private image registry. |
| labels | An optional dictionary of labels, mapping name to value. |
| name | An optional name for the container. |
| networks | An optional list of strings specifying Docker networks to connect the container to. |
| network_mode | Set the network mode for the created container. Defaults to 'host' if a local API url is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set. |
| stream_output | Bool indicating whether to stream output from the subprocess to local standard output. |
| volumes | An optional list of volume mount strings in the format of "local_path:container_path". |
Prefect automatically sets a Docker image matching the Python and Prefect version you're using at deployment time. You can see all available images at Docker Hub.
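A few of the DockerContainer settings carry constraints: image_pull_policy takes one of three values, network_mode cannot be combined with networks, and volumes use a "local_path:container_path" format. The sketch below checks a plain settings dictionary against those three rules before you create a block. It is a hypothetical pre-flight helper, not part of Prefect, and Prefect's own validation may differ.

```python
ALLOWED_PULL_POLICIES = {"ALWAYS", "NEVER", "IF_NOT_PRESENT"}


def check_docker_settings(settings: dict) -> list:
    """Return a list of human-readable problems; empty means none found."""
    problems = []

    policy = settings.get("image_pull_policy")
    if policy is not None and policy not in ALLOWED_PULL_POLICIES:
        problems.append(
            f"image_pull_policy must be one of {sorted(ALLOWED_PULL_POLICIES)}"
        )

    # networks and network_mode are mutually exclusive.
    if settings.get("networks") and settings.get("network_mode"):
        problems.append("network_mode cannot be set when networks is set")

    # Each volume needs a non-empty path on both sides of the first colon.
    for volume in settings.get("volumes") or []:
        local_path, sep, container_path = volume.partition(":")
        if not (sep and local_path and container_path):
            problems.append(f"volume {volume!r} is not 'local_path:container_path'")

    return problems


if __name__ == "__main__":
    print(check_docker_settings({"networks": ["backend"], "network_mode": "host"}))
```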
KubernetesJob infrastructure executes flow runs in a Kubernetes Job.
Requirements for KubernetesJob:
kubectl must be available.
You must configure remote Storage. Local storage is not supported for Kubernetes.
The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
The Prefect CLI command prefect kubernetes manifest server automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.
KubernetesJob supports the following settings:
| Attributes | Description |
| --- | --- |
| cluster_config | An optional Kubernetes cluster config to use for this job. |
| command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. |
| customizations | A list of JSON Patch (RFC 6902) operations to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for deployments CLI). |
| env | Environment variables to set for the container. |
| finished_job_ttl | The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed. |
| image | String specifying the tag of a Docker image to use for the Job. |
| image_pull_policy | The Kubernetes image pull policy to use for job containers. |
| job | The base manifest for the Kubernetes Job. |
| job_watch_timeout_seconds | Number of seconds to watch for job creation before timing out (defaults to None). |
| labels | Dictionary of labels to add to the Job. |
| name | An optional name for the job. |
| namespace | String signifying the Kubernetes namespace to use. |
| pod_watch_timeout_seconds | Number of seconds to watch for pod creation before timing out (default 60). |
| service_account_name | An optional string specifying which Kubernetes service account to use. |
| stream_output | Bool indicating whether to stream output from the subprocess to local standard output. |
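To make the settings more concrete, the sketch below shows where several of them would plausibly land in a Kubernetes Job manifest. The manifest field names (ttlSecondsAfterFinished, serviceAccountName, imagePullPolicy, and so on) are standard Kubernetes fields, but the mapping itself is an assumption for illustration and not Prefect's actual base manifest; the container name flow-run and the helper name are hypothetical.

```python
def sketch_job_manifest(settings: dict) -> dict:
    """Build an illustrative Job manifest from KubernetesJob-style settings."""
    container = {"name": "flow-run", "image": settings["image"]}
    if settings.get("image_pull_policy"):
        container["imagePullPolicy"] = settings["image_pull_policy"]
    if settings.get("command"):
        container["command"] = list(settings["command"])
    if settings.get("env"):
        # Kubernetes expects env as a list of name/value objects.
        container["env"] = [
            {"name": key, "value": value} for key, value in settings["env"].items()
        ]

    # containers is always a list, even for a single container.
    pod_spec = {"containers": [container], "restartPolicy": "Never"}
    if settings.get("service_account_name"):
        pod_spec["serviceAccountName"] = settings["service_account_name"]

    job_spec = {"template": {"spec": pod_spec}}
    if settings.get("finished_job_ttl") is not None:
        job_spec["ttlSecondsAfterFinished"] = settings["finished_job_ttl"]

    metadata = {"labels": dict(settings.get("labels") or {})}
    if settings.get("name"):
        metadata["name"] = settings["name"]
    if settings.get("namespace"):
        metadata["namespace"] = settings["namespace"]

    return {"apiVersion": "batch/v1", "kind": "Job", "metadata": metadata, "spec": job_spec}


if __name__ == "__main__":
    import json

    example = {"image": "prefecthq/prefect:2-latest", "namespace": "dev", "finished_job_ttl": 300}
    print(json.dumps(sketch_job_manifest(example), indent=2))
```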
When creating deployments using KubernetesJob infrastructure, the infra_overrides parameter expects a dictionary. For a KubernetesJob, the customizations parameter expects a list.
In the Job manifest, containers is a list of objects, even if there is only one container.
For any patch that applies to a container, the path must therefore include the container's index in that list, for example:
/spec/template/spec/containers/0/resources
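To see why the container index matters, the sketch below applies a single "add" operation to a small manifest using only the standard library. It is a deliberately minimal reading of JSON Patch (no other operations, no ~0 or ~1 pointer escaping) and is not the patching code Prefect uses; the manifest and the container name flow-run are made up for the example.

```python
import copy
import json


def apply_add_patch(manifest: dict, patch: dict) -> dict:
    """Apply one JSON Patch 'add' operation and return a patched copy."""
    if patch["op"] != "add":
        raise ValueError("this sketch only handles the 'add' operation")

    patched = copy.deepcopy(manifest)
    *parents, last = patch["path"].lstrip("/").split("/")

    # Walk to the parent of the target; numeric segments index into lists.
    target = patched
    for key in parents:
        target = target[int(key)] if isinstance(target, list) else target[key]

    if isinstance(target, list):
        index = len(target) if last == "-" else int(last)
        target.insert(index, patch["value"])
    else:
        target[last] = patch["value"]
    return patched


manifest = {"spec": {"template": {"spec": {"containers": [{"name": "flow-run"}]}}}}
customizations = [
    {
        "op": "add",
        "path": "/spec/template/spec/containers/0/resources",
        "value": {"requests": {"cpu": "2000m", "memory": "4Gi"}},
    }
]

patched = manifest
for patch in customizations:
    patched = apply_add_patch(patched, patch)

# The same list serialized as a JSON string, the alternative form for customizations.
customizations_json = json.dumps(customizations)
print(json.dumps(patched, indent=2))
```

The 0 in the path selects the first entry of containers; without it the walk would stop at the list and have no container to attach resources to.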
A Kubernetes-Job infrastructure block defined in Python:
A Deployment with infra-overrides defined in Python:
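    from prefect.deployments import Deployment
    from prefect.infrastructure import KubernetesJob

    # my_flow and storage are assumed to be defined elsewhere, for example
    # an imported flow function and a previously loaded storage block.

    infra_overrides = {
        "customizations": [
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources",
                "value": {
                    # Kubernetes quantity suffixes are case-sensitive: Gi, not gi.
                    "requests": {"cpu": "2000m", "memory": "4Gi"},
                    "limits": {"cpu": "4000m", "memory": "8Gi", "nvidia.com/gpu": "1"},
                },
            }
        ]
    }

    # Load an already created K8s Block
    k8sjob = KubernetesJob.load("devk8s")

    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="s3-example",
        version=2,
        work_queue_name="aws",
        infrastructure=k8sjob,
        storage=storage,
        infra_overrides=infra_overrides,
    )

    deployment.apply()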
ECSTask infrastructure runs your flow in an ECS Task.
Requirements for ECSTask:
The ephemeral Prefect API won't work with ECS directly. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
The prefect-aws collection must be installed within the agent environment: pip install prefect-aws
The ECSTask and AwsCredentials blocks must be registered within the agent environment: prefect block register -m prefect_aws.ecs
You must configure remote Storage. Local storage is not supported for ECS tasks. The most commonly used type of storage with ECSTask is S3. If you leverage that type of block, make sure that s3fs is installed within your agent and flow run environment. The easiest way to satisfy all the installation-related points mentioned above is to include the following commands in your Dockerfile:
    # example base image
    FROM prefecthq/prefect:2-python3.9
    RUN pip install s3fs prefect-aws
Make sure to allocate enough CPU and memory to your agent, and consider adding retries
When you start a Prefect agent on AWS ECS Fargate, allocate as much CPU and memory as needed for your workloads. Your agent needs enough resources to appropriately provision infrastructure for your flow runs and to monitor their execution. Otherwise, your flow runs may get stuck in a Pending state. Alternatively, set a work-queue concurrency limit to ensure that the agent will not try to process all runs at the same time.
Some API calls to provision infrastructure may fail due to unexpected issues on the client side (for example, transient errors such as ConnectionError, HTTPClientError, or RequestTimeout), or due to server-side rate limiting from the AWS service. To mitigate those issues, we recommend adding environment variables such as AWS_MAX_ATTEMPTS (an integer value such as 10) and AWS_RETRY_MODE (a string value such as standard or adaptive). Those environment variables must be added within the agent environment, e.g. on your ECS service running the agent, rather than on the ECSTask infrastructure block.
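If the agent is started from a Python entrypoint, the two variables can be set before any AWS client is created. The sketch below is one way to do that under stated assumptions: aws_retry_env is a hypothetical helper, the legacy mode is included because the AWS SDK for Python recognizes it alongside standard and adaptive, and in most setups you would set these variables in the ECS task definition of the agent rather than in code.

```python
import os

VALID_RETRY_MODES = {"legacy", "standard", "adaptive"}


def aws_retry_env(max_attempts: int = 10, retry_mode: str = "adaptive") -> dict:
    """Return the retry-related environment variables as strings."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    if retry_mode not in VALID_RETRY_MODES:
        raise ValueError(f"retry_mode must be one of {sorted(VALID_RETRY_MODES)}")
    # Environment variable values are always strings.
    return {"AWS_MAX_ATTEMPTS": str(max_attempts), "AWS_RETRY_MODE": retry_mode}


# Apply to the current (agent) process environment, not to the ECSTask block.
os.environ.update(aws_retry_env(max_attempts=10, retry_mode="standard"))
```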