Skip to content

Functions

Functions are the key building blocks of DAG tasks. Each task has to have a function. Usually functions act on Hooks to extract or load data.

In the example below we are using a custom function exchange_rates_api.get_history that would in the file exchange_rates_api.py in the functions directory. In this file you would put any function that your custom ExchangeRate Hook might need.

  • typhoon.function_file.function_name for built-in / contrib functions.
  • functions.my_function_file.function_name for accessing your project functions.

The two main task keys that interact with the function are the input (if required) and the args. These match the input paramater names of the function. So if you are unsure of how to make the task you can always check the function code.

For example the write_data function has the following parameters that are matched in the YAML DAG below it. Optional ones are omitted.

    def write_data(
            data: Union[str, bytes, BytesIO],
            hook: FileSystemHookInterface,
            path: Union[Path, str],
            create_intermediate_dirs: bool = False,
            metadata: Optional[dict] = None,
            return_path_format: str = 'relative',
    ) -> Iterable[str]:

Example built-in / contrib. function

  write_csv:
    input: exchange_rate
    function: typhoon.filesystem.write_data
    args:
      hook: !Hook echo
      data: !MultiStep
        - !Py transformations.xr.flatten_response($BATCH)
        - !Py typhoon.data.dicts_to_csv($1, delimiter='|')
      path: !Py f'{$DAG_CONTEXT.interval_start.isoformat()}_xr_data.csv'

Example project function

  exchange_rate:
    function: functions.exchange_rates_api.get_history
    args:
      start_at: !Py $DAG_CONTEXT.interval_start
      end_at: !Py $DAG_CONTEXT.interval_end