Source code for fhiry.base_fhiry
"""
Copyright (c) 2020 Bell Eapen
This software is released under the MIT License.
https://opensource.org/licenses/MIT
"""
import pandas as pd
import json
class BaseFhiry(object):
    """Flatten the entries of a FHIR bundle into a pandas DataFrame.

    The working frame is stored in ``self._df`` (exposed read-only as
    ``df``).  ``self.config`` is a dict with two optional keys:

    * ``"REMOVE"`` -- list of column names to drop from the frame.
    * ``"RENAME"`` -- mapping of old column name to new column name.
    """

    def __init__(self, config_json=None):
        """Create an instance and load its configuration.

        Args:
            config_json: Either a path to a JSON file or a JSON string.
                When ``None`` a default configuration is used that removes
                ``resource.text.div`` and renames ``resource.id`` to ``id``.

        Raises:
            json.JSONDecodeError: If the file content (when ``config_json``
                can be opened as a file) or the string itself is not valid
                JSON.
        """
        self._df = None
        # Columns whose name contains "coding" (e.g. "resource.code.coding")
        # hold lists of coding objects.  Their codes are extracted into a
        # sibling column with the suffix "codes" (e.g.
        # "resource.code.codingcodes").  By default the raw "coding" column
        # is deleted after the extraction; set the property
        # ``delete_col_raw_coding`` to False to keep it, e.g. to analyse
        # code system URIs or extensions that are not carried over.
        self._delete_col_raw_coding = True
        if config_json is None:
            self.config = {
                "REMOVE": ["resource.text.div"],
                "RENAME": {"resource.id": "id"},
            }
            return
        try:
            # First interpretation: config_json is a file path.
            config_file = open(config_json, 'r')
        except (OSError, ValueError, TypeError):
            # Not openable as a file: treat config_json as a JSON string.
            self.config = json.loads(config_json)
        else:
            # Parsing happens outside the try so that invalid JSON inside an
            # existing file is reported for the file content, not the path.
            with config_file:
                self.config = json.load(config_file)

    @property
    def df(self):
        """The current working DataFrame, or ``None`` before any processing."""
        return self._df

    @property
    def delete_col_raw_coding(self):
        """Whether raw "coding" columns are deleted after code extraction."""
        return self._delete_col_raw_coding

    @delete_col_raw_coding.setter
    def delete_col_raw_coding(self, delete_col_raw_coding):
        self._delete_col_raw_coding = delete_col_raw_coding

    def read_bundle_from_bundle_dict(self, bundle_dict):
        """Return a flattened DataFrame built from ``bundle_dict['entry']``.

        A bundle without an ``entry`` key (or with an empty/``None`` value)
        yields an empty DataFrame instead of raising ``KeyError``.
        """
        entries = bundle_dict.get('entry') or []
        return pd.json_normalize(entries)

    def delete_unwanted_cols(self):
        """Drop the columns listed under ``config['REMOVE']`` that exist."""
        present = self._df.columns
        unwanted = [col for col in self.config.get('REMOVE', []) if col in present]
        if unwanted:
            self._df.drop(columns=unwanted, inplace=True)

    def rename_cols(self):
        """Rename columns in place according to ``config['RENAME']``."""
        self._df.rename(columns=self.config.get('RENAME', {}), inplace=True)

    def process_df(self):
        """Run the clean-up pipeline on the current frame.

        Order matters: ``add_patient_id`` reads ``resource.id`` and therefore
        runs before ``rename_cols``, which may rename that column.
        """
        self.delete_unwanted_cols()
        self.convert_object_to_list()
        self.add_patient_id()
        self.rename_cols()

    def process_bundle_dict(self, bundle_dict):
        """Load ``bundle_dict`` into the frame, process it and return it."""
        self._df = self.read_bundle_from_bundle_dict(bundle_dict)
        self.process_df()
        return self._df

    def convert_object_to_list(self):
        """Convert object columns to columns holding lists of codes.

        For every column whose name contains ``coding`` a new column
        ``<name>codes`` is appended; the original column is removed when
        ``delete_col_raw_coding`` is true.  For every column whose name
        contains ``display`` a new column ``<name>display`` is appended and
        the original column is always removed.  The new values are produced
        by ``process_list``.
        """
        source = self._df
        extracted_frames = []
        to_drop = []
        for col in source.columns:
            is_coding = 'coding' in col
            is_display = 'display' in col
            if not (is_coding or is_display):
                continue
            # Both targets are derived from the same source values, so a
            # column matching both substrings is read once, before any drop.
            values = source[col].apply(self.process_list)
            if is_coding:
                extracted_frames.append(values.to_frame(name=col + 'codes'))
            if is_display:
                extracted_frames.append(values.to_frame(name=col + 'display'))
            if is_display or self._delete_col_raw_coding:
                to_drop.append(col)
        if not extracted_frames:
            return
        remaining = source.drop(columns=to_drop) if to_drop else source
        # A single concat avoids the fragmentation caused by adding the
        # columns one at a time.
        self._df = pd.concat([remaining] + extracted_frames, axis=1)

    def add_patient_id(self):
        """Add a ``patientId`` column to the frame.

        For rows whose resource type is ``Patient`` the value is the
        resource id; for all other rows it is the result of
        ``check_subject_reference``.  The prefixed column names
        (``resource.id`` / ``resource.resourceType``) are tried first, then
        the unprefixed ones (``id`` / ``resourceType``).  If neither layout
        is usable the frame is left unchanged.
        """
        if self._df is None:
            return
        layouts = (
            ('resource.id', 'resource.resourceType'),
            ('id', 'resourceType'),
        )
        for id_col, type_col in layouts:
            if type_col not in self._df.columns:
                continue
            try:
                patient_ids = [
                    row[id_col] if row[type_col] == 'Patient'
                    else self.check_subject_reference(row)
                    for _, row in self._df.iterrows()
                ]
            except KeyError:
                # The id column is missing for a Patient row: try the next
                # layout.
                continue
            # Work on a copy: adding a column to a highly fragmented frame
            # triggers a pandas PerformanceWarning, copying de-fragments it.
            newframe = self._df.copy()
            newframe['patientId'] = patient_ids
            self._df = newframe
            return

    def check_subject_reference(self, row):
        """Return ``row['resource.subject.reference']`` without ``Patient/``.

        Returns an empty string when the key is missing or the value has no
        usable ``replace`` method (for example a NaN placeholder).
        """
        try:
            return row['resource.subject.reference'].replace('Patient/', '')
        except (KeyError, IndexError, AttributeError, TypeError):
            return ""

    def get_info(self):
        """Describe the current frame.

        Returns:
            The string ``"Dataframe is empty"`` when no frame has been
            loaded, otherwise the return value of ``DataFrame.info()``
            (pandas prints the summary and returns ``None``).
        """
        if self._df is None:
            return "Dataframe is empty"
        return self._df.info()

    def process_list(self, myList):
        """Extracts the codes from a list of objects

        For each dict in the list the value under ``code`` is taken; if
        there is no ``code`` key the value under ``display`` is taken
        instead.  Entries that are not dicts, or have neither key, are
        skipped.

        Args:
            myList (list): A list of objects

        Returns:
            list: A list of codes; empty when ``myList`` is not a list
        """
        myCodes = []
        if not isinstance(myList, list):
            return myCodes
        for entry in myList:
            # Guard against strings (where ``in`` would be a substring test)
            # and other non-mapping entries.
            if not isinstance(entry, dict):
                continue
            if 'code' in entry:
                myCodes.append(entry['code'])
            elif 'display' in entry:
                myCodes.append(entry['display'])
        return myCodes