Task flow control
The basic case is a trivial DAG, read_data -> write_data. You express it by naming the tasks and linking them with the input argument: the downstream task lists its upstream task under input.
name: write_to_csv
schedule_interval: rate(1 hours)
granularity: hour

tasks:
  read_data:
    function: functions.filesystem.read_data
    args:
      hook: !Hook files
      path: /iris.data

  write_data:
    input: read_data
    function: typhoon.filesystem.write_data
    args:
      hook: !Hook files
      data: !Py $BATCH
      path: !Py f"/out.csv"
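To make the role of input and $BATCH concrete, here is a minimal Python sketch of the idea, not Typhoon's actual implementation. It assumes each task is a generator and that every value the upstream task yields is handed to the downstream task as its batch. The in-memory `sink` dict and the `run_dag` helper are hypothetical stand-ins for the files hook and the scheduler.

```python
from typing import Dict, Iterator, List


def read_data(lines: List[str]) -> Iterator[str]:
    """Stand-in source task: yields a single batch containing all the text."""
    yield "\n".join(lines)


def write_data(data: str, sink: Dict[str, str], path: str) -> Iterator[str]:
    """Stand-in sink task: stores the batch under `path` and yields the path."""
    sink[path] = data
    yield path


def run_dag(source_lines: List[str], sink: Dict[str, str]) -> List[str]:
    """Hypothetical runner: wires read_data -> write_data like `input: read_data`."""
    written = []
    for batch in read_data(source_lines):  # each yielded value plays the role of $BATCH
        for result in write_data(data=batch, sink=sink, path="/out.csv"):
            written.append(result)
    return written


sink: Dict[str, str] = {}
written_paths = run_dag(["5.1,3.5,setosa", "7.0,3.2,versicolor"], sink)
```

The downstream task runs once per batch yielded upstream, which is the behaviour the flow control functions build on.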
However, the logic flow may have more complex requirements. In that case the typhoon.flow_control functions are useful:
- branch: yields each item in a sequence, one at a time.
- filter: passes on only the items for which a condition is True, skipping the rest.
Branching
In some cases you need more advanced logic that iterates over many items (e.g. multiple tables in a DB to land in a DWH). Using the function typhoon.flow_control.branch we can yield each table name from a variable and run write_data (and all tasks downstream) once per table.
name: read_many_tables
schedule_interval: rate(1 hours)
granularity: hour

tasks:
  list_tables:
    function: typhoon.flow_control.branch
    args:
      branches: !Var sql_tables_list_1

  write_data:
    input: list_tables
    function: typhoon.relational.execute_query
    args:
      hook: !Hook transaction_db
      batch_size: 10
      metadata:
        table_name: !Py $BATCH
      query: !Py $VARIABLE.mysql_read_3.format(table_name=$BATCH)
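The fan-out behaviour of branch can be sketched in a few lines of Python. This is an illustration under assumptions rather than the library's source: it assumes branch yields every element of branches in turn, and the table names and query template below are made-up stand-ins for the sql_tables_list_1 and mysql_read_3 variables.

```python
from typing import Any, Iterable, Iterator, List


def branch(branches: Iterable[Any]) -> Iterator[Any]:
    """Yield each item separately so downstream tasks run once per item."""
    yield from branches


def build_queries(tables: Iterable[str], template: str) -> List[str]:
    """Mimic the downstream task: format one query per yielded table name."""
    return [template.format(table_name=table) for table in branch(tables)]


# Hypothetical values standing in for the Typhoon variables.
queries = build_queries(["clients", "orders"], "SELECT * FROM {table_name}")
```

Each element of the list becomes its own batch, so one query is built per table.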
Filter
We can apply a simple condition using typhoon.flow_control.filter. Items matching the condition are passed to the next task, while the rest are skipped.
This example keeps only the listed files ending with .csv, reads each of them using Pandas, and finally writes the result to SQL. You could easily make this into a Component which can use the UI.
name: pickup_data_to_sql
schedule_interval: rate(10 minutes)

tasks:
  list_files:
    function: typhoon.filesystem.list_directory
    args:
      hook: !Hook ftp
      path: '/'

  filter_csv_files:
    input: list_files
    function: typhoon.flow_control.filter
    args:
      filter_func: !Py "lambda x: x.endswith('.csv')"
      data: !Py $BATCH

  read_csv_to_df:
    input: filter_csv_files
    function: functions.pandas_data_helpers.csv_to_df
    args:
      hook: !Py $HOOK.ftp
      path: !Py $BATCH

  write_df_to_sql:
    input: read_csv_to_df
    function: functions.pandas_data_helpers.df_write
    args:
      hook: !Py $HOOK.transaction_db
      df: !Py $BATCH[0]
      table_name: !Py transformations.path_mapping.to_regex_name($BATCH[1])

tests:
  read_csv_to_df:
    batch: '/file.csv'
    expected:
      path: '/file.csv'

  write_df_to_sql:
    batch:
      - !Py "pd.DataFrame({'a': [1,2,3,4], 'b': [6,7,8,9]})"
      - "data_clients_2021-02-18T03_00_00.csv"
    expected:
      df: !Py "pd.DataFrame({'a': [1,2,3,4], 'b': [6,7,8,9]})"
      table_name: "clients"
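The filtering step can be pictured with the following Python sketch. It is an assumption about the behaviour described above, not a copy of typhoon.flow_control.filter: a batch that satisfies filter_func is yielded unchanged, and any other batch yields nothing, so downstream tasks never run for it. The name `filter_batch` and the file list are hypothetical.

```python
from typing import Any, Callable, Iterator, List


def filter_batch(data: Any, filter_func: Callable[[Any], bool]) -> Iterator[Any]:
    """Yield `data` only if it satisfies `filter_func`; otherwise yield nothing."""
    if filter_func(data):
        yield data


def is_csv(path: str) -> bool:
    """Same condition as the filter_func lambda in the DAG."""
    return path.endswith(".csv")


# Hypothetical directory listing; each entry arrives as one batch.
files: List[str] = ["/a.csv", "/notes.txt", "/b.csv"]
passed = [item for path in files for item in filter_batch(path, is_csv)]
```

Because a skipped item produces no output at all, read_csv_to_df and write_df_to_sql only ever see the .csv paths.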
If statements
You can use the typhoon.ifthen component to express if-else logic:
name: test_if
schedule_interval: rate(1 hour)

tasks:
  names:
    function: typhoon.flow_control.branch
    args:
      branches:
        - John
        - Martha
        - Mathew
        - Karla

  is_girl:
    input: names
    component: typhoon.ifthen
    args:
      condition: !Py "lambda name: name.endswith('a')"
      data: !Py $BATCH

  print_girl:
    input: is_girl.then
    function: functions.debug.echo
    args:
      data: !Py f'{$BATCH} is a girl'

  print_boy:
    input: is_girl.otherwise
    function: functions.debug.echo
    args:
      data: !Py f'{$BATCH} is a boy'
The key parts above are the use of the typhoon.ifthen component and the way downstream tasks are linked to its two outputs by task name: is_girl.then receives the items for which the condition is true, and is_girl.otherwise receives the rest.
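The routing performed by the ifthen example can be illustrated with a short Python sketch. It is a simplified model under assumptions, not the component's implementation: each item is tagged with the output it belongs to, and a hypothetical `route` helper collects what each output would pass downstream.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def if_then(condition: Callable[[Any], bool], data: Any) -> Tuple[str, Any]:
    """Return the output label ('then' or 'otherwise') with the unchanged item."""
    return ("then" if condition(data) else "otherwise", data)


def route(items: Iterable[Any], condition: Callable[[Any], bool]) -> Dict[str, List[Any]]:
    """Hypothetical helper: group items by the output they would be sent to."""
    outputs: Dict[str, List[Any]] = {"then": [], "otherwise": []}
    for item in items:
        label, value = if_then(condition, item)
        outputs[label].append(value)
    return outputs


routed = route(
    ["John", "Martha", "Mathew", "Karla"],
    lambda name: name.endswith("a"),  # same condition as the is_girl task
)
```

Here `routed["then"]` corresponds to what print_girl receives through is_girl.then, and `routed["otherwise"]` to what print_boy receives through is_girl.otherwise.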