Simple Pandas Examples
Using Pandas gives us access to a powerful, widely used library. It is one of the easiest ways to handle complex data transformation tasks such as:
- time-series analysis
- complex merging
- cleaning
- pivoting and melting
Below is a deliberately trivial example that illustrates two ways to use Pandas in a DAG: directly in the DAG YAML, or wrapped in a function. From here you can integrate existing Pandas code or quickly add Pandas to your automations.
Direct usage in DAG YAML
It's very easy to use Pandas directly in !MultiStep transformations without even creating a function.
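Here we load a mock data set of test scores (download instructions below) and use a small helper function to parse the JSON. The `pandas_transform` task then computes each country's median score and merges it back onto every row. Within `!MultiStep`, `$1`, `$2`, and so on refer to the output of the first, second, and later steps.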
```yaml
name: pandas_example
schedule_interval: rate(1 hour)

tasks:
  load_json:
    function: typhoon.filesystem.read_data
    args:
      hook: !Hook data_lake
      path: /test_scores.json

  pandas_transform:
    input: load_json
    function: typhoon.debug.echo
    args:
      data: !MultiStep
        - !Py transformations.data.json_loads_to_dict($BATCH.data)
        - !Py typhoon.data.to_dataframe($1)
        - !Py $2.groupby('Country')['Score'].median()
        - !Py $2.merge($3, how='inner', on='Country')
        - !Py $4.rename(columns={"Score_x":"Score","Score_y":"Country_median_score"})
```
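To see what each `!MultiStep` step produces, the chain can be replayed in plain Pandas. This is a minimal sketch, not Typhoon's implementation: `pd.DataFrame` stands in for `typhoon.data.to_dataframe`, the payload is a hypothetical column-oriented JSON object standing in for `$BATCH.data`, and the `Name` field is invented to show a non-numeric column.

```python
import json

import pandas as pd

# Hypothetical payload standing in for $BATCH.data (Name is invented;
# only Country and Score come from the example).
payload = json.dumps({
    "Name": ["Ana", "Luis", "Tom", "Amy", "Sam"],
    "Country": ["Spain", "Spain", "UK", "UK", "UK"],
    "Score": [70, 90, 60, 75, 85],
}).encode("utf-8")

step1 = json.loads(payload)      # $1: parsed JSON
step2 = pd.DataFrame(step1)      # $2: stand-in for typhoon.data.to_dataframe
# $3: Series of per-country medians, named "Score" and indexed by Country
step3 = step2.groupby("Country")["Score"].median()
# $4: join on Country; the clashing Score columns become Score_x / Score_y
step4 = step2.merge(step3, how="inner", on="Country")
# $5: give the suffixed columns readable names
step5 = step4.rename(columns={"Score_x": "Score", "Score_y": "Country_median_score"})

print(step5)
```

Because the aggregated Series keeps `Country` as its index name, `merge(..., on='Country')` can match it against the `Country` column of the DataFrame. Both sides carry a `Score` label, so pandas applies its default `_x`/`_y` suffixes, which is why the last step renames them.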
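The helper that parses the JSON lives in the `transformations.data` module referenced by the DAG:

```python
import json
from typing import Union

def json_loads_to_dict(data: Union[str, bytes]) -> dict:
    # json.loads accepts both str and bytes input.
    return json.loads(data)
```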
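Since `json.loads` accepts `str`, `bytes`, and `bytearray` (bytes must be UTF-8, UTF-16, or UTF-32), the helper works whether the hook hands back text or raw bytes. A quick check outside Typhoon, using a hypothetical payload:

```python
import json
from typing import Union


def json_loads_to_dict(data: Union[str, bytes]) -> dict:
    # Same helper as above: parse JSON text or bytes into Python objects.
    return json.loads(data)


# Hypothetical column-oriented payload; only Country and Score come from the example.
text = '{"Country": ["Spain", "UK"], "Score": [70, 60]}'

from_text = json_loads_to_dict(text)
from_bytes = json_loads_to_dict(text.encode("utf-8"))

print(from_text == from_bytes)
```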
Data set - mock test scores
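Download it into the local directory used by the `data_lake` hook (`/tmp/data_lake` in this example):

```shell
mkdir -p /tmp/data_lake
curl -o /tmp/data_lake/test_scores.json https://raw.githubusercontent.com/typhoon-data-org/typhoon-orchestrator/master/docs/examples/data/test_scores.json
```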
Usage in functions
We can also wrap the transformation in a function in the same `transformations.data` module:
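```python
import pandas as pd

def df_add_country_median(data: pd.DataFrame) -> pd.DataFrame:
    # Per-country median; selecting Score first skips non-numeric columns.
    medians = data.groupby('Country')['Score'].median()
    # Join back on Country; the clashing Score columns get _x/_y suffixes.
    data = data.merge(medians, how='inner', on='Country')
    return data.rename(columns={"Score_x": "Score", "Score_y": "Country_median_score"})
```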
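A benefit of moving the logic into a function is that it can be tested without running the DAG. The sketch below is a pytest-style test on a small hypothetical DataFrame (the `Name` column is invented to show that non-numeric columns pass through). It repeats the function, computing the median as its name implies, so the block runs on its own.

```python
import pandas as pd


def df_add_country_median(data: pd.DataFrame) -> pd.DataFrame:
    # Per-country median; selecting Score first skips non-numeric columns.
    medians = data.groupby("Country")["Score"].median()
    # Join back on Country; the clashing Score columns get _x/_y suffixes.
    merged = data.merge(medians, how="inner", on="Country")
    return merged.rename(columns={"Score_x": "Score", "Score_y": "Country_median_score"})


def test_df_add_country_median() -> None:
    # Hypothetical input data.
    df = pd.DataFrame({
        "Name": ["Ana", "Luis", "Tom", "Amy", "Sam"],
        "Country": ["Spain", "Spain", "UK", "UK", "UK"],
        "Score": [70, 90, 60, 75, 85],
    })
    result = df_add_country_median(df)

    assert list(result.columns) == ["Name", "Country", "Score", "Country_median_score"]
    assert result["Country_median_score"].tolist() == [80.0, 80.0, 75.0, 75.0, 75.0]
    # merge returns a new frame, so the input keeps its original columns.
    assert list(df.columns) == ["Name", "Country", "Score"]


if __name__ == "__main__":
    test_df_add_country_median()
    print("ok")
```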
The `pandas_transform` task then shrinks to three steps:
```yaml
pandas_transform:
  input: load_json
  function: typhoon.debug.echo
  args:
    data: !MultiStep
      - !Py transformations.data.json_loads_to_dict($BATCH.data)
      - !Py typhoon.data.to_dataframe($1)
      - !Py transformations.data.df_add_country_median($2)
```