I will be using Prefect Orion as the orchestration engine to run a set of flows on a schedule.
For example, I need a few reports to run daily at 9am and be delivered to my Slack channel.
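"Daily at 9am" corresponds to the cron expression `0 9 * * *`. To make the timing concrete, here is a minimal stdlib-only sketch of how the next 9am run is found. The `next_daily_run` helper is hypothetical and uses naive datetimes with no time-zone or DST handling, which a real schedule should set explicitly.

```python
from datetime import datetime, time, timedelta


def next_daily_run(now: datetime, at: time = time(9, 0)) -> datetime:
    """Return the first datetime at or after `now` that falls on the daily `at` time.

    Hypothetical helper: naive datetimes, no time-zone or DST handling.
    """
    candidate = datetime.combine(now.date(), at)
    # If today's slot has already passed, the next run is tomorrow.
    if candidate < now:
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime(2022, 8, 1, 10, 30)))  # 2022-08-02 09:00:00
```

In Prefect itself, the schedule is attached to the deployment rather than computed by hand like this.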
To get started, we create a Python 3 virtual environment (venv), activate it and install Prefect 2.0.1:
```bash
python3 -m venv orion-venv
source orion-venv/bin/activate
pip install prefect
```
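Before running `pip install`, it can help to confirm that the venv is actually active, so Prefect does not end up in the system Python. A minimal stdlib sketch; the `in_virtualenv` helper is our own and not part of Prefect:

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the running interpreter belongs to a venv.

    Inside a venv created with `python3 -m venv`, sys.prefix points at the
    venv directory while sys.base_prefix still points at the base install.
    """
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    if in_virtualenv():
        print(f"Active venv: {sys.prefix}")
    else:
        print("No venv active; run `source orion-venv/bin/activate` first.")
```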
My folder structure is as follows:
```
flows/
├── daily_9am.py
├── daily_6pm.py
└── ...
src/
└── scripts/
    ├── __init__.py
    ├── script_one.py
    ├── script_two.py
    └── ...
```
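If you want to recreate this layout quickly, a small pathlib script can scaffold it. This is a hypothetical helper for illustration: `LAYOUT` and `scaffold` are our own names, and the `...` entries from the tree are deliberately left out.

```python
import tempfile
from pathlib import Path
from typing import List

# Hypothetical scaffold mirroring the layout above ("..." entries omitted).
LAYOUT = [
    "flows/daily_9am.py",
    "flows/daily_6pm.py",
    "src/scripts/__init__.py",
    "src/scripts/script_one.py",
    "src/scripts/script_two.py",
]


def scaffold(root: Path) -> List[Path]:
    """Create the project directories and empty files under `root`."""
    created = []
    for rel in LAYOUT:
        path = root / rel
        path.parent.mkdir(parents=True, exist_ok=True)  # make flows/, src/scripts/
        path.touch(exist_ok=True)  # create an empty file, keep existing ones
        created.append(path)
    return created


if __name__ == "__main__":
    # Dry run in a throwaway directory before scaffolding the real project.
    with tempfile.TemporaryDirectory() as tmp:
        for p in scaffold(Path(tmp)):
            print(p.relative_to(tmp))
```

Note that imports such as `from src.scripts...` only resolve when the project root is on `sys.path`. Running `python flows/daily_9am.py` directly puts `flows/` there instead, so running from the project root with `PYTHONPATH=.` is one way around that.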
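The flow file, flows/daily_9am.py:
```python
from typing import List

from prefect import flow, get_run_logger
# By default, Prefect uses the ConcurrentTaskRunner for non-blocking,
# concurrent execution of tasks; we use SequentialTaskRunner instead so
# the scripts run one after another.
from prefect.task_runners import SequentialTaskRunner

# Import the task functions themselves (not just the modules),
# so that .submit() below is called on a Prefect task.
from src.scripts.script_one import script_one
from src.scripts.script_two import script_two

# Map script names (passed in as the flow parameter) to the tasks
# we want to run at 9am daily
scripts_dict = {
    "script_one": script_one,
    "script_two": script_two,
}


@flow(
    name="daily_9am",
    description="These tasks are run at 9am daily.",
    task_runner=SequentialTaskRunner(),
)
def daily_9am(scripts: List[str]):
    logger = get_run_logger()
    # Loop through the requested scripts in order
    for script in scripts:
        logger.info(f"[ START ] Script name: {script}")
        # .submit() hands the task to the flow's task runner and returns a
        # future; .wait() blocks until the task finishes, so END is accurate
        scripts_dict[script].submit().wait()
        logger.info(f"[ END ] Script name: {script}")
```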
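Stripped of Prefect, the flow body is a name-to-callable lookup executed in order. The stdlib sketch below mirrors that control flow so it can be tried without a Prefect server. `run_sequentially` and the stand-in functions are hypothetical, and the up-front name check is an addition not present in the flow above: it makes a typo in the `scripts` parameter fail before any work runs.

```python
import logging
from typing import Callable, Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("daily_9am")


def script_one() -> str:  # stand-in for the real Prefect task
    return "one done"


def script_two() -> str:  # stand-in for the real Prefect task
    return "two done"


SCRIPTS: Dict[str, Callable[[], str]] = {
    "script_one": script_one,
    "script_two": script_two,
}


def run_sequentially(names: List[str]) -> List[str]:
    """Look up each name in SCRIPTS and run it to completion, in order."""
    # Fail fast on unknown names before any script starts.
    unknown = [n for n in names if n not in SCRIPTS]
    if unknown:
        raise KeyError(f"Unknown scripts: {unknown}")
    results = []
    for name in names:
        logger.info("[ START ] %s", name)
        results.append(SCRIPTS[name]())
        logger.info("[ END ] %s", name)
    return results


if __name__ == "__main__":
    run_sequentially(["script_two", "script_one"])
```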
Each script defines a Prefect task that describes the work you want to accomplish. For example, src/scripts/script_one.py:
```python
from prefect import task, get_run_logger


@task(retries=3, retry_delay_seconds=10, tags=["daily"])
def script_one():
    logger = get_run_logger()
    logger.info("I am script one.")
    # ... get some work done ...
```
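With `retries=3, retry_delay_seconds=10`, a failing task is attempted up to four times in total (the first run plus three retries), pausing 10 seconds between attempts. The stdlib sketch below only illustrates those semantics; `run_with_retries` is a hypothetical helper and not how Prefect implements retries internally.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    retry_delay_seconds: float = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on an exception, retry up to `retries` more times.

    At most retries + 1 attempts in total, with a fixed pause between them.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # out of retries: surface the last error
            attempt += 1
            sleep(retry_delay_seconds)


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    # sleep=lambda s: None skips the real 10 s waits for this demo.
    print(run_with_retries(flaky, sleep=lambda s: None), calls["n"])  # ok 3
```

Injecting `sleep` keeps the helper testable without real delays.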
We then log in to Prefect Cloud:
```bash
prefect cloud login --key xxx_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
```
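To avoid pasting the API key into shell history or scripts, one option is to keep it in an environment variable and build the same `prefect cloud login --key` command from it. This is a sketch under assumptions: `PREFECT_CLOUD_KEY` is a variable name chosen for this example, and the CLI may still prompt for input (for example, a workspace choice).

```python
import os
import subprocess
from typing import List, Mapping


def build_login_command(env: Mapping[str, str] = os.environ) -> List[str]:
    """Build the `prefect cloud login` argv from an environment variable.

    PREFECT_CLOUD_KEY is a hypothetical name chosen for this sketch.
    """
    key = env.get("PREFECT_CLOUD_KEY")
    if not key:
        raise RuntimeError("Set PREFECT_CLOUD_KEY before logging in.")
    return ["prefect", "cloud", "login", "--key", key]


if __name__ == "__main__":
    # Passing a list (no shell=True) keeps the key out of shell parsing.
    subprocess.run(build_login_command(), check=True)
```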
Next, we build our deployment: