The intention of this post is to explain how to deploy Prefect Flow Runs as Kubernetes Jobs.
1. Introduction
You’re using Prefect for automating your workflows and would like to conduct Kubernetes Jobs as a result of flow runs? This post explains the necessary components, the prerequisites and how to run a flow as Kubernetes Job at an Azure Kubernetes Service.
1.1 What is Prefect?
We’ve to distinguish between Prefect Core and Prefect Cloud, respectively Prefect Server:
The Workflow Engine
When we talk about Prefect Core, we’re refering to the worklow engine. It allows you to define tasks, which can be written in Python. Multiple tasks can be encapsulated, which results in a flow - so flows can contain one or multiple tasks. Running a flow therefore runs all it’s containing tasks - this maps the whole workflow. Furthermore, the execution of the tasks can be speeded up - by using Dask.
The Infrastructure
Alright, so it’s about executing multiple tasks, which could also have a mutual dependency. Imagine you are processing a huge amount of data - which implies to run multiple flows. You’d like to monitor the flows, verifying their state, etc. - therfore to possess a tool acting as Orchestrator would be nice.
For that case, Prefect Cloud, respectively Prefect Server would fit. By deploying e.g.: Prefect Server, you’ll get a Prefect Agent, a GraphQL instance, a Prefect UI and further components. This allows you to manage the generated flows. If you tend to a managed solution, than Prefect Cloud would be the proper choice.
References
docs.prefect.io - Core Workflow Engine
docs.prefect.io - Orchestration & API
docs.prefect.io - Prefect Server vs. Prefect Cloud
1.2 Scope of this post
In this post, I’ll describe how a flow run can be conducted as a Kubernetes Job at an Azure Kubernetes Service by using Prefect Server. So, as a prerequisite - a Kubernetes Service (as the AKS) would be mandatory and in addition the provisioning of the Prefect Server of course. A connection to the Prefect Agent will be established, the creation of a dedicated Prefect project and the registration. Finally, the flow run will be executed.
So, this post does not focus on Prefect Core - the worklflow engine - it explains how Prefect Server and Azure Kubernetes Service work together.
For a detailed manual how to deploy Prefect Server using a Helm chart - see:
2. Description of the Use Case and the Components
2.2 The Use Case
The architecture of the “Azure-Prefect” microservice can be seen in the picture below: a Python script, including proper statements for running flows is available outside of the Azure network - in my case it is available at a dedicated Linux based Dev-Container, which I run on my localhost. After triggering the flow, the corresponding Kubernetes Job will be conducted at the Azure Kubernetes Service. The Prefect Server was deployed before using a helm chart at the Azure Kubernetes Service. In my example, a Workload from type Job is conducted, which encapsulates the Kubernetes Pod, which therefore encapsulates a Linux Container. The necessary Container Image was provided before at an Azure Container Registry.
2.2 The Components
- The Private Network: I’m going to run a Dev-Container outside of my Azure subscription, which is Linux based - Python and Prefect are installed at the Dev-Container
- Azure Network: The cloud network of my Azure subscription. Already provisioned: Azure Kubernetes Service, Prefect Server and the Azure Container Registry, which stores the Linux Container Image
- Python script: Contains Prefect statements, defining a task and a flow. This Python script is located at the mentioned Dev-Container.
- Azure Kubernetes Service and Prefect Server: In my Azure subscription, I’ve deployed an Azure Kubernetes Service. Using a dedicated helm chart, the mandatory Prefect workloads are provisioned at the AKS and the Prefect Server is ready to use.
- Azure Container Registry: This Container Registry manages a simple example Linux Container Image, which will be used for conducting a Prefect flow run as Kubernetes Job
- Workload/Job/Container: Finally a Workload will be created, which completes after a few minutes. This runs the example Linux Container
3. Prerequisites
3.1 Azure Kubernetes Service and Prefect Server
My Azure Kubernetes Service is named “tight-shepherd-aks”, the picture below shows the Deployments - it’s about a running Prefect Agent, the server-apollo, server-graphql, server-hasura, server-towel and the server-ui. In addition, ensure to set the proper ingress rules, to reach the revealed url for accessing the Prefect UI.
The Prefect Agent contains (among others) three important variables, which need to be set on your host - at which you’d like to run the Prefect code, which conducts the Kubernetes flow. The mentioned variables are:
- PREFECT__CLOUD__API
- PREFECT__CLOUD__AGENT__LABELS
- PREFECT__BACKEND
For my deployment, the related values can be seen in the picture below:
3.2 Preparing the Dev-Container and running a simple Prefect Example
3.2.1 Attach to Container
Ensure that you can run Prefect code: in my case, I’m using a self written Dev-Container, which is Linx based - Prefect and Python already installed. So, I’m running my Dev-Container and I start VS Code. Now click at “Remote Explorer”…
…and attach to the running Dev-Container.
Now you can create a new Terminal for conducting the commands.
3.2.2 Create Python scripts and the YAML file
Now, I’m going to create three files in VS Code and save them at the file system of my Dev-Container:
- prefect-example.py
- prefect-k8s-run-example.py
- deployLinux.yml
It’s about two Python scripts, including Prefect statements and a YAML file, which defines the Workload for the Linux Container, which will finally run at the Azure Kubernetes Servce as result of a Prefect flow run.
Therfore, I click at “New File”…
…and choose “Text File”.
Repeat that, so that all three files are stored at the “/home/” directory at the file system of my Dev-Container.
The source of the files can be found below:
prefect-example.py - an example for demonstrating Prefect workflows
from prefect import task
from prefect import Flow
@task
def extract():
"""Get a list of data"""
return [1, 2, 3]
@task
def transform(data):
"""Multiply the input by 10"""
return [i * 10 for i in data]
@task
def load(data):
"""Print the data to indicate it was received"""
print("Here's your data: {}".format(data))
with Flow('ETL') as flow:
e = extract()
t = transform(e)
l = load(t)
flow.run() # prints "Here's your data: [10, 20, 30]"
The sources of this example script can be found at:
docs.prefect.io - Advanced Tutorials
prefect-k8s-run-example.py - the Prefect example, which runs a flow as Kubernetes Job
import prefect
from prefect import task, Flow
from prefect.run_configs import KubernetesRun
@task
def example_task():
logger = prefect.context.get("logger")
logger.info("Hello guests of my post!")
with Flow("my-example-flow", run_config = KubernetesRun(job_template_path="deployLinux.yaml", labels=["9bad561ac917"])) as flow:
example_task()
deployLinux.yml - a YAML file, which defines the Workload from type Job (this Workload can be applied by using kubectl too of course)
apiVersion: batch/v1
kind: Job
metadata:
name: mylinuxcontainer
spec:
template:
spec:
containers:
- name: mylinuxcontainer
image: myregistry.azurecr.io/concerto:patrickstestcontainer.v.1
nodeSelector:
kubernetes.io/os: linux
restartPolicy: Never
backoffLimit: 4
Finally, those three files should be available at the Dev-Container:
3.2.3 Running prefect-example.py
I’m going to verify, that Prefect code can be executed on my Dev-Container, therefore I’m running “prefect-example.py” by entering following command:
prefect run -p prefect-example.py
As a result, the logs reveal that the different tasks are executed sequentially - in the same order as defined in the flow “ETL”:
4. Deploying the Prefect Flow Run as Kubernetes Job
4.1 Establish the connection to the Prefect-Agent
Now I’ve again to refer to the variables of the Prefect Agent:
- PREFECT__CLOUD__API
- PREFECT__CLOUD__AGENT__LABELS
- PREFECT__BACKEND
I need to set this variables at my Dev-Container, as a prerequisite of the deployment of the Prefect flow runs.
e.g.:
export PREFECT__BACKEND="server"
4.2 Running the Flow
Now let’s get to the most interesting part - finally a Prefect flow is deployed as Kubernetes Job.
4.2.1 Create a new project and do the registration
At first, I need to create a new project - I’ll name it “exampleProject” - using following command:
prefect create project exampleProject
After creating the project, the project including the Python script as argument will be registered:
prefect register --project exampleProject -p prefect-k8s-run-example.py
The logs should be similar as shown in the picture below. The logs reveal that “prefect-k8s-run-example.py” is processed and that the flow named “my-example-flow” is processed.
4.2.2 Run the Flow
Now let’s come to the final - the run of the flow. Therfore I’ll conduct following command:
prefect run --name my-example-flow --project exampleProject --watch --label "9bad561ac917"
Use the “–label” parameter for ensuring that your Prefect Agent is chosen for running the flow.
This results to the creation of the run for the flow “my-example-flow”:
In the last line, it can be seen that the Job is named “prefect-job-594ef554” - let’s remember it and prove at the Azure Kubernetes Service that this job is running.
4.2.2 Verification at the Azure Kubernetes Service
Switch to the Azure Kubernetes Service and prove that the Job “prefect-job-594ef554” is running - this can be seen at the “Workloads” section. The workload’s type is “Job”, so switch to the “Jobs” section:
A click at the workload links to the Kubernetes Pod, a click at the Pod links to the running Container - named “mylinuxcontaier” (see picture below) - as determined in the YAML file deployLinux.yml. This workload will be completed after a few minutes.
So, I’m using a dedicated YAML file, which serves as argument for the Prefect flow command - of course there exists further options. For that, please see the documentation about “KubernetesRun”:
docs.prefect.io - API - kubernetesrun
5. Conclusion
Writing and running Prefect code is a great experience - I like this approach of worklow generation. To be honest, my start with Prefect Server was tricky, as I was focused on running the flows as Kubernetes Jobs. I mixed up the different approaches of Prefect Core with regard to Prefect Server.
So, using Prefect Core refers to using the worklow engine approach, by defining and running tasks (written in Python) as flows. In contrast to that, we refer to the infrastructure if we talk about Prefect Server or Prefect Cloud - both allow us to orchestrate the flows by extending Prefect Core by the UI, the GraphQL, and the additional components.
As a requirement for that, you need to deploy the Prefect “infrastructure” on your Kubernetes Service. In this post, I’ve used an Azure Kubernetes Service, but of course you can use alternative Kubernetes Services too.
So if you need to enable Workflow Orchestration and if you’d like to combine that with Kubernetes - Prefect would be a good choice.
References
docs.prefect.io - Core Workflow Engine
docs.prefect.io - Orchestration & API
docs.prefect.io - Prefect Server vs. Prefect Cloud