Under the hood
What follows is a quick tour under the hood of Hylode…
In vignette_1_training_set, we miraculously wound up with a usable set of features on running retro_dataset. This is, of course, because they had been lovingly built by Nel in advance.
This notebook is intended as a leg-up in getting your bearings on how Hylode ingests and processes data from EMAP. I include the pieces of code and ways of thinking that have helped me, in the hope they will help others.
An over-arching view
As good a starting point as any is the HySys architectural diagram linked here. (You need to be logged into GitHub to view it.)
This picture gives an overview of how the system fits together. In terms of data ingestion, we can see HyFlow and HyGear, the two components responsible for fetching and transforming the data from EMAP (and other sources). Sitting above them is HyCommand, which coordinates the requests to the various subcomponents.
In the current version, an example of how HyCommand does its work can be found in the ICU Demand DAG. This scheduled code triggers the appropriate actions from HyFlow and HyGear for the initial ingestion and transformation of the data.
(The PowerPoint slide DAG.pptx in this directory (download by opening in a new window) shows you the complete set of operations the DAG triggers. Don’t be disheartened if this seems like a bit much; we will have a look at it piece by piece…)
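For orientation, here is a minimal sketch of the Airflow scaffolding that a DAG like this lives in. All names here (the dag_id, the schedule, the DummyOperator stand-ins) are illustrative assumptions, not Hylode’s actual configuration; the real tasks are SimpleHttpOperator calls like the ones we look at below.

# Minimal, hypothetical Airflow DAG scaffolding -- names are illustrative only.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer Airflow

with DAG(
    dag_id="icu_demand_example",           # hypothetical id
    start_date=datetime(2021, 7, 1),
    schedule_interval=timedelta(hours=1),  # assumed: runs hourly
    catchup=False,
) as dag:
    # stand-ins for the real SimpleHttpOperator tasks discussed below
    fetch_episode_slices = DummyOperator(task_id="fetch_episode_slices-T03")
    generate_icu_temporal = DummyOperator(task_id="generate_icu_temporal-T03")

    # HyFlow ingestion runs before the HyGear transformation
    fetch_episode_slices >> generate_icu_temporal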
An example: HyFlow fetch_episode_slices
Starting from that file for the DAG, let’s look at this code:
fetch_episode_slices_task = SimpleHttpOperator(
    task_id=f"fetch_episode_slices-{ward}",
    http_conn_id="hyflow_api",
    method="POST",
    endpoint="/trigger/fetch/icu/episode_slices",
    headers=default_http_headers,
    data=json.dumps(fetch_task_input),
    extra_options={"check_response": False},  # turn off the default response check
    response_check=skip_on_empty_result_set,  # add a custom response check
)
This makes the API call to the hyflow_api to fetch the episode slices for a given ward. It is beautifully documented in the HyFlow API docs (here, at the time of writing), where we can see that fetch_episode_slices is designed to:

> Append Hylode episode slices to the hyflow.icu_episode_slices_log table for episodes which were active on the ward at the horizon.

A Hylode episode is defined as a stay on a specific ward with a limited allowable break between bed location visits on that ward. An episode slice is a fraction, up to and including 100%, of an episode.
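That definition is worth pausing on. As a toy illustration (heavily simplified, and not Hylode’s actual implementation), coalescing bed location visits into episodes with a limited allowable break might look like this: sort the visits, start a new episode whenever the gap since the previous discharge exceeds the break, and group each run of visits into one episode.

# Toy illustration of coalescing location visits into episodes.
# The 4-hour allowable break is an assumption for the example.
import pandas as pd

MAX_GAP = pd.Timedelta(hours=4)

visits = pd.DataFrame(
    {
        "admission_dt": pd.to_datetime(
            ["2021-07-16 08:00", "2021-07-16 14:30", "2021-07-17 02:00"]
        ),
        "discharge_dt": pd.to_datetime(
            ["2021-07-16 14:00", "2021-07-16 20:00", "2021-07-17 06:00"]
        ),
    }
).sort_values("admission_dt")

# a new episode starts whenever the gap since the previous discharge exceeds MAX_GAP
gap = visits["admission_dt"] - visits["discharge_dt"].shift()
visits["episode_id"] = (gap > MAX_GAP).cumsum()

episodes = visits.groupby("episode_id").agg(
    episode_start=("admission_dt", "min"),
    episode_end=("discharge_dt", "max"),
)
print(episodes)  # two episodes: the first two visits coalesce, the third stands alone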
SQL extraction code
Digging a little deeper, we can trace this back to the SQL code. The endpoint is implemented by the function fetch_episode_slices, found in the definition of the endpoint here. In it we can see the following code slice:
episode_slices_df = icu_episode_slices_from_emap(
    ward, horizon_dt, list(beds_df.hl7_location)
)
Let’s have a look at what this icu_episode_slices_from_emap function is…

from hyflow.fetch.icu.icu_episode_slices import icu_episode_slices_from_emap

??icu_episode_slices_from_emap
Okay… so looking at this, we can see that this function first calls _icu_location_visits_from_emap:
from hyflow.fetch.icu.icu_episode_slices import _icu_location_visits_from_emap
??_icu_location_visits_from_emap
…which in turn runs an SQL query from the file emap__icu_location_visit_history.sql. Looking this up in the Hylode code, we find the corresponding file here, and can run the query in DBForge (being sure to substitute values for the named parameters prefixed by %, e.g. %(horizon_dt)s).
Alternatively, we can do that here in a notebook (see Appendix 1).
Processing and storage in hylode_db
Following through the rest of the definition of icu_episode_slices_from_emap, we can see that this function goes on to call _coalesce_icu_location_visits_into_episode_slices, which turns those ICU location visits into our notion of episode slices (as described in the function’s docstring, again viewable using the ?? shortcut).
Then, returning to the code for fetch_icu_episode_slices, we can see a call to df_to_hylode_db. This is where the dataframe extracted from EMAP, now restructured into episode slices, is stored in the Hylode database.
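I haven’t reproduced df_to_hylode_db here, but conceptually it is doing something like the following append to a log table. This is a hedged stand-in using plain pandas and SQLAlchemy; the table name comes from the API docs above, while the connection string is a placeholder, and the real helper will handle connections, schemas and logging for you.

# Conceptual stand-in for df_to_hylode_db -- connection details are placeholders.
from sqlalchemy import create_engine

hylode_engine = create_engine("postgresql://user:pass@hylode-db:5432/hylode")  # placeholder
episode_slices_df.to_sql(
    "icu_episode_slices_log",  # target log table, per the API docs
    hylode_engine,
    schema="hyflow",
    if_exists="append",        # log tables are append-only
    index=False,
)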
A very comparable process brings the observations into Hylode, so with some ferreting out (along the lines above) it should be possible to find the corresponding pieces of code. Next up is to transform the data from there…
Another example: HyGear transformers
In talking about how the Hylode ML system works, a lot of discussion comes back to the transformers. These are the pieces of code that take the data from a format not a million miles from that in EMAP to reproducible features for both retrospective model training and deployment.
As in our section above on HyFlow, the HyGear transformers are called on a schedule from the ICU Demand DAG.
Take for instance the code here:
generate_icu_temporal_task = SimpleHttpOperator(
    task_id=f"generate_icu_temporal-{ward}",
    http_conn_id="hygear_api",
    method="POST",
    endpoint="/trigger/generate/icu/cog1/temporal",
    headers=default_http_headers,
    data=json.dumps(transform_task_input),
)
This makes the API call to the hygear_api to generate the ICU patient temporal features (age, elapsed length-of-stay, etc.) for a given ward. It is beautifully documented in the HyGear API docs (here, at the time of writing), where we can see that temporal is designed to:

> Append temporal features to the hygear.icu_temporal_log table for episode slices active on the ward at the horizon.
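As an aside, you can poke these trigger endpoints by hand to see what the operators are doing. Below is a rough sketch of the equivalent HTTP call using requests; the base URL and the payload shape are assumptions for illustration (in Airflow they come from the hygear_api connection and transform_task_input respectively).

# Roughly the HTTP POST the operator above makes, done by hand.
# The base URL and payload are placeholders, not Hylode's actual config.
import json

import requests

transform_task_input = {"ward": "T03", "horizon_dt": "2021-07-17T07:00:00+01:00"}  # assumed shape
response = requests.post(
    "http://hygear-api:8000/trigger/generate/icu/cog1/temporal",  # placeholder host
    headers={"Content-Type": "application/json"},
    data=json.dumps(transform_task_input),
)
response.raise_for_status()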
Transformers under the hood
Again, we can go back to the definition of the endpoint, found in this case here, where we have the function generate_icu_temporal. This code lets us look under the hood of the transformers. What we can see happening is that this code pulls out the patients via icu_patients_from_hyflow and applies a series of Transformer classes to them, namely AdmissionAgeTransformer and LengthOfStayTransformer. Let’s have a look at one of these…
from hygear.transform.cog1.icu_temporal import AdmissionAgeTransformer
AdmissionAgeTransformer??
We can see that the transformer is a class. It takes a series of specified input_cols, and then has a defined transform method to output a specified set of output_cols. Included in Appendix 2 is some code to run this transformer across a dataframe. You can use this same structure to develop and test new transformers in a notebook.
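As a leg-up for writing your own, here is a hedged sketch of what a new transformer might look like, following the same pattern as AdmissionAgeTransformer (see Appendix 2 for its source and for how the (ward, horizon_dt, input_df) constructor is used). The feature itself, a weekend-admission flag, is purely illustrative and not an existing Hylode transformer.

# A hypothetical transformer following the AdmissionAgeTransformer pattern.
# The weekend-admission feature is illustrative only.
from typing import List

import pandas as pd

from hygear.transform.cog1.base import BaseCog1Transformer


class WeekendAdmissionTransformer(BaseCog1Transformer):
    """
    A transformer flagging episode slices admitted on a weekend

    Output Features:
        `admission_weekend`: bool
            True where the admission fell on a Saturday or Sunday
    """

    input_cols = ["episode_slice_id", "admission_dt"]

    @property
    def output_cols(self) -> List[str]:
        return ["episode_slice_id", "admission_weekend"]

    def transform(self) -> pd.DataFrame:
        output_df = self.input_df.copy()
        # dayofweek: Monday=0 ... Sunday=6, so >= 5 means a weekend admission
        output_df["admission_weekend"] = output_df["admission_dt"].dt.dayofweek >= 5
        return output_df.loc[:, self.output_cols]

It can then be exercised exactly as in Appendix 2: build an input dataframe with the declared input_cols, construct the transformer with (ward, horizon_dt, input_df), and call .transform().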
Once this feature transformation is done and dusted, we are ready to use Nel’s HyCastle machinery to pull together our feature sets for model training.
Appendix 1: Running SQL in Jupyter
(magpied from Nel’s existing HyMind exemplar)
from datetime import datetime, timedelta
import os
from pathlib import Path
from pprint import pprint
import urllib
import arrow
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from hylib.dt import LONDON_TZ, convert_dt_columns_to_london_tz
ward = 'T03'
EMAP credentials
EMAP credentials are allocated per user and not stored in the environment variables. You do not want your credentials to leak into the source repository.
One way of safeguarding is to create a file called secret at the top level of the HyMind repository (one above this notebook). Do this here in Jupyter and not in a local copy of the repo. The first line should be your UDS username and the second line should be your UDS password. secret has been added to .gitignore and will be excluded from the repository.
Read your username & password into the environment:
os.environ['EMAP_DB_USER'], os.environ['EMAP_DB_PASSWORD'] = Path('../secret').read_text().strip().split('\n')
uds_host = os.getenv('EMAP_DB_HOST')
uds_name = os.getenv('EMAP_DB_NAME')
uds_port = os.getenv('EMAP_DB_PORT')
uds_user = os.getenv('EMAP_DB_USER')
uds_passwd = os.getenv('EMAP_DB_PASSWORD')
Create a SQLAlchemy Engine for accessing the UDS:
emapdb_engine = create_engine(f'postgresql://{uds_user}:{uds_passwd}@{uds_host}:{uds_port}/{uds_name}')
from hyflow.settings import SQL_DIR
visits_sql = (SQL_DIR / "emap__icu_location_visit_history.sql").read_text()
# the point-in-time we are interested in: 7am on 17/07/2021 BST
horizon_dt = datetime(2021, 7, 17, 7, 0, 0).astimezone(LONDON_TZ)
from hylib.load.hydef import beds_from_hydef
beds_df = beds_from_hydef(ward)
visits_df = pd.read_sql(
    visits_sql,
    emapdb_engine,
    params={"horizon_dt": horizon_dt, "locations": list(beds_df.hl7_location)},
)
visits_df.head()
Appendix 2: Running some Transformer code
from datetime import datetime
import logging
from typing import List

import numpy as np
import pandas as pd
from fastapi import APIRouter

from hylib.dt import LONDON_TZ
from hylib.load.hydef import icu_observation_types_from_hydef
from hyflow.load.icu.icu_episode_slices import icu_episode_slices_from_hyflow
from hyflow.load.icu.icu_observations import icu_observations_from_hyflow
from hyflow.load.icu.icu_patients import icu_patients_from_hyflow
from hygear.transform.cog1.base import BaseCog1Transformer
class AdmissionAgeTransformer(BaseCog1Transformer):
    """
    A transformer for age at admission

    Output Features:
        `admission_age_years`: float
            Patient's age in years
    """

    input_cols = ["episode_slice_id", "admission_dt", "dob"]

    @property
    def output_cols(self) -> List[str]:
        return ["episode_slice_id", "admission_age_years"]

    def years(self, row: pd.Series) -> float:
        # crude age: year difference only, NaN where dob is missing
        if pd.isnull(row.dob):
            return np.nan
        else:
            return int(row["admission_dt"].year) - int(row["dob"].year)

    def transform(self) -> pd.DataFrame:
        output_df = self.input_df
        output_df["admission_age_years"] = output_df.apply(self.years, axis=1)
        return output_df.loc[:, self.output_cols]
ward  # as defined in Appendix 1
horizon_dt = datetime(2021, 10, 12, 11, 0).astimezone(LONDON_TZ)
episode_slices_df = icu_episode_slices_from_hyflow(ward, horizon_dt)
episode_slices_df.shape
patients_df = icu_patients_from_hyflow(
    ward, horizon_dt, list(episode_slices_df.episode_slice_id)
)
age_input_df = episode_slices_df.loc[:, ["episode_slice_id", "admission_dt"]].join(
    patients_df.loc[:, ["episode_slice_id", "dob"]].set_index("episode_slice_id"),
    on="episode_slice_id",
)
age_df = AdmissionAgeTransformer(ward, horizon_dt, age_input_df).transform()
output_df = episode_slices_df.loc[:, ["episode_slice_id"]].join(
    age_df.set_index("episode_slice_id"), on="episode_slice_id"
)
age_df