Skip to content

Modules

Copyright (c) 2020 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT

Fhiry

Bases: BaseFhiry

Read and process FHIR Bundles (.json) from file or folder.

Parameters:

Name Type Description Default
config_json

Optional JSON string or file path with column transforms.

None
Source code in src/fhiry/fhiry.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class Fhiry(BaseFhiry):
    """Read and process FHIR Bundles (.json) from file or folder.

    Args:
        config_json: Optional JSON string or file path with column transforms.
    """

    def __init__(self, config_json=None):
        # Selected input file / folder; empty string means "not set".
        self._filename = ""
        self._folder = ""
        super().__init__(config_json=config_json)

    @property
    def df(self):
        """pd.DataFrame | None: The current working dataframe, if any."""
        return self._df

    @property
    def filename(self):
        """str: The path to the currently selected input file, if any."""
        return self._filename

    @property
    def folder(self):
        """str: The path to the input folder containing Bundle JSON files."""
        return self._folder

    @property
    def delete_col_raw_coding(self):
        """bool: Whether to drop raw coding/display columns after extraction."""
        return self._delete_col_raw_coding

    @filename.setter
    def filename(self, filename):
        """Set the input file and load it into a dataframe.

        Args:
            filename (str): Path to a FHIR Bundle JSON file.
        """
        self._filename = filename
        self._df = self.read_bundle_from_file(filename)

    @folder.setter
    def folder(self, folder):
        """Set the input folder for processing Bundle JSON files.

        Args:
            folder (str): Path to a directory containing JSON files.
        """
        self._folder = folder

    @delete_col_raw_coding.setter
    def delete_col_raw_coding(self, delete_col_raw_coding):
        """Set whether to drop raw coding/display columns after extraction."""
        self._delete_col_raw_coding = delete_col_raw_coding

    def read_bundle_from_file(self, filename):
        """Load a FHIR Bundle JSON file and normalize its entries.

        Args:
            filename (str): Path to a FHIR Bundle JSON file.

        Returns:
            pd.DataFrame: Dataframe of the Bundle entries.
        """
        with open(filename, encoding="utf8", mode="r") as f:
            json_in = f.read()
            json_in = json.loads(json_in)
            return pd.json_normalize(json_in["entry"])

    def process_source(self):
        """Process either the selected file or the entire folder.

        Only columns common across resources will be mapped.
        """
        if self._folder:
            df = pd.DataFrame(columns=[])
            for file in tqdm(os.listdir(self._folder)):
                if file.endswith(".json"):
                    self._df = self.read_bundle_from_file(
                        os.path.join(self._folder, file)
                    )
                    # Each bundle is fully transformed here, before concatenation.
                    self.process_df()
                    if df.empty:
                        df = self._df
                    else:
                        df = pd.concat([df, self._df])
            self._df = df
            # BUGFIX: do not run process_df() a second time on the concatenated
            # frame. convert_object_to_list() is not idempotent: a second pass
            # matches the already-derived "*.coding.codes"/"*.display" columns
            # (their names still contain "coding"/"display"), and process_list()
            # on their string values returns [], overwriting the extracted
            # codes with empty strings.
            return
        if self._filename:
            self._df = self.read_bundle_from_file(self._filename)
        super().process_df()

    def process_file(self, filename):
        """Process a single Bundle JSON file and return its dataframe."""
        self._df = self.read_bundle_from_file(filename)
        self.process_df()
        return self._df

    def process_bundle_dict(self, bundle_dict):
        """Process a FHIR Bundle dictionary and return its dataframe."""
        self._df = self.read_bundle_from_bundle_dict(bundle_dict)
        self.process_df()
        return self._df

delete_col_raw_coding property writable

bool: Whether to drop raw coding/display columns after extraction.

df property

pd.DataFrame | None: The current working dataframe, if any.

filename property writable

str: The path to the currently selected input file, if any.

folder property writable

str: The path to the input folder containing Bundle JSON files.

process_bundle_dict(bundle_dict)

Process a FHIR Bundle dictionary and return its dataframe.

Source code in src/fhiry/fhiry.py
118
119
120
121
122
def process_bundle_dict(self, bundle_dict):
    """Load a FHIR Bundle dict, run the transform pipeline, and return the result."""
    loaded = self.read_bundle_from_bundle_dict(bundle_dict)
    self._df = loaded
    self.process_df()
    return self._df

process_file(filename)

Process a single Bundle JSON file and return its dataframe.

Source code in src/fhiry/fhiry.py
112
113
114
115
116
def process_file(self, filename):
    """Read one Bundle JSON file, run the transform pipeline, and return the dataframe."""
    loaded = self.read_bundle_from_file(filename)
    self._df = loaded
    self.process_df()
    return self._df

process_source()

Process either the selected file or the entire folder.

Only columns common across resources will be mapped.

Source code in src/fhiry/fhiry.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def process_source(self):
    """Process either the selected file or the entire folder.

    Only columns common across resources will be mapped.
    """
    if self._folder:
        df = pd.DataFrame(columns=[])
        for file in tqdm(os.listdir(self._folder)):
            if file.endswith(".json"):
                self._df = self.read_bundle_from_file(
                    os.path.join(self._folder, file)
                )
                # Each bundle is fully transformed here, before concatenation.
                self.process_df()
                if df.empty:
                    df = self._df
                else:
                    df = pd.concat([df, self._df])
        self._df = df
        # BUGFIX: do not run process_df() a second time on the concatenated
        # frame. convert_object_to_list() is not idempotent: a second pass
        # matches the already-derived "*.coding.codes"/"*.display" columns
        # and overwrites the extracted codes with empty strings.
        return
    if self._filename:
        self._df = self.read_bundle_from_file(self._filename)
    super().process_df()

read_bundle_from_file(filename)

Load a FHIR Bundle JSON file and normalize its entries.

Parameters:

Name Type Description Default
filename str

Path to a FHIR Bundle JSON file.

required

Returns:

Type Description

pd.DataFrame: Dataframe of the Bundle entries.

Source code in src/fhiry/fhiry.py
76
77
78
79
80
81
82
83
84
85
86
87
88
def read_bundle_from_file(self, filename):
    """Load a FHIR Bundle JSON file and flatten its "entry" list.

    Args:
        filename (str): Path to a FHIR Bundle JSON file.

    Returns:
        pd.DataFrame: One row per Bundle entry.
    """
    with open(filename, mode="r", encoding="utf8") as handle:
        bundle = json.load(handle)
    return pd.json_normalize(bundle["entry"])

Copyright (c) 2020 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT

BaseFhiry

Bases: object

Base class providing common dataframe processing utilities for FHIR.

This class encapsulates common logic for transforming FHIR bundle data into a pandas DataFrame, including column cleanup, code extraction, and patient ID derivation.

Parameters:

Name Type Description Default
config_json

Either a JSON string or a path to a JSON file specifying transformations with keys: - "REMOVE": list[str] of column prefixes to remove - "RENAME": dict[str, str] mapping old->new column names If None, a sensible default is used.

None
Source code in src/fhiry/base_fhiry.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
class BaseFhiry(object):
    """Base class providing common dataframe processing utilities for FHIR.

    This class encapsulates common logic for transforming FHIR bundle data into
    a pandas DataFrame, including column cleanup, code extraction, and patient
    ID derivation.

    Args:
        config_json: Either a JSON string or a path to a JSON file specifying
            transformations with keys:
            - "REMOVE": list[str] of column prefixes to remove
            - "RENAME": dict[str, str] mapping old->new column names
            If None, a sensible default is used.
    """

    def __init__(self, config_json=None):
        # Working dataframe; None until a bundle is loaded.
        self._df = None

        # Codes from the FHIR datatype "coding"
        # (f.e. element resource.code.coding or element resource.clinicalStatus.coding)
        # are extracted to a col "codingcodes"
        # (f.e. col resource.code.codingcodes or col resource.clinicalStatus.codingcodes)
        # without other for analysis often not needed metadata like f.e. codesystem URI
        # or FHIR extensions for coding entries.
        # The full / raw object in col "coding" is deleted after this extraction.
        # If you want to analyze more than the content of code and display from codings
        # (like f.e. different codesystem URIs or further codes in extensions
        # in the raw data/object), you can disable deletion of the raw source object "coding"
        # (f.e. col "resource.code.coding") by setting property delete_col_raw_coding to False
        self._delete_col_raw_coding = True
        if config_json is not None:
            try:
                with open(config_json, "r") as f:  # config_json is a file path
                    self.config = json.load(f)
            # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt;
            # consider narrowing to (OSError, TypeError) so only "not a readable
            # file path" falls through to the JSON-string branch — confirm intent.
            except:
                self.config = json.loads(config_json)  # config_json is a json string
        else:
            # Default transform: drop the narrative HTML and expose
            # "resource.id" as "id".
            self.config = json.loads(
                '{ "REMOVE": ["resource.text.div"], "RENAME": { "resource.id": "id" } }'
            )

    @property
    def df(self):
        """pd.DataFrame | None: The current working dataframe, if any."""
        return self._df

    @property
    def delete_col_raw_coding(self):
        """bool: Whether to drop raw coding/display columns after extraction."""
        return self._delete_col_raw_coding

    @delete_col_raw_coding.setter
    def delete_col_raw_coding(self, delete_col_raw_coding):
        """Set whether to drop raw coding/display columns after extraction.

        Args:
            delete_col_raw_coding (bool): True to delete raw columns after creating
                derived columns, False to keep them.
        """
        self._delete_col_raw_coding = delete_col_raw_coding

    def read_bundle_from_bundle_dict(self, bundle_dict):
        """Normalize a FHIR Bundle dict to a dataframe of entries.

        Args:
            bundle_dict (dict): A FHIR Bundle object with an "entry" list.

        Returns:
            pd.DataFrame: Dataframe where each row corresponds to a Bundle entry.
        """
        return pd.json_normalize(bundle_dict["entry"])

    def delete_unwanted_cols(self):
        """Delete unwanted columns from the dataframe.

        Uses the "REMOVE" list from the configuration. Any column that equals a
        listed value or starts with that value followed by a dot will be removed.
        Safely no-ops if the dataframe or configuration is missing.
        """
        if self._df is None:
            logger.warning("Dataframe is empty, nothing to delete")
            return
        if "REMOVE" not in self.config:
            logger.warning("No columns to remove defined in config")
            return
        if not isinstance(self.config["REMOVE"], list):
            logger.warning(
                "REMOVE in config is not a list, expected a list of column names to remove"
            )
            return
        if len(self.config["REMOVE"]) == 0:
            logger.warning("No columns to remove defined in config")
            return
        for col in self.config["REMOVE"]:
            # Match the column itself plus any json_normalize sub-columns
            # ("col.child"), but not unrelated names that merely share a prefix.
            cols_to_remove = [
                c for c in self._df.columns if c == col or c.startswith(f"{col}.")
            ]
            for c in cols_to_remove:
                del self._df[c]

    def rename_cols(self):
        """Rename dataframe columns according to the configuration.

        Uses the "RENAME" mapping from the configuration. Safely no-ops if the
        dataframe is empty.
        """
        if self._df is not None:
            self._df.rename(columns=self.config["RENAME"], inplace=True)
        else:
            logger.warning("Dataframe is empty, nothing to rename")

    def remove_string_from_columns(self, string_to_remove="resource."):
        """Remove a literal substring from all column names.

        Args:
            string_to_remove: Substring to remove from column names.

        Returns:
            pd.DataFrame | None: The updated dataframe or None if unset.
        """
        if self._df is not None:
            # regex=False: treat the argument as a literal, so the "." in
            # "resource." is not a wildcard.
            self._df.columns = self._df.columns.str.replace(
                string_to_remove, "", regex=False
            )
        else:
            logger.warning("Dataframe is empty, cannot remove string from columns")
        return self._df

    def process_df(self):
        """Run the standard transformation pipeline on the dataframe.

        Steps include:
        - Extracting codes from coding/display objects to flat columns
        - Adding a patientId column
        - Removing common prefix from column names
        - Converting empty lists to NaN
        - Dropping empty columns
        - Deleting unwanted columns
        - Renaming columns per config

        Returns:
            pd.DataFrame | None: The processed dataframe, or None if unset.
        """
        # Order matters: add_patient_id() runs before the "resource." prefix is
        # stripped (it tries prefixed names first), and empty_list_to_nan() must
        # precede drop_empty_cols() so all-empty-list columns get dropped.
        self.convert_object_to_list()
        self.add_patient_id()
        self.remove_string_from_columns(string_to_remove="resource.")
        self.empty_list_to_nan()
        self.drop_empty_cols()
        self.delete_unwanted_cols()
        self.rename_cols()
        return self._df

    def empty_list_to_nan(self):
        """Convert empty list values in object columns to NaN."""
        if self._df is None:
            logger.warning("Dataframe is empty, nothing to convert")
            return
        for col in self._df.columns:
            if self._df[col].dtype == "object":
                self._df[col] = self._df[col].apply(
                    lambda x: float("nan") if isinstance(x, list) and len(x) == 0 else x
                )

    def drop_empty_cols(self):
        """Drop columns that are completely empty (all NaN values)."""
        if self._df is None:
            logger.warning("Dataframe is empty, nothing to drop")
            return
        self._df.dropna(axis=1, how="all", inplace=True)
        if self._df is not None and self._df.empty:
            logger.warning("Dataframe is empty after dropping empty columns")
        return self._df

    def process_bundle_dict(self, bundle_dict):
        """Load and process a FHIR Bundle dictionary.

        Args:
            bundle_dict (dict): A FHIR Bundle object.

        Returns:
            pd.DataFrame | None: The processed dataframe, or None if empty.
        """
        self._df = self.read_bundle_from_bundle_dict(bundle_dict)
        if self._df is None or self._df.empty:
            logger.warning("Dataframe is empty, nothing to process")
            return None
        self._df = self.process_df()
        return self._df

    def convert_object_to_list(self):
        """Extract codes/display from nested objects into flat list columns.

        For columns containing "coding" or "display" in their names, extract a
        list of codes or display texts into new columns with ".codes" or
        ".display" suffixes. Optionally drops raw source columns.
        """
        if self._df is None:
            logger.warning("Dataframe is empty, nothing to convert")
            return

        def _codes_comma_series(src_col: str) -> pd.Series:
            """Return a Series with comma-separated strings from list-like values.

            Args:
                src_col: Column name to extract and stringify.

            Returns:
                pd.Series: Comma-separated strings (or empty string when None).
            """
            codes = self._df.apply(lambda x: self.process_list(x[src_col]), axis=1)
            return codes.apply(
                lambda x: (
                    ", ".join(x)
                    if isinstance(x, list) and x is not None
                    else (x if x is not None else "")
                )
            )

        # NOTE(review): this step is not idempotent — on a second pass the derived
        # "*.codes"/"*.display" columns themselves match these substring checks,
        # and process_list() on their string values returns [], which replaces the
        # extracted codes with empty strings. Callers should run this only once
        # per dataframe.
        for col in self._df.columns:
            if "coding" in col:
                codes_as_comma_separated = _codes_comma_series(col)
                self._df = pd.concat(
                    [self._df, codes_as_comma_separated.to_frame(name=col + ".codes")],
                    axis=1,
                )
                if self._delete_col_raw_coding:
                    del self._df[col]
            if "display" in col:
                codes_as_comma_separated = _codes_comma_series(col)
                self._df = pd.concat(
                    [
                        self._df,
                        codes_as_comma_separated.to_frame(name=col + ".display"),
                    ],
                    axis=1,
                )
                del self._df[col]

    def add_patient_id(self):
        """Add a patientId column inferred from resource fields.

        If the resource type is Patient, uses the resource id; otherwise attempts
        to derive the patient identifier from known subject/patient reference fields.
        """
        if self._df is None:
            logger.warning("Dataframe is empty, cannot add patientId")
            return
        try:
            # PerformanceWarning: DataFrame is highly fragmented.  This is usually the result of calling `frame.insert` many times, which has poor performance.  Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()`
            newframe = self._df.copy()
            newframe["patientId"] = self._df.apply(
                lambda x: (
                    x["resource.id"]
                    if x["resource.resourceType"] == "Patient"
                    else self.check_subject_reference(x)
                ),
                axis=1,
            )
            self._df = newframe
        # NOTE(review): bare except; retries with un-prefixed column names
        # ("id"/"resourceType") in case the "resource." prefix was stripped.
        except:
            try:
                newframe = self._df.copy()
                newframe["patientId"] = self._df.apply(
                    lambda x: (
                        x["id"]
                        if x["resourceType"] == "Patient"
                        else self.check_subject_reference(x)
                    ),
                    axis=1,
                )
                self._df = newframe
            except:
                # Best-effort: leave the dataframe without a patientId column.
                pass

    def check_subject_reference(self, row):
        """Extract patient id from subject/patient reference fields.

        Args:
            row (Mapping[str, Any]): A dataframe row as a mapping.

        Returns:
            str: The patient id (without "Patient/" or "urn:uuid:" prefix) or
            an empty string if not found.
        """
        keys = [
            "resource.subject.reference",
            "resource.patient.reference",
            "subject.reference",
            "patient.reference",
        ]

        def _clean(ref):
            # Non-string references (e.g. nested objects) yield "".
            if not isinstance(ref, str):
                return ""
            return ref.replace("Patient/", "").replace("urn:uuid:", "")

        for key in keys:
            ref = row.get(key, None)
            if pd.notna(ref):
                return _clean(ref)

        return ""

    def get_info(self):
        """Return a concise info string for the current dataframe.

        Returns:
            str: Dataframe info text or a message if no dataframe is set.
        """
        if self._df is None:
            return "Dataframe is empty"
        return self._df.info()

    def process_list(self, myList):
        """Extract code or display strings from a list of coding-like dicts.

        Args:
            myList (list): A list of dictionaries that may contain "code" or
                "display" keys.

        Returns:
            list[str]: A list of extracted codes/display texts.
        """
        # Non-list input (strings, NaN, dicts) yields an empty list.
        myCodes = []
        if isinstance(myList, list):
            for entry in myList:
                if "code" in entry:
                    myCodes.append(entry["code"])
                elif "display" in entry:
                    myCodes.append(entry["display"])
        return myCodes

    def llm_query(self, query, llm, embed_model=None, verbose=True):
        """Execute a natural language query against the dataframe using LLM tools.

        Args:
            query (str): The natural language question.
            llm (Any): The language model instance usable by llama_index.
            embed_model (str | None): Optional HuggingFace embedding model name.
            verbose (bool): Whether to enable verbose output from the query engine.

        Raises:
            Exception: If required libraries are not installed.
            Exception: If the dataframe is empty.

        Returns:
            Any: The query result from the underlying engine.
        """
        # Optional heavy dependencies are imported lazily so the rest of the
        # class works without them installed.
        try:
            from langchain_huggingface import HuggingFaceEmbeddings
            from llama_index.core import Settings
            from llama_index.experimental.query_engine import PandasQueryEngine
        except Exception:
            raise Exception("llama_index or HuggingFaceEmbeddings not installed")
        if self._df is None:
            raise Exception("Dataframe is empty")
        if embed_model is None:
            embed_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
        else:
            embed_model = HuggingFaceEmbeddings(model_name=embed_model)
        Settings.llm = llm
        Settings.embed_model = embed_model
        query_engine = PandasQueryEngine(
            df=self._df,
            verbose=verbose,
        )
        return query_engine.query(query)

delete_col_raw_coding property writable

bool: Whether to drop raw coding/display columns after extraction.

df property

pd.DataFrame | None: The current working dataframe, if any.

add_patient_id()

Add a patientId column inferred from resource fields.

If the resource type is Patient, uses the resource id; otherwise attempts to derive the patient identifier from known subject/patient reference fields.

Source code in src/fhiry/base_fhiry.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def add_patient_id(self):
    """Add a patientId column inferred from resource fields.

    If the resource type is Patient, uses the resource id; otherwise attempts
    to derive the patient identifier from known subject/patient reference fields.
    """
    if self._df is None:
        logger.warning("Dataframe is empty, cannot add patientId")
        return
    try:
        # Work on a copy to avoid pandas' PerformanceWarning about highly
        # fragmented frames caused by repeated column inserts.
        newframe = self._df.copy()
        newframe["patientId"] = self._df.apply(
            lambda x: (
                x["resource.id"]
                if x["resource.resourceType"] == "Patient"
                else self.check_subject_reference(x)
            ),
            axis=1,
        )
        self._df = newframe
    # FIX: was a bare `except:`, which also swallowed SystemExit and
    # KeyboardInterrupt; `except Exception` keeps the intended fallback only.
    except Exception:
        # Retry with un-prefixed column names ("id"/"resourceType") — the
        # "resource." prefix may already have been stripped.
        try:
            newframe = self._df.copy()
            newframe["patientId"] = self._df.apply(
                lambda x: (
                    x["id"]
                    if x["resourceType"] == "Patient"
                    else self.check_subject_reference(x)
                ),
                axis=1,
            )
            self._df = newframe
        except Exception:
            # Best-effort: leave the dataframe without a patientId column.
            pass

check_subject_reference(row)

Extract patient id from subject/patient reference fields.

Parameters:

Name Type Description Default
row Mapping[str, Any]

A dataframe row as a mapping.

required

Returns:

Name Type Description
str

The patient id (without "Patient/" or "urn:uuid:" prefix) or

an empty string if not found.

Source code in src/fhiry/base_fhiry.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def check_subject_reference(self, row):
    """Extract patient id from subject/patient reference fields.

    Args:
        row (Mapping[str, Any]): A dataframe row as a mapping.

    Returns:
        str: The patient id (without "Patient/" or "urn:uuid:" prefix) or
        an empty string if not found.
    """
    candidate_keys = (
        "resource.subject.reference",
        "resource.patient.reference",
        "subject.reference",
        "patient.reference",
    )
    for key in candidate_keys:
        reference = row.get(key, None)
        if pd.isna(reference):
            continue
        # Non-string references (e.g. nested objects) yield "".
        if isinstance(reference, str):
            return reference.replace("Patient/", "").replace("urn:uuid:", "")
        return ""
    return ""

convert_object_to_list()

Extract codes/display from nested objects into flat list columns.

For columns containing "coding" or "display" in their names, extract a list of codes or display texts into new columns with ".codes" or ".display" suffixes. Optionally drops raw source columns.

Source code in src/fhiry/base_fhiry.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def convert_object_to_list(self):
    """Extract codes/display from nested objects into flat list columns.

    For columns containing "coding" or "display" in their names, extract a
    list of codes or display texts into new columns with ".codes" or
    ".display" suffixes. Optionally drops raw source columns.
    """
    if self._df is None:
        logger.warning("Dataframe is empty, nothing to convert")
        return

    def _codes_comma_series(src_col: str) -> pd.Series:
        """Return a Series with comma-separated strings from list-like values.

        Args:
            src_col: Column name to extract and stringify.

        Returns:
            pd.Series: Comma-separated strings (or empty string when None).
        """
        codes = self._df.apply(lambda x: self.process_list(x[src_col]), axis=1)
        return codes.apply(
            lambda x: (
                ", ".join(x)
                if isinstance(x, list) and x is not None
                else (x if x is not None else "")
            )
        )

    # NOTE(review): not idempotent — on a second pass the derived
    # "*.codes"/"*.display" columns themselves match these substring checks,
    # and process_list() on their string values returns [], which replaces
    # the extracted codes with empty strings. Run once per dataframe.
    for col in self._df.columns:
        if "coding" in col:
            codes_as_comma_separated = _codes_comma_series(col)
            self._df = pd.concat(
                [self._df, codes_as_comma_separated.to_frame(name=col + ".codes")],
                axis=1,
            )
            # Raw "coding" objects are kept only when the caller opted out.
            if self._delete_col_raw_coding:
                del self._df[col]
        if "display" in col:
            codes_as_comma_separated = _codes_comma_series(col)
            self._df = pd.concat(
                [
                    self._df,
                    codes_as_comma_separated.to_frame(name=col + ".display"),
                ],
                axis=1,
            )
            del self._df[col]

delete_unwanted_cols()

Delete unwanted columns from the dataframe.

Uses the "REMOVE" list from the configuration. Any column that equals a listed value or starts with that value followed by a dot will be removed. Safely no-ops if the dataframe or configuration is missing.

Source code in src/fhiry/base_fhiry.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def delete_unwanted_cols(self):
    """Delete unwanted columns from the dataframe.

    Uses the "REMOVE" list from the configuration. A column is dropped when it
    equals a configured name or starts with that name followed by a dot.
    Safely no-ops if the dataframe or configuration is missing.
    """
    if self._df is None:
        logger.warning("Dataframe is empty, nothing to delete")
        return
    if "REMOVE" not in self.config:
        logger.warning("No columns to remove defined in config")
        return
    remove_list = self.config["REMOVE"]
    if not isinstance(remove_list, list):
        logger.warning(
            "REMOVE in config is not a list, expected a list of column names to remove"
        )
        return
    if not remove_list:
        logger.warning("No columns to remove defined in config")
        return
    for target in remove_list:
        # Match the column itself plus any json_normalize sub-columns.
        matches = [
            name
            for name in self._df.columns
            if name == target or name.startswith(target + ".")
        ]
        for name in matches:
            del self._df[name]

drop_empty_cols()

Drop columns that are completely empty (all NaN values).

Source code in src/fhiry/base_fhiry.py
180
181
182
183
184
185
186
187
188
def drop_empty_cols(self):
    """Drop every column whose values are all NaN; return the dataframe."""
    frame = self._df
    if frame is None:
        logger.warning("Dataframe is empty, nothing to drop")
        return None
    frame.dropna(axis=1, how="all", inplace=True)
    if frame.empty:
        logger.warning("Dataframe is empty after dropping empty columns")
    return frame

empty_list_to_nan()

Convert empty list values in object columns to NaN.

Source code in src/fhiry/base_fhiry.py
169
170
171
172
173
174
175
176
177
178
def empty_list_to_nan(self):
    """Convert empty list values in object columns to NaN."""
    if self._df is None:
        logger.warning("Dataframe is empty, nothing to convert")
        return

    def _blank_to_nan(value):
        # An empty list marks "no data" in normalized FHIR output.
        return float("nan") if isinstance(value, list) and not value else value

    for column in self._df.columns:
        if self._df[column].dtype == "object":
            self._df[column] = self._df[column].apply(_blank_to_nan)

get_info()

Return a concise info string for the current dataframe.

Returns:

Name Type Description
str

Dataframe info text or a message if no dataframe is set.

Source code in src/fhiry/base_fhiry.py
320
321
322
323
324
325
326
327
328
def get_info(self):
    """Return a concise info string for the current dataframe.

    Returns:
        str: Dataframe info text or a message if no dataframe is set.
    """
    import io

    if self._df is None:
        return "Dataframe is empty"
    # DataFrame.info() writes to stdout and returns None by default; the old
    # code therefore returned None despite the documented str contract.
    # Capture the text into a buffer so the caller actually gets it.
    buffer = io.StringIO()
    self._df.info(buf=buffer)
    return buffer.getvalue()

llm_query(query, llm, embed_model=None, verbose=True)

Execute a natural language query against the dataframe using LLM tools.

Parameters:

Name Type Description Default
query str

The natural language question.

required
llm Any

The language model instance usable by llama_index.

required
embed_model str | None

Optional HuggingFace embedding model name.

None
verbose bool

Whether to enable verbose output from the query engine.

True

Raises:

Type Description
Exception

If required libraries are not installed.

Exception

If the dataframe is empty.

Returns:

Name Type Description
Any

The query result from the underlying engine.

Source code in src/fhiry/base_fhiry.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def llm_query(self, query, llm, embed_model=None, verbose=True):
    """Execute a natural language query against the dataframe using LLM tools.

    Args:
        query (str): The natural language question.
        llm (Any): The language model instance usable by llama_index.
        embed_model (str | None): Optional HuggingFace embedding model name.
        verbose (bool): Whether to enable verbose output from the query engine.

    Raises:
        Exception: If required libraries are not installed.
        Exception: If the dataframe is empty.

    Returns:
        Any: The query result from the underlying engine.
    """
    try:
        from langchain_huggingface import HuggingFaceEmbeddings
        from llama_index.core import Settings
        from llama_index.experimental.query_engine import PandasQueryEngine
    except Exception:
        raise Exception("llama_index or HuggingFaceEmbeddings not installed")
    if self._df is None:
        raise Exception("Dataframe is empty")
    # Fall back to a small default embedding model when none is supplied.
    model_name = "BAAI/bge-small-en-v1.5" if embed_model is None else embed_model
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    Settings.llm = llm
    Settings.embed_model = embeddings
    engine = PandasQueryEngine(df=self._df, verbose=verbose)
    return engine.query(query)

process_bundle_dict(bundle_dict)

Load and process a FHIR Bundle dictionary.

Parameters:

Name Type Description Default
bundle_dict dict

A FHIR Bundle object.

required

Returns:

Type Description

pd.DataFrame | None: The processed dataframe, or None if empty.

Source code in src/fhiry/base_fhiry.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def process_bundle_dict(self, bundle_dict):
    """Load and process a FHIR Bundle dictionary.

    Args:
        bundle_dict (dict): A FHIR Bundle object.

    Returns:
        pd.DataFrame | None: The processed dataframe, or None if empty.
    """
    df = self.read_bundle_from_bundle_dict(bundle_dict)
    self._df = df
    if df is None or df.empty:
        logger.warning("Dataframe is empty, nothing to process")
        return None
    # Run the standard transformation pipeline over the freshly loaded frame.
    self._df = self.process_df()
    return self._df

process_df()

Run the standard transformation pipeline on the dataframe.

Steps include: - Extracting codes from coding/display objects to flat columns - Adding a patientId column - Removing common prefix from column names - Converting empty lists to NaN - Dropping empty columns - Deleting unwanted columns - Renaming columns per config

Returns:

Type Description

pd.DataFrame | None: The processed dataframe, or None if unset.

Source code in src/fhiry/base_fhiry.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def process_df(self):
    """Run the standard transformation pipeline on the dataframe.

    Steps include:
    - Extracting codes from coding/display objects to flat columns
    - Adding a patientId column
    - Removing common prefix from column names
    - Converting empty lists to NaN
    - Dropping empty columns
    - Deleting unwanted columns
    - Renaming columns per config

    Returns:
        pd.DataFrame | None: The processed dataframe, or None if unset.
    """
    # Each step mutates self._df in place; order matters, so run them
    # strictly in sequence.
    pipeline = (
        self.convert_object_to_list,
        self.add_patient_id,
        lambda: self.remove_string_from_columns(string_to_remove="resource."),
        self.empty_list_to_nan,
        self.drop_empty_cols,
        self.delete_unwanted_cols,
        self.rename_cols,
    )
    for step in pipeline:
        step()
    return self._df

process_list(myList)

Extract code or display strings from a list of coding-like dicts.

Parameters:

Name Type Description Default
myList list

A list of dictionaries that may contain "code" or "display" keys.

required

Returns:

Type Description

list[str]: A list of extracted codes/display texts.

Source code in src/fhiry/base_fhiry.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def process_list(self, myList):
    """Extract code or display strings from a list of coding-like dicts.

    Args:
        myList (list): A list of dictionaries that may contain "code" or
            "display" keys.

    Returns:
        list[str]: A list of extracted codes/display texts.
    """
    extracted = []
    if not isinstance(myList, list):
        return extracted
    for item in myList:
        # Prefer "code"; fall back to "display"; skip entries with neither.
        for key in ("code", "display"):
            if key in item:
                extracted.append(item[key])
                break
    return extracted

read_bundle_from_bundle_dict(bundle_dict)

Normalize a FHIR Bundle dict to a dataframe of entries.

Parameters:

Name Type Description Default
bundle_dict dict

A FHIR Bundle object with an "entry" list.

required

Returns:

Type Description

pd.DataFrame: Dataframe where each row corresponds to a Bundle entry.

Source code in src/fhiry/base_fhiry.py
78
79
80
81
82
83
84
85
86
87
def read_bundle_from_bundle_dict(self, bundle_dict):
    """Normalize a FHIR Bundle dict to a dataframe of entries.

    Args:
        bundle_dict (dict): A FHIR Bundle object with an "entry" list.

    Returns:
        pd.DataFrame: Dataframe where each row corresponds to a Bundle entry.
    """
    entries = bundle_dict["entry"]
    # json_normalize flattens nested resource fields into dotted column names.
    return pd.json_normalize(entries)

remove_string_from_columns(string_to_remove='resource.')

Remove a literal substring from all column names.

Parameters:

Name Type Description Default
string_to_remove

Substring to remove from column names.

'resource.'

Returns:

Type Description

pd.DataFrame | None: The updated dataframe or None if unset.

Source code in src/fhiry/base_fhiry.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def remove_string_from_columns(self, string_to_remove="resource."):
    """Strip a literal substring from every column name.

    Args:
        string_to_remove: Substring to remove from column names.

    Returns:
        pd.DataFrame | None: The updated dataframe or None if unset.
    """
    if self._df is None:
        logger.warning("Dataframe is empty, cannot remove string from columns")
        return self._df
    # regex=False: treat the substring literally (the default contains ".").
    stripped = self._df.columns.str.replace(string_to_remove, "", regex=False)
    self._df.columns = stripped
    return self._df

rename_cols()

Rename dataframe columns according to the configuration.

Uses the "RENAME" mapping from the configuration. Safely no-ops if the dataframe is empty.

Source code in src/fhiry/base_fhiry.py
117
118
119
120
121
122
123
124
125
126
def rename_cols(self):
    """Rename dataframe columns according to the configuration.

    Uses the "RENAME" mapping from the configuration. Safely no-ops if the
    dataframe is empty or if no "RENAME" mapping is configured.
    """
    if self._df is None:
        logger.warning("Dataframe is empty, nothing to rename")
        return
    # Guard against a missing mapping instead of raising KeyError, mirroring
    # the defensive config checks in delete_unwanted_cols().
    mapping = self.config.get("RENAME")
    if not mapping:
        logger.warning("No RENAME mapping defined in config")
        return
    self._df.rename(columns=mapping, inplace=True)

Copyright (c) 2023 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT

BQsearch

Bases: BaseFhiry

Query FHIR datasets in Google BigQuery and process results.

Source code in src/fhiry/bqsearch.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class BQsearch(BaseFhiry):
    """Query FHIR datasets in Google BigQuery and process results.

    Args:
        config_json: Optional JSON string or file path with column transforms.
    """

    def __init__(self, config_json=None):
        # Construct a BigQuery client object.
        self._client = bigquery.Client()
        super().__init__(config_json=config_json)

    def search(self, query=None):
        """Run a BigQuery SQL query and return a processed dataframe.

        Args:
            query (str | None): Either a SQL string, a path to a .sql file, or
                None to run a default sample query.

        Returns:
            pd.DataFrame: The query results after standard processing.
        """
        if query is None:
            _query = """
                SELECT *
                FROM `bigquery-public-data.fhir_synthea.patient`
                LIMIT 20
            """
        else:
            try:
                # Treat the argument as a path to a .sql file first...
                with open(query, "r") as f:
                    _query = f.read()
            except (OSError, ValueError):
                # ...and fall back to using it as a literal SQL string.
                # (Was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                _query = query

        self._df = self._client.query(_query).to_dataframe()
        super().process_df()
        return self._df

search(query=None)

Run a BigQuery SQL query and return a processed dataframe.

Parameters:

Name Type Description Default
query str | None

Either a SQL string, a path to a .sql file, or None to run a default sample query.

None

Returns:

Type Description

pd.DataFrame: The query results after standard processing.

Source code in src/fhiry/bqsearch.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def search(self, query=None):
    """Run a BigQuery SQL query and return a processed dataframe.

    Args:
        query (str | None): Either a SQL string, a path to a .sql file, or
            None to run a default sample query.

    Returns:
        pd.DataFrame: The query results after standard processing.
    """
    if query is None:
        _query = """
            SELECT *
            FROM `bigquery-public-data.fhir_synthea.patient`
            LIMIT 20
        """
    else:
        try:
            # Treat the argument as a path to a .sql file first...
            with open(query, "r") as f:
                _query = f.read()
        except (OSError, ValueError):
            # ...and fall back to using it as a literal SQL string.
            # (Was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            _query = query

    self._df = self._client.query(_query).to_dataframe()
    super().process_df()
    return self._df

Fhirsearch

Bases: BaseFhiry

Search FHIR servers and aggregate results into a dataframe.

This client pages through FHIR search results and builds a unified pandas DataFrame using the BaseFhiry processing pipeline.

Parameters:

Name Type Description Default
fhir_base_url str

Base URL of the FHIR server (e.g., "https://.../fhir").

required
config_json

Optional JSON string or file path with column transforms.

None
Source code in src/fhiry/fhirsearch.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Fhirsearch(BaseFhiry):
    """Search FHIR servers and aggregate results into a dataframe.

    This client pages through FHIR search results and builds a unified
    pandas DataFrame using the BaseFhiry processing pipeline.

    Args:
        fhir_base_url (str): Base URL of the FHIR server (e.g., "https://.../fhir").
        config_json: Optional JSON string or file path with column transforms.
    """

    def __init__(self, fhir_base_url, config_json=None):
        self.fhir_base_url = fhir_base_url

        # Batch size (entries per page)
        self.page_size = 500

        # Keyword arguments for HTTP(s) requests (f.e. for auth)
        # Example parameters:
        # Authentication: https://requests.readthedocs.io/en/latest/user/authentication/#basic-authentication
        # Proxies: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        # SSL Certificates: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification
        self.requests_kwargs = {}
        super().__init__(config_json=config_json)

    def search(self, resource_type="Patient", search_parameters=None):
        """Search the FHIR server and return the combined results.

        Args:
            resource_type (str): FHIR resource type to search (e.g., "Patient").
            search_parameters (dict | None): Query parameters per FHIR spec;
                _count is auto-set to the configured page size if absent.

        Returns:
            pd.DataFrame: Combined search results across all pages.
        """
        # A mutable default ({}) was shared across calls and mutated below
        # (the "_count" insertion), leaking state between searches and into
        # the caller's dict. Use None and copy instead.
        if search_parameters is None:
            search_parameters = {}
        else:
            search_parameters = dict(search_parameters)

        headers = {"Content-Type": "application/fhir+json"}

        if "_count" not in search_parameters:
            search_parameters["_count"] = self.page_size

        search_url = f"{self.fhir_base_url}/{resource_type}"
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        bundle_dict = r.json()

        if "entry" in bundle_dict:
            df = super().process_bundle_dict(bundle_dict)

            next_page_url = get_next_page_url(bundle_dict)

            # Follow the Bundle's "next" links until the server stops paging.
            while next_page_url:
                r = requests.get(next_page_url, headers=headers, **self.requests_kwargs)
                r.raise_for_status()
                bundle_dict = r.json()
                df_page = super().process_bundle_dict(bundle_dict)
                df = pd.concat([df, df_page])

                next_page_url = get_next_page_url(bundle_dict)
        else:
            df = pd.DataFrame(columns=[])

        self._df = df
        return self._df

search(resource_type='Patient', search_parameters={})

Search the FHIR server and return the combined results.

Parameters:

Name Type Description Default
resource_type str

FHIR resource type to search (e.g., "Patient").

'Patient'
search_parameters dict

Query parameters per FHIR spec; _count is auto-set to the configured page size if absent.

{}

Returns:

Type Description

pd.DataFrame: Combined search results across all pages.

Source code in src/fhiry/fhirsearch.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def search(self, resource_type="Patient", search_parameters=None):
    """Search the FHIR server and return the combined results.

    Args:
        resource_type (str): FHIR resource type to search (e.g., "Patient").
        search_parameters (dict | None): Query parameters per FHIR spec;
            _count is auto-set to the configured page size if absent.

    Returns:
        pd.DataFrame: Combined search results across all pages.
    """
    # A mutable default ({}) was shared across calls and mutated below
    # (the "_count" insertion), leaking state between searches and into
    # the caller's dict. Use None and copy instead.
    if search_parameters is None:
        search_parameters = {}
    else:
        search_parameters = dict(search_parameters)

    headers = {"Content-Type": "application/fhir+json"}

    if "_count" not in search_parameters:
        search_parameters["_count"] = self.page_size

    search_url = f"{self.fhir_base_url}/{resource_type}"
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    bundle_dict = r.json()

    if "entry" in bundle_dict:
        df = super().process_bundle_dict(bundle_dict)

        next_page_url = get_next_page_url(bundle_dict)

        # Follow the Bundle's "next" links until the server stops paging.
        while next_page_url:
            r = requests.get(next_page_url, headers=headers, **self.requests_kwargs)
            r.raise_for_status()
            bundle_dict = r.json()
            df_page = super().process_bundle_dict(bundle_dict)
            df = pd.concat([df, df_page])

            next_page_url = get_next_page_url(bundle_dict)
    else:
        df = pd.DataFrame(columns=[])

    self._df = df
    return self._df

get_next_page_url(bundle_dict)

Return the URL of the next page from a FHIR Bundle, if present.

Parameters:

Name Type Description Default
bundle_dict dict

The FHIR Bundle JSON object.

required

Returns:

Type Description

str | None: The 'next' page URL, or None if no more pages.

Source code in src/fhiry/fhirsearch.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_next_page_url(bundle_dict):
    """Return the URL of the next page from a FHIR Bundle, if present.

    Args:
        bundle_dict (dict): The FHIR Bundle JSON object.

    Returns:
        str | None: The 'next' page URL, or None if no more pages.
    """
    # A missing or empty "link" array simply means there are no more pages.
    for link in bundle_dict.get("link") or []:
        if link.get("relation") == "next":
            return link.get("url")
    return None

Copyright (c) 2024 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT

FlattenFhir

Bases: ABC

Flatten FHIR resources to concise human-readable text.

Parameters:

Name Type Description Default
fhirobject dict

A FHIR resource or Bundle to flatten.

{}
config_json

Currently unused placeholder for future options.

None
Source code in src/fhiry/flattenfhir.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class FlattenFhir(ABC):
    """Flatten FHIR resources to concise human-readable text.

    Args:
        fhirobject (dict | None): A FHIR resource or Bundle to flatten.
        config_json: Currently unused placeholder for future options.
    """

    def __init__(self, fhirobject=None, config_json=None):
        # None (not {}) as the default avoids the shared-mutable-default
        # pitfall; an omitted/empty object simply skips the initial flatten.
        if fhirobject is None:
            fhirobject = {}
        self._flattened = ""
        self._fhirobject = Prodict.from_dict(fhirobject)
        if fhirobject:
            self.flatten()

    @property
    def flattened(self):
        """str: The last flattened output string."""
        return self._flattened

    @property
    def fhirobject(self):
        """Prodict: The current FHIR object as Prodict."""
        return self._fhirobject

    @fhirobject.setter
    def fhirobject(self, fhirobject):
        """Set a FHIR object and immediately refresh the flattened output.

        Args:
            fhirobject (dict): A FHIR resource or Bundle.
        """
        self._fhirobject = Prodict.from_dict(fhirobject)
        self.flatten()

    def flatten(self):
        """Compute the flattened text for the current FHIR object.

        Returns:
            str: The flattened string.
        """
        if not self._fhirobject:
            _logger.info("FHIR object is not set.")
            raise ValueError("FHIR object is not set.")
        self._flattened = ""
        if self._fhirobject.resourceType == "Bundle":
            # A Bundle is flattened entry by entry.
            for entry in self._fhirobject.entry:
                _entry = Prodict.from_dict(entry)
                self.get_flattened_text(_entry.resource)
        else:
            self.get_flattened_text(self._fhirobject)
        return self._flattened

    def get_flattened_text(self, entry):
        """Append flattened text for a single FHIR entry to the buffer.

        Args:
            entry (Prodict): A FHIR resource object.

        Returns:
            str: The updated flattened string.
        """
        if entry.resourceType == "Patient":
            self._flattened += self.flatten_patient(entry)
        elif entry.resourceType == "Observation":
            self._flattened += self.flatten_observation(entry)
        elif entry.resourceType == "Medication":
            self._flattened += self.flatten_medication(entry)
        elif entry.resourceType == "Procedure":
            self._flattened += self.flatten_procedure(entry)
        elif entry.resourceType == "Condition":
            self._flattened += self.flatten_condition(entry)
        elif entry.resourceType == "AllergyIntolerance":
            self._flattened += self.flatten_allergyintolerance(entry)
        elif entry.resourceType == "DocumentReference":
            self._flattened += self.flatten_documentreference(entry)
        else:
            _logger.info(f"Resource type not supported: {entry.resourceType}")
        return self._flattened

    def get_timeago(self, datestring) -> str:
        """Return a human-friendly time-ago string for the given date.

        Args:
            datestring (str): ISO-like date string (YYYY-MM-DD...).

        Returns:
            str: Human-friendly relative time.
        """
        # Keep only the YYYY-MM-DD prefix; timeago handles the rest.
        datestring = datestring[0:10]
        return timeago.format(datestring, datetime.datetime.now())

    def flatten_patient(self, patient) -> str:
        """Flatten a Patient into a short sentence.

        Args:
            patient: Patient resource object.

        Returns:
            str: Flattened snippet.
        """
        flat_patient = ""
        if "gender" in patient:
            flat_patient += f"Medical record of a {patient.gender} patient "
        else:
            _logger.info(f"Gender not found for patient {patient.id}")
            flat_patient += "Medical record of a patient "
        if "birthDate" in patient:
            flat_patient += f"born {self.get_timeago(patient.birthDate)}. "
        else:
            _logger.info(f"Birthdate not found for patient {patient.id}")
            flat_patient += "of unknown age. "
        return flat_patient

    def flatten_observation(self, observation) -> str:
        """Flatten an Observation into a short sentence."""
        flat_observation = ""
        if "code" in observation:
            _display = observation.code.coding[0]
            flat_observation += f"{_display['display']} "
        else:
            _logger.info(f"Code not found for observation {observation.id}")
            flat_observation += "Observation "
        if "effectiveDateTime" in observation:
            flat_observation += (
                f"recorded {self.get_timeago(observation.effectiveDateTime)} was "
            )
        else:
            _logger.info(f"Effective date not found for observation {observation.id}")
            flat_observation += "of unknown date was "
        if "valueQuantity" in observation and "value" in observation.valueQuantity:
            flat_observation += f"Value: {observation.valueQuantity.value} "
            if "unit" in observation.valueQuantity:
                flat_observation += f"{observation.valueQuantity.unit}. "
        elif "valueString" in observation:
            flat_observation += f"Value: {observation.valueString}. "
        elif "valueBoolean" in observation:
            flat_observation += f"Value: {observation.valueBoolean}. "
        elif (
            "valueRange" in observation
            and "low" in observation.valueRange
            and "high" in observation.valueRange
        ):
            flat_observation += f"Value: {observation.valueRange.low.value} - {observation.valueRange.high.value} {observation.valueRange.low.unit}. "
        elif (
            "valueRatio" in observation
            and "numerator" in observation.valueRatio
            and "denominator" in observation.valueRatio
        ):
            flat_observation += f"Value: {observation.valueRatio.numerator.value} {observation.valueRatio.numerator.unit} / {observation.valueRatio.denominator.value} {observation.valueRatio.denominator.unit}. "
        elif (
            "valuePeriod" in observation
            and "start" in observation.valuePeriod
            and "end" in observation.valuePeriod
        ):
            flat_observation += f"Value: {observation.valuePeriod.start} - {observation.valuePeriod.end}. "
        elif "valueDateTime" in observation and observation.valueDateTime != "":
            flat_observation += f"Value: {observation.valueDateTime}. "
        elif "valueTime" in observation and observation.valueTime != "":
            flat_observation += f"Value: {observation.valueTime}. "
        elif (
            "valueSampledData" in observation and "data" in observation.valueSampledData
        ):
            flat_observation += f"Value: {observation.valueSampledData.data}. "
        else:
            _logger.info(f"Value not found for observation {observation.id}")
            flat_observation += "Value: unknown. "
        try:
            if (
                "interpretation" in observation
                and "coding" in observation.interpretation[0]
            ):
                _text = observation.interpretation[0]["coding"][0]
                flat_observation += f"Interpretation: {_text['display']}. "
        except Exception:
            # Best-effort: interpretation structure varies between servers.
            # (Was a bare `except:`, which also caught KeyboardInterrupt.)
            _logger.info(f"Interpretation not found for observation {observation.id}")
            flat_observation += "Interpretation: unknown. "
        return flat_observation

    def flatten_medication(self, medication) -> str:
        """Flatten a Medication into a short sentence."""
        flat_medication = ""
        if "code" in medication:
            flat_medication += f"{medication.code.coding[0]['display']} "
        else:
            _logger.info(f"Code not found for medication {medication.id}")
            flat_medication += "Medication "
        if "status" in medication:
            flat_medication += f"Status: {medication.status}. "
        else:
            _logger.info(f"Status not found for medication {medication.id}")
            flat_medication += "Status: unknown. "
        return flat_medication

    def flatten_procedure(self, procedure) -> str:
        """Flatten a Procedure into a short sentence."""
        flat_procedure = ""
        if (
            "code" in procedure
            and "coding" in procedure.code
            and "display" in procedure.code.coding[0]
        ):
            flat_procedure += f"{procedure.code.coding[0]['display']} was "
        else:
            _logger.info(f"Code not found for procedure {procedure.id}")
            # Trailing space added: was "Procedure was", which fused with
            # the status text appended below.
            flat_procedure += "Procedure was "
        if "occurrenceDateTime" in procedure:
            flat_procedure += (
                f"{procedure.status} {self.get_timeago(procedure.occurrenceDateTime)}. "
            )
        elif "occurrencePeriod" in procedure:
            flat_procedure += f"{procedure.status} {self.get_timeago(procedure.occurrencePeriod.start)}. "
        else:
            _logger.info(f"Performed date not found for procedure {procedure.id}")
            flat_procedure += "on unknown date. "
        return flat_procedure

    def flatten_condition(self, condition) -> str:
        """Flatten a Condition into a short sentence."""
        flat_condition = ""
        if "code" in condition:
            flat_condition += f"{condition.code.coding[0]['display']} "
        else:
            _logger.info(f"Code not found for condition {condition.id}")
            flat_condition += "Condition "
        # .get() so a missing onsetDateTime degrades gracefully instead of
        # raising attribute/key errors; matches the membership-check style
        # of the other flatten_* methods.
        if condition.get("onsetDateTime"):
            flat_condition += (
                f"was diagnosed {self.get_timeago(condition.onsetDateTime)}. "
            )
        else:
            _logger.info(f"Onset date not found for condition {condition.id}")
            flat_condition += "was diagnosed. "
        return flat_condition

    def flatten_allergyintolerance(self, allergyintolerance) -> str:
        """Flatten an AllergyIntolerance into a short sentence."""
        flat_allergyintolerance = ""
        # Only dereference code.coding once "code" is known to be present;
        # previously coding[0] was read unconditionally, so the fallback
        # branch could never be reached when "code" was missing.
        if (
            "code" in allergyintolerance
            and "display" in allergyintolerance.code.coding[0]
        ):
            flat_allergyintolerance += (
                f"{allergyintolerance.code.coding[0]['display']} "
            )
        else:
            _logger.info(
                f"Code not found for allergyintolerance {allergyintolerance.id}"
            )
            flat_allergyintolerance += "AllergyIntolerance "
        if "onsetDateTime" in allergyintolerance:
            flat_allergyintolerance += f" allergy was reported on {self.get_timeago(allergyintolerance.onsetDateTime)}. "
        else:
            _logger.info(
                f"Onset date not found for allergyintolerance {allergyintolerance.id}"
            )
            flat_allergyintolerance += "allergy reported. "
        return flat_allergyintolerance

    def flatten_documentreference(self, documentreference) -> str:
        """Flatten a DocumentReference into a short sentence."""
        flat_documentreference = ""
        for content in documentreference.content:
            content = Prodict.from_dict(content)
            if content.attachment.contentType == "text/plain":
                flat_documentreference += (
                    f"{content.attachment.title}: {content.attachment.data}"
                )
            else:
                _logger.info(
                    f"Attachment for documentreference {documentreference.id} is not text/plain."
                )
        if "date" in documentreference:
            flat_documentreference += (
                f" was created {self.get_timeago(documentreference.date)}. "
            )
        else:
            _logger.info(f"Date not found for documentreference {documentreference.id}")
            flat_documentreference += " was created. "
        return flat_documentreference

fhirobject property writable

Prodict: The current FHIR object as Prodict.

flattened property

str: The last flattened output string.

flatten()

Compute the flattened text for the current FHIR object.

Returns:

Name Type Description
str

The flattened string.

Source code in src/fhiry/flattenfhir.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def flatten(self):
    """Compute the flattened text for the current FHIR object.

    Returns:
        str: The flattened string.
    """
    if not self._fhirobject:
        _logger.info("FHIR object is not set.")
        raise ValueError("FHIR object is not set.")
    self._flattened = ""
    if self._fhirobject.resourceType == "Bundle":
        # A Bundle is flattened entry by entry.
        for bundle_entry in self._fhirobject.entry:
            resource = Prodict.from_dict(bundle_entry).resource
            self.get_flattened_text(resource)
    else:
        self.get_flattened_text(self._fhirobject)
    return self._flattened

flatten_allergyintolerance(allergyintolerance)

Flatten an AllergyIntolerance into a short sentence.

Source code in src/fhiry/flattenfhir.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def flatten_allergyintolerance(self, allergyintolerance) -> str:
    """Flatten an AllergyIntolerance into a short sentence."""
    flat_allergyintolerance = ""
    # Only dereference code.coding once "code" is known to be present;
    # previously coding[0] was read unconditionally, so the fallback branch
    # could never be reached when "code" was missing.
    if (
        "code" in allergyintolerance
        and "display" in allergyintolerance.code.coding[0]
    ):
        flat_allergyintolerance += (
            f"{allergyintolerance.code.coding[0]['display']} "
        )
    else:
        _logger.info(
            f"Code not found for allergyintolerance {allergyintolerance.id}"
        )
        flat_allergyintolerance += "AllergyIntolerance "
    if "onsetDateTime" in allergyintolerance:
        flat_allergyintolerance += f" allergy was reported on {self.get_timeago(allergyintolerance.onsetDateTime)}. "
    else:
        _logger.info(
            f"Onset date not found for allergyintolerance {allergyintolerance.id}"
        )
        flat_allergyintolerance += "allergy reported. "
    return flat_allergyintolerance

flatten_condition(condition)

Flatten a Condition into a short sentence.

Source code in src/fhiry/flattenfhir.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def flatten_condition(self, condition) -> str:
    """Flatten a Condition into a short sentence.

    Args:
        condition: Condition resource object.

    Returns:
        str: Flattened snippet with code and onset date.
    """
    flat_condition = ""
    if "code" in condition:
        flat_condition += f"{condition.code.coding[0]['display']} "
    else:
        _logger.info(f"Code not found for condition {condition.id}")
        flat_condition += "Condition "
    # Membership test instead of attribute access: `condition.onsetDateTime`
    # raised AttributeError when the element was absent, rather than taking
    # the fallback branch — inconsistent with the sibling flatten_* methods.
    if "onsetDateTime" in condition:
        flat_condition += (
            f"was diagnosed {self.get_timeago(condition.onsetDateTime)}. "
        )
    else:
        _logger.info(f"Onset date not found for condition {condition.id}")
        flat_condition += "was diagnosed. "
    return flat_condition

flatten_documentreference(documentreference)

Flatten a DocumentReference into a short sentence.

Source code in src/fhiry/flattenfhir.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def flatten_documentreference(self, documentreference) -> str:
    """Render a DocumentReference resource as a short text snippet."""
    flat = ""
    for raw_content in documentreference.content:
        attachment = Prodict.from_dict(raw_content).attachment
        # Only plain-text attachments are included; others are logged and skipped.
        if attachment.contentType != "text/plain":
            _logger.info(
                f"Attachment for documentreference {documentreference.id} is not text/plain."
            )
            continue
        flat += f"{attachment.title}: {attachment.data}"
    if "date" in documentreference:
        flat += (
            f" was created {self.get_timeago(documentreference.date)}. "
        )
    else:
        _logger.info(f"Date not found for documentreference {documentreference.id}")
        flat += " was created. "
    return flat

flatten_medication(medication)

Flatten a Medication into a short sentence.

Source code in src/fhiry/flattenfhir.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def flatten_medication(self, medication) -> str:
    """Render a Medication resource as a one-line text snippet."""
    segments = []
    if "code" in medication:
        segments.append(f"{medication.code.coding[0]['display']} ")
    else:
        _logger.info(f"Code not found for medication {medication.id}")
        segments.append("Medication ")
    if "status" in medication:
        segments.append(f"Status: {medication.status}. ")
    else:
        _logger.info(f"Status not found for medication {medication.id}")
        segments.append("Status: unknown. ")
    return "".join(segments)

flatten_observation(observation)

Flatten an Observation into a short sentence.

Source code in src/fhiry/flattenfhir.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def flatten_observation(self, observation) -> str:
    """Flatten an Observation into a short sentence.

    Args:
        observation: Observation resource object.

    Returns:
        str: Flattened snippet covering code, date, value and interpretation.
    """
    flat_observation = ""
    if "code" in observation:
        _display = observation.code.coding[0]
        flat_observation += f"{_display['display']} "
    else:
        _logger.info(f"Code not found for observation {observation.id}")
        flat_observation += "Observation "
    if "effectiveDateTime" in observation:
        flat_observation += (
            f"recorded {self.get_timeago(observation.effectiveDateTime)} was "
        )
    else:
        _logger.info(f"Effective date not found for observation {observation.id}")
        flat_observation += "of unknown date was "
    # value[x] handling: the first matching representation wins.
    if "valueQuantity" in observation and "value" in observation.valueQuantity:
        flat_observation += f"Value: {observation.valueQuantity.value} "
        if "unit" in observation.valueQuantity:
            flat_observation += f"{observation.valueQuantity.unit}. "
    elif "valueString" in observation:
        flat_observation += f"Value: {observation.valueString}. "
    elif "valueBoolean" in observation:
        flat_observation += f"Value: {observation.valueBoolean}. "
    elif (
        "valueRange" in observation
        and "low" in observation.valueRange
        and "high" in observation.valueRange
    ):
        flat_observation += f"Value: {observation.valueRange.low.value} - {observation.valueRange.high.value} {observation.valueRange.low.unit}. "
    elif (
        "valueRatio" in observation
        and "numerator" in observation.valueRatio
        and "denominator" in observation.valueRatio
    ):
        flat_observation += f"Value: {observation.valueRatio.numerator.value} {observation.valueRatio.numerator.unit} / {observation.valueRatio.denominator.value} {observation.valueRatio.denominator.unit}. "
    elif (
        "valuePeriod" in observation
        and "start" in observation.valuePeriod
        and "end" in observation.valuePeriod
    ):
        flat_observation += f"Value: {observation.valuePeriod.start} - {observation.valuePeriod.end}. "
    elif "valueDateTime" in observation and observation.valueDateTime != "":
        flat_observation += f"Value: {observation.valueDateTime}. "
    elif "valueTime" in observation and observation.valueTime != "":
        flat_observation += f"Value: {observation.valueTime}. "
    elif (
        "valueSampledData" in observation and "data" in observation.valueSampledData
    ):
        flat_observation += f"Value: {observation.valueSampledData.data}. "
    else:
        _logger.info(f"Value not found for observation {observation.id}")
        flat_observation += "Value: unknown. "
    try:
        # Removed a duplicated `"coding" in observation.interpretation[0]`
        # check. Narrowed the bare `except:` to `except Exception:` so
        # SystemExit/KeyboardInterrupt are no longer swallowed; malformed
        # interpretation data still falls back to "unknown".
        if (
            "interpretation" in observation
            and "coding" in observation.interpretation[0]
        ):
            _text = observation.interpretation[0]["coding"][0]
            flat_observation += f"Interpretation: {_text['display']}. "
    except Exception:
        _logger.info(f"Interpretation not found for observation {observation.id}")
        flat_observation += "Interpretation: unknown. "
    return flat_observation

flatten_patient(patient)

Flatten a Patient into a short sentence.

Parameters:

Name Type Description Default
patient

Patient resource object.

required

Returns:

Name Type Description
str str

Flattened snippet.

Source code in src/fhiry/flattenfhir.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def flatten_patient(self, patient) -> str:
    """Flatten a Patient into a short sentence.

    Args:
        patient: Patient resource object.

    Returns:
        str: Flattened snippet.
    """
    if "gender" in patient:
        opening = f"Medical record of a {patient.gender} patient "
    else:
        _logger.info(f"Gender not found for patient {patient.id}")
        opening = "Medical record of a patient "
    if "birthDate" in patient:
        age_part = f"born {self.get_timeago(patient.birthDate)}. "
    else:
        _logger.info(f"Birthdate not found for patient {patient.id}")
        age_part = "of unknown age. "
    return opening + age_part

flatten_procedure(procedure)

Flatten a Procedure into a short sentence.

Source code in src/fhiry/flattenfhir.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def flatten_procedure(self, procedure) -> str:
    """Flatten a Procedure into a short sentence.

    Args:
        procedure: Procedure resource object.

    Returns:
        str: Flattened snippet with code, status and performed date.
    """
    flat_procedure = ""
    if (
        "code" in procedure
        and "coding" in procedure.code
        and "display" in procedure.code.coding[0]
    ):
        flat_procedure += f"{procedure.code.coding[0]['display']} was "
    else:
        _logger.info(f"Code not found for procedure {procedure.id}")
        # Trailing space added: previously "Procedure was" fused with the
        # following word, producing e.g. "Procedure wascompleted ...".
        flat_procedure += "Procedure was "
    if "occurrenceDateTime" in procedure:
        flat_procedure += (
            f"{procedure.status} {self.get_timeago(procedure.occurrenceDateTime)}. "
        )
    elif "occurrencePeriod" in procedure:
        flat_procedure += f"{procedure.status} {self.get_timeago(procedure.occurrencePeriod.start)}. "
    else:
        _logger.info(f"Performed date not found for procedure {procedure.id}")
        flat_procedure += "on unknown date. "
    return flat_procedure

get_flattened_text(entry)

Append flattened text for a single FHIR entry to the buffer.

Parameters:

Name Type Description Default
entry Prodict

A FHIR resource object.

required

Returns:

Name Type Description
str

The updated flattened string.

Source code in src/fhiry/flattenfhir.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_flattened_text(self, entry):
    """Append flattened text for a single FHIR entry to the buffer.

    Args:
        entry (Prodict): A FHIR resource object.

    Returns:
        str: The updated flattened string.
    """
    # Dispatch table keyed by resourceType replaces the if/elif chain.
    handlers = {
        "Patient": self.flatten_patient,
        "Observation": self.flatten_observation,
        "Medication": self.flatten_medication,
        "Procedure": self.flatten_procedure,
        "Condition": self.flatten_condition,
        "AllergyIntolerance": self.flatten_allergyintolerance,
        "DocumentReference": self.flatten_documentreference,
    }
    handler = handlers.get(entry.resourceType)
    if handler is None:
        _logger.info(f"Resource type not supported: {entry.resourceType}")
    else:
        self._flattened += handler(entry)
    return self._flattened

get_timeago(datestring)

Return a human-friendly time-ago string for the given date.

Parameters:

Name Type Description Default
datestring str

ISO-like date string (YYYY-MM-DD...).

required

Returns:

Name Type Description
str str

Human-friendly relative time.

Source code in src/fhiry/flattenfhir.py
 97
 98
 99
100
101
102
103
104
105
106
107
def get_timeago(self, datestring) -> str:
    """Return a human-friendly time-ago string for the given date.

    Args:
        datestring (str): ISO-like date string (YYYY-MM-DD...).

    Returns:
        str: Human-friendly relative time.
    """
    # Keep only the date portion; drop any time/zone suffix past position 10.
    date_only = datestring[:10]
    return timeago.format(date_only, datetime.datetime.now())

Copyright (c) 2020 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT

Fhirndjson

Bases: BaseFhiry

Read and process NDJSON FHIR resources from a folder.

Parameters:

Name Type Description Default
config_json

Optional JSON string or file path with column transforms.

None
Source code in src/fhiry/fhirndjson.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Fhirndjson(BaseFhiry):
    """Read and process NDJSON FHIR resources from a folder.

    Args:
        config_json: Optional JSON string or file path with column transforms.
    """

    def __init__(self, config_json=None):
        self._folder = ""
        super().__init__(config_json=config_json)

    @property
    def df(self):
        """pd.DataFrame | None: The current working dataframe, if any."""
        return self._df

    @property
    def folder(self):
        """str: The folder containing NDJSON files to process."""
        return self._folder

    @folder.setter
    def folder(self, folder):
        """Set the NDJSON input folder.

        Args:
            folder (str): Path to a directory with .ndjson files.
        """
        self._folder = folder

    def read_resource_from_line(self, line):
        """Normalize a single NDJSON line (a JSON object) into a one-row dataframe."""
        resource = json.loads(line)
        return pd.json_normalize(resource)

    def process_source(self):
        """Process every NDJSON file in the configured folder.

        Only columns common across resources will be mapped.
        """
        if not self._folder:
            return
        for entry in tqdm(os.listdir(self._folder)):
            self.process_file(entry)

    def process_file(self, file):
        """Process one NDJSON file and append its rows to the dataframe.

        Args:
            file (str): Filename within the configured folder to process.

        Returns:
            pd.DataFrame | None: The updated dataframe.
        """
        accumulated = self._df
        if file.endswith(".ndjson"):
            path = os.path.join(self._folder, file)
            with open(path) as fp:
                for line in tqdm(fp.readlines()):
                    # Each line becomes a one-row frame, is transformed via
                    # process_df, then appended to the running result.
                    self._df = self.read_resource_from_line(line)
                    self.process_df()
                    accumulated = pd.concat([accumulated, self._df])
        self._df = accumulated
        return self._df

df property

pd.DataFrame | None: The current working dataframe, if any.

folder property writable

str: The folder containing NDJSON files to process.

process_file(file)

Process a single NDJSON file and append its rows to the dataframe.

Parameters:

Name Type Description Default
file str

Filename within the configured folder to process.

required

Returns:

Type Description

pd.DataFrame | None: The updated dataframe.

Source code in src/fhiry/fhirndjson.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def process_file(self, file):
    """Process a single NDJSON file and append its rows to the dataframe.

    Args:
        file (str): Filename within the configured folder to process.

    Returns:
        pd.DataFrame | None: The updated dataframe.
    """
    combined = self._df
    if not file.endswith(".ndjson"):
        # Non-NDJSON files are skipped; the dataframe is left unchanged.
        self._df = combined
        return self._df
    with open(os.path.join(self._folder, file)) as handle:
        for line in tqdm(handle.readlines()):
            self._df = self.read_resource_from_line(line)
            self.process_df()
            combined = pd.concat([combined, self._df])
    self._df = combined
    return self._df

process_source()

Process all NDJSON files in the folder into a single dataframe.

Only columns common across resources will be mapped.

Source code in src/fhiry/fhirndjson.py
51
52
53
54
55
56
57
58
def process_source(self):
    """Process all NDJSON files in the folder into a single dataframe.

    Only columns common across resources will be mapped.
    """
    if not self._folder:
        return
    for name in tqdm(os.listdir(self._folder)):
        self.process_file(name)

read_resource_from_line(line)

Normalize a single NDJSON line (JSON object) to a dataframe row.

Source code in src/fhiry/fhirndjson.py
47
48
49
def read_resource_from_line(self, line):
    """Normalize one NDJSON line (a JSON object) into a single-row dataframe."""
    parsed = json.loads(line)
    return pd.json_normalize(parsed)

ndjson(folder, config_json=None)

Process many NDJSON files in parallel.

Parameters:

Name Type Description Default
folder str

Directory path or a single file path.

required
config_json

Optional JSON string or file path with column transforms.

None

Returns:

Type Description

pd.DataFrame: Concatenated dataframe across all processed files.

Source code in src/fhiry/parallel.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def ndjson(folder, config_json=None):
    """Process many NDJSON files in parallel.

    Args:
        folder (str): Directory path or a single file path.
        config_json: Optional JSON string or file path with column transforms.

    Returns:
        pd.DataFrame: Concatenated dataframe across all processed files.
    """
    logger.info("CPU count: {}".format(mp.cpu_count()))
    worker = Fhirndjson(config_json=config_json)

    # A directory expands to its .ndjson members; a plain path is taken as-is.
    if os.path.isdir(folder):
        filenames = [
            folder + "/" + name
            for name in os.listdir(folder)
            if name.endswith(".ndjson")
        ]
    else:
        filenames = [folder]

    with mp.Pool(mp.cpu_count()) as pool:
        frames = list(
            tqdm(
                pool.imap(worker.process_file, filenames),
                total=len(filenames),
                desc="Processing NDJSON files",
            )
        )
    return pd.concat(frames)

process(folder, config_json=None)

Process many Bundle JSON files in parallel.

Parameters:

Name Type Description Default
folder str

Directory path or a single file path.

required
config_json

Optional JSON string or file path with column transforms.

None

Returns:

Type Description

pd.DataFrame: Concatenated dataframe across all processed files.

Source code in src/fhiry/parallel.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def process(folder, config_json=None):
    """Process many Bundle JSON files in parallel.

    Args:
        folder (str): Directory path or a single file path.
        config_json: Optional JSON string or file path with column transforms.

    Returns:
        pd.DataFrame: Concatenated dataframe across all processed files.
    """
    logger.info("CPU count: {}".format(mp.cpu_count()))
    worker = Fhiry(config_json=config_json)

    # A directory expands to its .json members; a plain path is taken as-is.
    if os.path.isdir(folder):
        filenames = [
            folder + "/" + name
            for name in os.listdir(folder)
            if name.endswith(".json")
        ]
    else:
        filenames = [folder]

    with mp.Pool(mp.cpu_count()) as pool:
        frames = list(
            tqdm(
                pool.imap(worker.process_file, filenames),
                total=len(filenames),
                desc="Processing JSON files",
            )
        )
    return pd.concat(frames)

Copyright (c) 2025 Bell Eapen

This software is released under the MIT License. https://opensource.org/licenses/MIT