Creating a DAG
Simply put, a DAG is the workflow you are composing. It is a DAG structure similar to many other workflow tools (e.g. Task A → Task B → Task C).
However, writing a DAG in Typhoon can be much quicker and more intuitive thanks to its simpler YAML syntax and the fact that data is shared between tasks.
Concepts:
- tasks: this defines each node (or step) in the DAG workflow (i.e. do something).
- functions: this is the operation to carry out (e.g. read data, write data, branches, if, etc.).
- transformations: stored functions that are re-usable and testable (e.g. gzip, dicts_to_csv); see the sketch after this list.
- components: re-usable sets of multi-task DAGs (e.g. glob_compress_and_copy, if, db_to_snowflake). They can be used standalone or within a task.
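For instance, a task can apply one of your own transformations to the batch it receives via !Py, as in the sketch below. The module path (transformations.data.dicts_to_csv) and the upstream task name are assumptions for illustration; adjust them to match your project.

```yaml
# Hypothetical sketch: using a re-usable transformation on the incoming batch.
# `transformations.data.dicts_to_csv` is assumed to live in your project's
# transformations folder; `read_users` is an assumed upstream task.
write_csv:
  input: read_users
  function: typhoon.filesystem.write_data
  args:
    hook: !Hook data_lake
    path: users.csv
    data: !Py transformations.data.dicts_to_csv($BATCH)
```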
DAG code
We can check an example DAG in `dags/hello_world.yml`. You can also check it with the CLI by running `typhoon dag definition --dag-name hello_world`.
Tip
To get editor completion and hints, you can configure `dag_schema.json` as the JSON Schema for the DAG YAML file. This can be done in VS Code with the Red Hat YAML extension, or natively in PyCharm.
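One way to wire this up (a sketch, assuming the Red Hat YAML extension's schema modeline feature and that `dag_schema.json` is reachable from the DAG file; adjust the path to wherever the schema actually lives) is to point the YAML language server at the schema from the top of the DAG file:

```yaml
# yaml-language-server: $schema=dag_schema.json
# ^ schema modeline read by the Red Hat YAML extension; the relative path
#   to dag_schema.json is an assumption for this sketch.
name: hello_world
```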
```yaml
name: hello_world
schedule_interval: rate(1 hours)  # Can also be a Cron expression
granularity: hour

tasks:
  send_data:
    function: typhoon.flow_control.branch
    args:
      branches:
        - filename: users.txt
          contents: John, Amy, Adam, Jane
        - filename: animals.txt
          contents: dog, cat, mouse, elephant, giraffe
        - filename: fruits.csv
          contents: apple,pear,apricot

  write_data:
    input: send_data
    function: typhoon.filesystem.write_data
    args:
      hook: !Hook data_lake
      path: !MultiStep
        - !Py $BATCH['filename']
        - !Py $DAG_CONTEXT.interval_end
        - !Py f'/store/{$2}/{$1}'
      data: !Py $BATCH['contents']
      create_intermediate_dirs: True
```
This DAG has three branches, each with a file name and its contents (a YAML list of dictionaries). It will write three files: users.txt, animals.txt and fruits.csv.
Typically this first step would read from a database, a set of CSVs or an API; the data is inlined here only to keep the example trivial.
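As a sketch of what a more realistic first task could look like, the branch task might be replaced with a database read. The connection name (transaction_db), the query and the function name are assumptions for illustration; check the function list for the exact read function available in your version.

```yaml
# Hypothetical sketch: reading the first batch from a database instead of
# inlining it. `transaction_db` is an assumed connection from connections.yml
# and the function name should be verified against the function list.
send_data:
  function: typhoon.relational.execute_query
  args:
    hook: !Hook transaction_db
    query: SELECT name FROM users
```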
Skim the code
For a step-by-step walkthrough of this example, please start with our Hello World walkthrough.
DAG basic concepts
DAGs are composed in YAML and compiled to Python. This means that once you have written your YAML and built it, the output is a normal, testable Python file. It also transpiles to Airflow-compatible DAGs.
Basic components (a minimal skeleton follows this list):
- name: ...
- schedule_interval: can be a Rate or a Cron expression to express how often it should run.
- granularity: minutes, hour, hours, day, days ... or a Cron expression.
- tasks: this defines each node (or step) in the DAG workflow (i.e. do something)
- input: here we are connecting the input from the previous task (A -> B; B.input = A).
- function: this is the operation to carry out (e.g. read data, write data, branches, if, etc.). See the full function list for reference.
- args: these are the specific arguments for the function (so they will differ). Common ones are below:
  - hook: the connection to use, which you have added to your `connections.yml`.
  - path: the path, typically used in filesystem hooks to read / write files.
  - data: the batch of data being passed between tasks, referenced by $BATCH.
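Putting these pieces together, a minimal DAG skeleton looks roughly like the sketch below. The names and values are placeholders for illustration, not a working pipeline; only the keys mirror the components listed above.

```yaml
name: my_dag                                # DAG name
schedule_interval: rate(1 days)             # or a Cron expression
granularity: day

tasks:
  task_a:
    function: typhoon.flow_control.branch   # the operation to carry out
    args:
      branches:
        - value: example

  task_b:
    input: task_a                           # A -> B; B.input = A
    function: typhoon.filesystem.write_data
    args:
      hook: !Hook data_lake                 # connection from connections.yml
      path: output.txt
      data: !Py $BATCH['value']             # the batch passed from task_a
```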
Syntax sugars:
- !Hook: e.g. !Hook data_lake is equivalent to $HOOK.data_lake, i.e. it retrieves the connection.
- !MultiStep: allows you to write multi-line scripts easily (see the example above). Useful for chaining together a few Pandas transformations, for example; see the sketch after this list.
- !Py: evaluates the line as normal Python (giving you total flexibility to apply your own transformations).
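As an illustration of chaining steps with !MultiStep (a sketch only: the transformation functions shown are assumed helpers in your own transformations folder, not Typhoon built-ins):

```yaml
# Hypothetical sketch: each !MultiStep step can refer to earlier steps as
# $1, $2, ... The transformations.pandas_utils module is an assumption,
# standing in for your own re-usable Pandas helpers.
data: !MultiStep
  - !Py transformations.pandas_utils.to_dataframe($BATCH)
  - !Py transformations.pandas_utils.drop_duplicates($1)
  - !Py transformations.pandas_utils.to_csv_string($2)
```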
Running the DAG
We can run this with:
```bash
typhoon dag run --dag-name hello_world
```
We can check the files that have been written to `/tmp/data_lake/store/`, where `/tmp/data_lake/` was the base directory defined for our connection (go back to `connections.yml` to check) and the DAG wrote to `/store/[FILE_NAME]`.