
Modules

Copyright 2023 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

BaseAgent

Source code in src/dhti_elixir_base/agent.py
class BaseAgent:

    class AgentInput(BaseModel):
        """Chat history with the bot."""
        input: str
        model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

    def __init__(
        self,
        name=None,
        description=None,
        llm=None,
        prompt={},
        input_type: type[BaseModel] | None = None,
        prefix=None,
        suffix=None,
        tools: List = [],
        mcp=None,
    ):
        self.llm = llm or get_di("function_llm")
        self.prefix = prefix or get_di("prefix")
        self.suffix = suffix or get_di("suffix")
        self.prompt = prompt or get_di("agent_prompt") or "You are a helpful assistant."
        self.tools = tools
        self._name = (
            name or re.sub(r"(?<!^)(?=[A-Z])", "_", self.__class__.__name__).lower()
        )
        self._description = description or f"Agent for {self._name}"
        # current_patient_context = MessagesPlaceholder(variable_name="current_patient_context")
        # memory = ConversationBufferMemory(memory_key="current_patient_context", return_messages=True)
        self.agent_kwargs = {
            "prefix": self.prefix,
            "suffix": self.suffix,
            # "memory_prompts": [current_patient_context],
            "input_variables": ["input", "agent_scratchpad", "current_patient_context"],
        }
        if input_type is None:
            self.input_type = self.AgentInput
        else:
            self.input_type = input_type
        if mcp is not None:
            self.client = MultiServerMCPClient(mcp)
        else:
            self.client = None

    @property
    def name(self):
        return self._name

    @property
    def description(self):
        return self._description

    @name.setter
    def name(self, value):
        self._name = value

    @description.setter
    def description(self, value):
        self._description = value

    def get_agent(self):
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent.")
        return initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
            stop=["\nObservation:"],
            max_iterations=len(self.tools) + 3,
            handle_parsing_errors=True,
            agent_kwargs=self.agent_kwargs,
            verbose=True,
        ).with_types(
            input_type=self.input_type # type: ignore
        )

    def get_react_agent(self):
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent.")
        return create_react_agent(
            model=self.llm,
            tools=self.tools,
            prompt=self.prompt,
        ).with_types(
            input_type=self.input_type # type: ignore
        )

    # ! This is currently supported only for models supporting llm.bind_tools. See function return
    def get_agent_prompt(self):
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "{prefix}"
                    " You have access to the following tools: {tool_names}.\n{system_message}",
                ),
                MessagesPlaceholder(variable_name="messages"),
            ]
        )
        prompt = prompt.partial(prefix=self.prefix)
        prompt = prompt.partial(system_message=self.suffix)
        prompt = prompt.partial(
            tool_names=", ".join([tool.name for tool in self.tools])
        )
        return prompt

    def get_agent_chat_prompt_with_memory(self):
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "You are a helpful assistant."),
                # First put the history
                ("placeholder", "{chat_history}"),
                # Then the new input
                ("human", "{input}"),
                # Finally the scratchpad
                ("placeholder", "{agent_scratchpad}"),
            ]
        )
        return prompt

    def langgraph_agent(self):
        """Create an agent."""
        prompt = self.get_agent_prompt()
        if not hasattr(self.llm, "bind_tools"):
            raise ValueError(
                "The LLM does not support binding tools. Please use a compatible LLM."
            )
        return prompt | self.llm.bind_tools(self.tools)  # type: ignore

    def get_langgraph_agent_executor(self):
        """Get the agent executor."""
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent executor.")
        agent = create_tool_calling_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.get_agent_prompt(),
        )
        agent_executor = AgentExecutor(agent=agent, tools=self.tools)
        return agent_executor

    def get_langgraph_agent_executor_with_memory(self):
        from langchain_core.chat_history import InMemoryChatMessageHistory
        from langchain_core.runnables.history import RunnableWithMessageHistory
        if self.llm is None:
            raise ValueError(
                "llm must not be None when initializing the agent executor."
            )
        memory = InMemoryChatMessageHistory()
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "You are a helpful assistant."),
                # First put the history
                ("placeholder", "{chat_history}"),
                # Then the new input
                ("human", "{input}"),
                # Finally the scratchpad
                ("placeholder", "{agent_scratchpad}"),
            ]
        )
        agent = create_tool_calling_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt,
        )
        agent_executor = AgentExecutor(agent=agent, tools=self.tools)
        return RunnableWithMessageHistory(
            agent_executor,  # type: ignore
            # This is needed because in most real world scenarios, a session id is needed
            # It isn't really used here because we are using a simple in memory ChatMessageHistory
            lambda session_id: memory,
            input_messages_key="input",
            history_messages_key="chat_history",
        )

    async def get_langgraph_mcp_agent(self):
        """Get the agent executor for async execution."""
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent executor.")
        if self.client is None:
            raise ValueError("MCP client must not be None when initializing the agent.")
        tools = await self.get_langgraph_mcp_tools()
        agent = create_react_agent(
            model=self.llm,
            tools=tools,
            prompt=self.prompt,
        )
        return agent

    async def get_langgraph_mcp_tools(self, session_name="dhti"):
        """Get the agent executor for async execution with session."""
        if self.client is None:
            raise ValueError("MCP client must not be None when initializing the agent.")
        async with self.client.session(session_name) as session:
            tools = await load_mcp_tools(session)
        return tools
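
For orientation, here is a minimal usage sketch (not part of the library source). It assumes a LangChain chat model such as langchain_openai.ChatOpenAI is installed, passes prefix, suffix, and prompt explicitly rather than relying on the dependency-injected defaults, and uses a purely illustrative lookup_patient tool.

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI  # any chat model that supports tool calling


@tool
def lookup_patient(patient_id: str) -> str:
    """Illustrative tool: look up a patient by ID."""
    return f"Patient {patient_id}: no known allergies."


agent = BaseAgent(
    name="demo_agent",
    description="Demo agent for the docs",
    llm=ChatOpenAI(model="gpt-4o-mini"),
    prompt="You are a helpful clinical assistant.",
    prefix="You are a helpful clinical assistant.",
    suffix="Answer concisely.",
    tools=[lookup_patient],
)

# LangGraph prebuilt ReAct agent; invoke it with a messages list
graph = agent.get_react_agent()
result = graph.invoke({"messages": [("user", "Any allergies for patient 123?")]})
print(result["messages"][-1].content)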

AgentInput

Bases: BaseModel

Chat history with the bot.

Source code in src/dhti_elixir_base/agent.py
class AgentInput(BaseModel):
    """Chat history with the bot."""
    input: str
    model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

get_langgraph_agent_executor()

Get the agent executor.

Source code in src/dhti_elixir_base/agent.py
def get_langgraph_agent_executor(self):
    """Get the agent executor."""
    if self.llm is None:
        raise ValueError("llm must not be None when initializing the agent executor.")
    agent = create_tool_calling_agent(
        llm=self.llm,
        tools=self.tools,
        prompt=self.get_agent_prompt(),
    )
    agent_executor = AgentExecutor(agent=agent, tools=self.tools)
    return agent_executor
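
A hedged invocation sketch, assuming agent is a BaseAgent configured as in the earlier example and that the prompt produced by get_agent_prompt() exposes the input variables LangChain's tool-calling agent expects:

executor = agent.get_langgraph_agent_executor()
result = executor.invoke({"input": "List the patient's active conditions."})
print(result["output"])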

get_langgraph_mcp_agent() async

Get the agent executor for async execution.

Source code in src/dhti_elixir_base/agent.py
async def get_langgraph_mcp_agent(self):
    """Get the agent executor for async execution."""
    if self.llm is None:
        raise ValueError("llm must not be None when initializing the agent executor.")
    if self.client is None:
        raise ValueError("MCP client must not be None when initializing the agent.")
    tools = await self.get_langgraph_mcp_tools()
    agent = create_react_agent(
        model=self.llm,
        tools=tools,
        prompt=self.prompt,
    )
    return agent
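
A hedged async sketch: the MCP server entry below (name, URL, transport) is illustrative and must match a server you actually run. The entry is keyed "dhti" because that is the default session name used by get_langgraph_mcp_tools.

import asyncio

from langchain_openai import ChatOpenAI

mcp_config = {
    # Illustrative MCP server entry; adjust URL and transport to your deployment
    "dhti": {"url": "http://localhost:8000/mcp", "transport": "streamable_http"},
}
mcp_agent = BaseAgent(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    prompt="You are a helpful assistant.",
    prefix="You are a helpful assistant.",
    suffix="Answer concisely.",
    mcp=mcp_config,
)


async def main():
    graph = await mcp_agent.get_langgraph_mcp_agent()
    result = await graph.ainvoke({"messages": [("user", "Which tools can you call?")]})
    print(result["messages"][-1].content)


asyncio.run(main())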

get_langgraph_mcp_tools(session_name='dhti') async

Load the MCP tools for the agent using a client session.

Source code in src/dhti_elixir_base/agent.py
async def get_langgraph_mcp_tools(self, session_name="dhti"):
    """Get the agent executor for async execution with session."""
    if self.client is None:
        raise ValueError("MCP client must not be None when initializing the agent.")
    async with self.client.session(session_name) as session:
        tools = await load_mcp_tools(session)
    return tools

langgraph_agent()

Create an agent.

Source code in src/dhti_elixir_base/agent.py
def langgraph_agent(self):
    """Create an agent."""
    prompt = self.get_agent_prompt()
    if not hasattr(self.llm, "bind_tools"):
        raise ValueError(
            "The LLM does not support binding tools. Please use a compatible LLM."
        )
    return prompt | self.llm.bind_tools(self.tools)  # type: ignore
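
A brief sketch of using the returned runnable, assuming the agent from the first example. The result is an AIMessage that may carry tool calls, which is how this building block is typically wired into a LangGraph node:

node = agent.langgraph_agent()
message = node.invoke({"messages": [("user", "Which tool fetches lab results?")]})
print(message.tool_calls or message.content)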


BaseChain

Source code in src/dhti_elixir_base/chain.py
@inject
class BaseChain:

    class ChainInput(BaseModel):
        """
        Input model for BaseChain.

        Attributes:
            input (str | CDSHookRequest): The input string or CDSHookRequest object for the chain.
        """

        input: str | CDSHookRequest
        model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

    def __init__(
        self,
        chain=None,
        prompt={},
        name=None,
        description=None,
        main_llm=None,
        clinical_llm=None,
        grounding_llm=None,
        input_type=None,
        output_type=None,
    ):
        self._chain = chain
        self._prompt = prompt or get_di("main_prompt")
        self._main_llm = main_llm or get_di("base_main_llm")
        self._clinical_llm = clinical_llm or get_di("base_clinical_llm")
        self._grounding_llm = grounding_llm or get_di("base_grounding_llm")
        self._input_type = input_type or self.ChainInput
        self._output_type = output_type
        self._name = name
        self._description = description
        self.init_prompt()

    @property
    def chain(self):
        if self._chain is None:
            """Get the runnable chain."""
            """ RunnableParallel / RunnablePassthrough / RunnableSequential / RunnableLambda / RunnableMap / RunnableBranch """
            if self.prompt is None:
                raise ValueError("Prompt must not be None when building the chain.")
            _sequential = (
                RunnablePassthrough()
                | get_context  # function to extract context from input # type: ignore
                | self.prompt  # "{input}""
                | self.main_llm
                | StrOutputParser()
                | add_card  # function to wrap output in CDSHookCard
            )
            chain = _sequential.with_types(input_type=self.input_type)
            return chain
        return self._chain

    @property
    def prompt(self):
        return self._prompt

    @property
    def main_llm(self):
        if self._main_llm is None:
            self._main_llm = get_di("base_main_llm")
        return self._main_llm

    @property
    def clinical_llm(self):
        if self._clinical_llm is None:
            self._clinical_llm = get_di("base_clinical_llm")
        return self._clinical_llm

    @property
    def grounding_llm(self):
        if self._grounding_llm is None:
            self._grounding_llm = get_di("base_grounding_llm")
        return self._grounding_llm

    @property
    def input_type(self):
        if self._input_type is None:
            self._input_type = self.ChainInput
        return self._input_type

    @property
    def output_type(self):
        return self._output_type

    @property
    def name(self):
        if self._name is None:
            return re.sub(r"(?<!^)(?=[A-Z])", "_", self.__class__.__name__).lower()
        return self._name

    @property
    def description(self):
        if self._description is None:
            self._description = f"Chain for {self.name}"
        return self._description

    @chain.setter
    def chain(self, value):
        self._chain = value

    @prompt.setter
    def prompt(self, value):
        self._prompt = value
        self.init_prompt()

    @main_llm.setter
    def main_llm(self, value):
        self._main_llm = value

    @clinical_llm.setter
    def clinical_llm(self, value):
        self._clinical_llm = value

    @grounding_llm.setter
    def grounding_llm(self, value):
        self._grounding_llm = value

    @input_type.setter
    def input_type(self, value):
        self._input_type = value

    @output_type.setter
    def output_type(self, value):
        self._output_type = value

    @name.setter
    def name(self, value):
        self._name = value

    @description.setter
    def description(self, value):
        self._description = value

    def invoke(self, **kwargs):
        if self.chain is None:
            raise ValueError("Chain is not initialized.")
        return self.chain.invoke(kwargs)

    def __call__(self, **kwargs):
        return self.invoke(**kwargs)

    @DeprecationWarning
    def get_runnable(self, **kwargs):
        return self.chain

    # * Override these methods in subclasses
    def init_prompt(self):
        pass

    def generate_llm_config(self):
        """
        Generate the configuration schema for the LLM function call.

        Returns:
            dict: A dictionary containing the function schema for the LLM, including name, description, and parameters.
        """
        # Use Pydantic v2 API; `schema()` is deprecated in favor of `model_json_schema()`
        _input_schema = self.input_type.model_json_schema()
        function_schema = {
            "name": (self.name or self.__class__.__name__).lower().replace(" ", "_"),
            "description": self.description,
            "parameters": {
                "type": _input_schema.get("type", "object"),
                "properties": _input_schema.get("properties", {}),
                "required": _input_schema.get("required", []),
            },
        }
        return function_schema

    def get_chain_as_langchain_tool(self):
        """
        Convert the chain to a LangChain StructuredTool.

        Returns:
            StructuredTool: An instance of LangChain StructuredTool wrapping the chain.
        """
        from langchain.tools import StructuredTool

        def _run(**kwargs):
            # Invoke the underlying runnable chain with provided kwargs
            return self.chain.invoke(kwargs)  # type: ignore

        return StructuredTool.from_function(
            func=_run,
            name=self.name or self.__class__.__name__,
            description=self.description or f"Chain for {self.name}",
            args_schema=self.input_type,
        )

    def get_chain_as_mcp_tool(self):
        """
        Convert the chain to an MCP tool using the FastMCP adapter.

        Returns:
            Any: An MCP tool instance wrapping the chain.
        """
        _fast_mcp = to_fastmcp(
            self.get_chain_as_langchain_tool(),
        )
        _fast_mcp.title = self.name or self.__class__.__name__
        return _fast_mcp
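
A minimal usage sketch (not from the library source). It assumes a LangChain chat model is available, passes the prompt explicitly instead of relying on the injected main_prompt, and assumes the project's dependency-injection helper can resolve (or return nothing for) the clinical and grounding LLM defaults. What get_context forwards to the prompt depends on that helper; a plain string input is assumed to pass through unchanged.

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Respond to the clinician: {input}")
chain = BaseChain(prompt=prompt, main_llm=ChatOpenAI(model="gpt-4o-mini"))

result = chain.invoke(input="Patient reports chest pain on exertion.")
print(result)  # {"cards": [CDSHookCard(summary="...")]} produced by add_card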

ChainInput

Bases: BaseModel

Input model for BaseChain.

Attributes:

    input (str | CDSHookRequest): The input string or CDSHookRequest object for the chain.

Source code in src/dhti_elixir_base/chain.py
class ChainInput(BaseModel):
    """
    Input model for BaseChain.

    Attributes:
        input (str | CDSHookRequest): The input string or CDSHookRequest object for the chain.
    """

    input: str | CDSHookRequest
    model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

generate_llm_config()

Generate the configuration schema for the LLM function call.

Returns:

    dict: A dictionary containing the function schema for the LLM, including name, description, and parameters.

Source code in src/dhti_elixir_base/chain.py
def generate_llm_config(self):
    """
    Generate the configuration schema for the LLM function call.

    Returns:
        dict: A dictionary containing the function schema for the LLM, including name, description, and parameters.
    """
    # Use Pydantic v2 API; `schema()` is deprecated in favor of `model_json_schema()`
    _input_schema = self.input_type.model_json_schema()
    function_schema = {
        "name": (self.name or self.__class__.__name__).lower().replace(" ", "_"),
        "description": self.description,
        "parameters": {
            "type": _input_schema.get("type", "object"),
            "properties": _input_schema.get("properties", {}),
            "required": _input_schema.get("required", []),
        },
    }
    return function_schema
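
For illustration, with the default ChainInput and the chain from the earlier sketch, the generated schema looks roughly like this (field details depend on the Pydantic JSON schema):

config = chain.generate_llm_config()
# e.g.
# {
#     "name": "base_chain",
#     "description": "Chain for base_chain",
#     "parameters": {
#         "type": "object",
#         "properties": {"input": {...}},
#         "required": ["input"],
#     },
# }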

get_chain_as_langchain_tool()

Convert the chain to a LangChain StructuredTool.

Returns:

    StructuredTool: An instance of LangChain StructuredTool wrapping the chain.

Source code in src/dhti_elixir_base/chain.py
def get_chain_as_langchain_tool(self):
    """
    Convert the chain to a LangChain StructuredTool.

    Returns:
        StructuredTool: An instance of LangChain StructuredTool wrapping the chain.
    """
    from langchain.tools import StructuredTool

    def _run(**kwargs):
        # Invoke the underlying runnable chain with provided kwargs
        return self.chain.invoke(kwargs)  # type: ignore

    return StructuredTool.from_function(
        func=_run,
        name=self.name or self.__class__.__name__,
        description=self.description or f"Chain for {self.name}",
        args_schema=self.input_type,
    )
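
A hedged usage sketch, assuming the chain from the earlier example; the resulting StructuredTool can be invoked directly or handed to a BaseAgent via its tools list:

chain_tool = chain.get_chain_as_langchain_tool()
print(chain_tool.name, "-", chain_tool.description)
output = chain_tool.invoke({"input": "Review this medication order."})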

get_chain_as_mcp_tool()

Convert the chain to an MCP tool using the FastMCP adapter.

Returns:

    Any: An MCP tool instance wrapping the chain.

Source code in src/dhti_elixir_base/chain.py
def get_chain_as_mcp_tool(self):
    """
    Convert the chain to an MCP tool using the FastMCP adapter.

    Returns:
        Any: An MCP tool instance wrapping the chain.
    """
    _fast_mcp = to_fastmcp(
        self.get_chain_as_langchain_tool(),
    )
    _fast_mcp.title = self.name or self.__class__.__name__
    return _fast_mcp
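
A hedged sketch of serving the tool over MCP. BaseMCPServer (below) does not currently forward a tools list, so this uses FastMCP's tools parameter directly; the server name and transport are illustrative.

from mcp.server.fastmcp import FastMCP

mcp_tool = chain.get_chain_as_mcp_tool()
server = FastMCP("dhti-demo", tools=[mcp_tool])
server.run()  # stdio transport by default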


BaseLLM

Bases: LLM

Source code in src/dhti_elixir_base/llm.py
class BaseLLM(LLM):

    hosted_url: Optional[str] = Field(
        None, alias="hosted_url"
    )  #! Alias is important when inheriting from LLM
    model_name: Optional[str] = Field(None, alias="model_name")
    params: Mapping[str, Any] = Field(default_factory=dict, alias="params")

    backend: Optional[str] = "dhti"
    temperature: Optional[float] = 0.1
    top_p: Optional[float] = 0.8
    top_k: Optional[int] = 40
    n_batch: Optional[int] = 8
    n_threads: Optional[int] = 4
    n_predict: Optional[int] = 256
    max_output_tokens: Optional[int] = 512
    repeat_last_n: Optional[int] = 64
    repeat_penalty: Optional[float] = 1.18

    def __init__(self, hosted_url: str, model_name: str, **kwargs):
        super().__init__(**kwargs)
        self.hosted_url = hosted_url
        self.model_name = model_name
        self.params = {**self._get_model_default_parameters, **kwargs}

    @property
    def _get_model_default_parameters(self):
        return {
            "max_output_tokens": self.max_output_tokens,
            "n_predict": self.n_predict,
            "top_k": self.top_k,
            "top_p": self.top_p,
            "temperature": self.temperature,
            "n_batch": self.n_batch,
            "repeat_penalty": self.repeat_penalty,
            "repeat_last_n": self.repeat_last_n,
        }

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """
        Get all the identifying parameters
        """
        return {
            "model_name": self.model_name,
            "hosted_url": self.hosted_url,
            "model_parameters": self._get_model_default_parameters,
        }

    @property
    def _llm_type(self) -> str:
        return "dhti"

    @abstractmethod
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs
    ) -> str:
        """
        Args:
            prompt: The prompt to pass into the model.
            stop: A list of strings to stop generation when encountered
            run_manager: Optional run manager for callbacks and tracing

        Returns:
            The string generated by the model
        """

        pass
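
A hedged subclass sketch showing what a concrete _call might look like. The HTTP contract (a JSON body with "prompt" returning "text") is hypothetical and only illustrates where hosted_url, model_name, and params come into play.

import requests


class HostedLLM(BaseLLM):
    """Illustrative subclass that POSTs the prompt to a hosted endpoint."""

    def _call(self, prompt, stop=None, run_manager=None, **kwargs):
        # Hypothetical endpoint contract: {"prompt": ...} -> {"text": ...}
        payload = {"prompt": prompt, "model": self.model_name, **self.params}
        response = requests.post(self.hosted_url, json=payload, timeout=60)
        response.raise_for_status()
        return response.json().get("text", "")


llm = HostedLLM(hosted_url="http://localhost:8080/generate", model_name="demo-model")
print(llm.invoke("Summarize the discharge note."))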

BaseMCPServer

Bases: FastMCP

Base class for MCP servers, extending FastMCP for custom functionality.

Source code in src/dhti_elixir_base/mcp.py
class BaseMCPServer(FastMCP):
    """Base class for MCP servers, extending FastMCP for custom functionality."""

    def __init__(self, name: str | None = None):
        self._name = name or "BaseMCPServer"
        super().__init__(name=self._name)

    @property
    def name(self):
        """Return the name of this MCP server instance."""
        return self._name

name property

Return the name of this MCP server instance.

BaseDhtiModel

Bases: ABC

A model class to load the model and tokenizer

Source code in src/dhti_elixir_base/model.py
class BaseDhtiModel(ABC):
    """A model class to lead the model and tokenizer"""

    model: Any = None

    def __init__(
        self,
        model: Any,
    ) -> None:
        self.model = model

    @classmethod
    @abstractmethod
    def load(cls) -> None:
        if cls.model is None:
            log.info("Loading model")
            t0 = perf_counter()
            # Load the model here
            elapsed = 1000 * (perf_counter() - t0)
            log.info("Model warm-up time: %d ms.", elapsed)
        else:
            log.info("Model is already loaded")

    @classmethod
    @abstractmethod
    def predict(cls, input: Any, **kwargs) -> Any:
        assert input is not None and cls.model is not None  # Sanity check

        # Make sure the model is loaded.
        cls.load()
        t0 = perf_counter()
        # Predict here
        elapsed = 1000 * (perf_counter() - t0)
        log.info("Model prediction time: %d ms.", elapsed)
        return None
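
A hedged concrete-model sketch: the "model" here is just str.upper, standing in for whatever artifact a real subclass would load, and the returned dict matches BaseServer's default ResponseSchema below.

class EchoModel(BaseDhtiModel):
    """Illustrative model: upper-cases the request text."""

    @classmethod
    def load(cls) -> None:
        if cls.model is None:
            cls.model = str.upper  # stand-in for loading a real model

    @classmethod
    def predict(cls, input, **kwargs):
        cls.load()
        return {"text": cls.model(input.text)}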

BaseServer

Bases: ABC

A server class to load the model and tokenizer

Source code in src/dhti_elixir_base/server.py
class BaseServer(ABC):
    """A server class to load the model and tokenizer"""

    class RequestSchema(BaseModel):
        text: str = Field()
        labels: list = Field()
        required: list = Field()

    class ResponseSchema(BaseModel):
        text: str = Field()

    request_schema = RequestSchema
    response_schema = ResponseSchema

    def __init__(
        self, model: BaseDhtiModel, request_schema: Any = None, response_schema: Any = None
    ) -> None:
        self.model = model
        if request_schema is not None:
            self.request_schema = request_schema
        if response_schema is not None:
            self.response_schema = response_schema

    @property
    def name(self):
        return re.sub(r"(?<!^)(?=[A-Z])", "_", self.__class__.__name__).lower()

    def health_check(self) -> Any:
        """Health check endpoint"""
        self.model.load()
        return {"status": "ok"}

    def get_schema(self) -> Any:
        """Get the request schema"""
        return self.request_schema

    def predict(self, input: Any, **kwargs) -> Any:
        _input = self.request_schema(**input)  # type: ignore
        _result = self.model.predict(_input, **kwargs)
        result = self.response_schema(**_result)  # type: ignore
        return result
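
A hedged end-to-end sketch using the EchoModel from the previous example; the request payload simply satisfies the default RequestSchema.

class EchoServer(BaseServer):
    pass


server = EchoServer(model=EchoModel(model=None))  # the class-level model is loaded lazily
print(server.health_check())  # {"status": "ok"}
print(server.predict({"text": "hb 10.2 g/dl", "labels": ["lab"], "required": ["lab"]}))
# -> ResponseSchema(text='HB 10.2 G/DL')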

get_schema()

Get the request schema

Source code in src/dhti_elixir_base/server.py
def get_schema(self) -> Any:
    """Get the request schema"""
    return self.request_schema

health_check()

Health check endpoint

Source code in src/dhti_elixir_base/server.py
def health_check(self) -> Any:
    """Health check endpoint"""
    self.model.load()
    return {"status": "ok"}

BaseSpace

Bases: Agent

Source code in src/dhti_elixir_base/space.py
class BaseSpace(Agent):

    from typing import Optional

    def __init__(self, agent: Optional[BaseAgent] = None, *args, **kwargs):
        if agent:
            self.agent = agent.get_agent()
            super().__init__(id=agent.name, *args, **kwargs)

    @action
    def say(self, content: str, current_patient_context: str = ""):
        """Search for a patient in the FHIR database."""
        #! TODO: Needs bootstrapping here.

        message = {
            "input": content,
            "current_patient_context": current_patient_context,
        }
        response_content = self.agent.invoke(message)
        self.send(
            {
                "to": self.current_message()["from"], # type: ignore
                "action": {
                    "name": "say",
                    "args": {
                        "content": response_content["output"],
                    },
                },
            }
        )
        return True

say(content, current_patient_context='')

Search for a patient in the FHIR database.

Source code in src/dhti_elixir_base/space.py
@action
def say(self, content: str, current_patient_context: str = ""):
    """Search for a patient in the FHIR database."""
    #! TODO: Needs bootstrapping here.

    message = {
        "input": content,
        "current_patient_context": current_patient_context,
    }
    response_content = self.agent.invoke(message)
    self.send(
        {
            "to": self.current_message()["from"], # type: ignore
            "action": {
                "name": "say",
                "args": {
                    "content": response_content["output"],
                },
            },
        }
    )
    return True

Pydantic Model for CDS Hook Card

Example:

{ "summary": "Patient is at high risk for opioid overdose.", "detail": "According to CDC guidelines, the patient's opioid dosage should be tapered to less than 50 MME. Link to CDC Guideline", "indicator": "warning", "source": { "label": "CDC Opioid Prescribing Guidelines", "url": "https://www.cdc.gov/drugoverdose/prescribing/guidelines.html", "icon": "https://example.org/img/cdc-icon.png" }, "links": [ { "label": "View MME Conversion Table", "url": "https://www.cdc.gov/drugoverdose/prescribing/mme.html" } ] }

CDSHookCard

Bases: BaseModel

CDS Hook Card Model

Source code in src/dhti_elixir_base/cds_hook/card.py
class CDSHookCard(BaseModel):
    """CDS Hook Card Model"""
    summary: str
    detail: Optional[str] = None
    indicator: Optional[Literal["info", "warning", "hard-stop"]] = None
    source: Optional[CDSHookCardSource] = None
    links: Optional[List[CDSHookCardLink]] = None

CDSHookCardLink

Bases: BaseModel

Link associated with the CDS Hook Card

Source code in src/dhti_elixir_base/cds_hook/card.py
class CDSHookCardLink(BaseModel):
    """Link associated with the CDS Hook Card"""
    label: str
    url: str

CDSHookCardSource

Bases: BaseModel

Source of the CDS Hook Card

Source code in src/dhti_elixir_base/cds_hook/card.py
class CDSHookCardSource(BaseModel):
    """Source of the CDS Hook Card"""
    label: str
    url: Optional[str] = None
    icon: Optional[str] = None
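
A brief construction sketch mirroring the JSON example above:

card = CDSHookCard(
    summary="Patient is at high risk for opioid overdose.",
    indicator="warning",
    source=CDSHookCardSource(
        label="CDC Opioid Prescribing Guidelines",
        url="https://www.cdc.gov/drugoverdose/prescribing/guidelines.html",
    ),
    links=[
        CDSHookCardLink(
            label="View MME Conversion Table",
            url="https://www.cdc.gov/drugoverdose/prescribing/mme.html",
        )
    ],
)
print(card.model_dump(exclude_none=True))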


add_card(output, input=[])

Add a CDSHookCard to the output list.

Source code in src/dhti_elixir_base/cds_hook/generate_cards.py
def add_card(output: str | CDSHookCard, input=[]) -> dict:
    """Add a CDSHookCard to the output list."""
    if isinstance(output, CDSHookCard):
        input.append(output)
    elif isinstance(output, str):
        input.append(CDSHookCard(summary=output))
    else:
        raise ValueError("Output must be a string or CDSHookCard")
    return {"cards": input}

get_card(output)

Get a CDSHookCard as a dictionary.

Source code in src/dhti_elixir_base/cds_hook/generate_cards.py
def get_card(output: str | CDSHookCard) -> dict:
    """Get a CDSHookCard as a dictionary."""
    if isinstance(output, CDSHookCard):
        return output.model_dump()
    elif isinstance(output, str):
        return {"cards": [CDSHookCard(summary=output).model_dump()]}
    else:
        raise ValueError("Output must be a string or CDSHookCard")


CDS Hook Request Model

Pydantic Model for CDS Hook Request

Example: { "hookInstance": "d1577c69-dfbe-44ad-ba6d-3e05e953b2ea", "fhirServer": "https://example.com/fhir", "fhirAuthorization": { ... }, "hook": "patient-view", "context": { ... }, "prefetch": { ... } }

CDSHookRequest

Bases: BaseModel

CDS Hook Request Model

Source code in src/dhti_elixir_base/cds_hook/request.py
class CDSHookRequest(BaseModel):
    """CDS Hook Request Model"""
    hookInstance: Optional[str] = None
    fhirServer: Optional[HttpUrl] = None
    fhirAuthorization: Optional[Any] = None
    hook: Optional[str] = None  # e.g., "patient-view", "order-select", etc.
    context: Optional[Any] = None
    prefetch: Optional[Any] = None

Pydantic models for CDS Hook Service

Example: { "services": [ { "hook": "patient-view", "name": "Static CDS Service Example", "description": "An example of a CDS Service that returns a card with SMART app recommendations.", "id": "static-patient-view", "prefetch": { "patientToGreet": "Patient/{{context.patientId}}" } } ] }

CDSHookService

Bases: BaseModel

CDS Hook Service Model

Source code in src/dhti_elixir_base/cds_hook/service.py
class CDSHookService(BaseModel):
    """CDS Hook Service Model"""
    hook: str
    name: str
    description: Optional[str] = None
    id: str
    prefetch: Optional[dict] = None

CDSHookServicesResponse

Bases: BaseModel

Response model containing a list of CDS Hook Services

Source code in src/dhti_elixir_base/cds_hook/service.py
class CDSHookServicesResponse(BaseModel):
    """Response model containing a list of CDS Hook Services"""
    services: List[CDSHookService]

DhtiFhirSearch

Source code in src/dhti_elixir_base/fhir/fhir_search.py
class DhtiFhirSearch:

    def __init__(self):
        self.fhir_base_url = get_di("fhir_base_url") or "http://hapi.fhir.org/baseR4"
        self.page_size = get_di("fhir_page_size") or 10
        self.requests_kwargs = get_di("fhir_requests_kwargs") or {}
        self.access_token = get_di("fhir_access_token") or ""

    def get_patient_id(self, input):
        # patient_id is the value for key patientId or patient_id or id or PatientId, patientID, PatientID etc
        try:
            patient_id = (
                input.get("patientId")
                or input.get("patient_id")
                or input.get("id")
                or input.get("PatientId")
                or input.get("patientID")
                or input.get("PatientID")
                or input.get("ID")
                or input.get("Id")
                or input.get("patient")
                or input.get("Patient")
                or input.get("subject")
            )
            return patient_id
        except AttributeError:
            return input

    def get_everything_for_patient(self, input={}, fhirpath=None):
        """Fetch all resources related to a specific patient using the $everything operation.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/fhir+json",
            "Accept": "application/fhir+json",
        }
        everything_url = f"{self.fhir_base_url}/Patient/{patient_id}/$everything"
        r = requests.get(everything_url, headers=headers, **self.requests_kwargs)
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def get_conditions_for_patient(self, input={}, fhirpath=None):
        """Fetch all Condition resources related to a specific patient.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined Condition resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/Condition"
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def get_observations_for_patient(self, input={}, fhirpath=None):
        """Fetch all Observation resources related to a specific patient.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined Observation resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/Observation"
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def get_procedures_for_patient(self, input={}, fhirpath=None):
        """Fetch all Procedure resources related to a specific patient.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined Procedure resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/Procedure"
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def get_medication_requests_for_patient(self, input={}, fhirpath=None):
        """Fetch all MedicationRequest resources related to a specific patient.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined MedicationRequest resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/MedicationRequest"
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def get_allergy_intolerances_for_patient(self, input={}, fhirpath=None):
        """Fetch all AllergyIntolerance resources related to a specific patient.
        Args:
            input (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.
        Returns:
            dict: Combined AllergyIntolerance resources related to the patient.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/AllergyIntolerance"
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()

    def search(self, resource_type="Patient", search_parameters={}, fhirpath=None):
        """Search the FHIR server and return the combined results.

        Args:
            resource_type (str): FHIR resource type to search (e.g., "Patient").
            search_parameters (dict): Query parameters per FHIR spec; _count is
                auto-set to the configured page size if absent.

        Returns:
            dict: Combined search results from the FHIR server.
        """

        headers = {"Content-Type": "application/fhir+json"}

        if "_count" not in search_parameters:
            search_parameters["_count"] = self.page_size

        search_url = f"{self.fhir_base_url}/{resource_type}"
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        if fhirpath:
            return evaluate(r.json(), fhirpath, {})
        return r.json()
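
A hedged usage sketch: it relies on the dependency-injected defaults (or the public HAPI R4 fallback) and a patient id of "example", both of which you would replace in practice; the FHIRPath expression simply pulls the condition display strings.

search = DhtiFhirSearch()

# Raw FHIR Bundle of patients matching a name
bundle = search.search("Patient", {"name": "Smith"})

# Condition display strings for one patient, filtered with FHIRPath
displays = search.get_conditions_for_patient(
    {"patientId": "example"},
    fhirpath="Bundle.entry.resource.code.coding.display",
)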

get_allergy_intolerances_for_patient(input={}, fhirpath=None)

Fetch all AllergyIntolerance resources related to a specific patient.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined AllergyIntolerance resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_allergy_intolerances_for_patient(self, input={}, fhirpath=None):
    """Fetch all AllergyIntolerance resources related to a specific patient.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined AllergyIntolerance resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {"Content-Type": "application/fhir+json"}
    search_url = f"{self.fhir_base_url}/AllergyIntolerance"
    search_parameters = {"patient": patient_id, "_count": self.page_size}
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

get_conditions_for_patient(input={}, fhirpath=None)

Fetch all Condition resources related to a specific patient.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined Condition resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_conditions_for_patient(self, input={}, fhirpath=None):
    """Fetch all Condition resources related to a specific patient.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Condition resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {"Content-Type": "application/fhir+json"}
    search_url = f"{self.fhir_base_url}/Condition"
    search_parameters = {"patient": patient_id, "_count": self.page_size}
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

get_everything_for_patient(input={}, fhirpath=None)

Fetch all resources related to a specific patient using the $everything operation.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_everything_for_patient(self, input={}, fhirpath=None):
    """Fetch all resources related to a specific patient using the $everything operation.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {
        "Authorization": f"Bearer {self.access_token}",
        "Content-Type": "application/fhir+json",
        "Accept": "application/fhir+json",
    }
    everything_url = f"{self.fhir_base_url}/Patient/{patient_id}/$everything"
    r = requests.get(everything_url, headers=headers, **self.requests_kwargs)
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

get_medication_requests_for_patient(input={}, fhirpath=None)

Fetch all MedicationRequest resources related to a specific patient.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined MedicationRequest resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_medication_requests_for_patient(self, input={}, fhirpath=None):
    """Fetch all MedicationRequest resources related to a specific patient.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined MedicationRequest resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {"Content-Type": "application/fhir+json"}
    search_url = f"{self.fhir_base_url}/MedicationRequest"
    search_parameters = {"patient": patient_id, "_count": self.page_size}
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

get_observations_for_patient(input={}, fhirpath=None)

Fetch all Observation resources related to a specific patient.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined Observation resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_observations_for_patient(self, input={}, fhirpath=None):
    """Fetch all Observation resources related to a specific patient.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Observation resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {"Content-Type": "application/fhir+json"}
    search_url = f"{self.fhir_base_url}/Observation"
    search_parameters = {"patient": patient_id, "_count": self.page_size}
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

get_procedures_for_patient(input={}, fhirpath=None)

Fetch all Procedure resources related to a specific patient.

Parameters:

    input (dict or str): Input containing patient ID or the patient ID itself.
    fhirpath (str, optional): FHIRPath expression to apply to the results.

Returns:

    dict: Combined Procedure resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def get_procedures_for_patient(self, input={}, fhirpath=None):
    """Fetch all Procedure resources related to a specific patient.
    Args:
        input (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Procedure resources related to the patient.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {"Content-Type": "application/fhir+json"}
    search_url = f"{self.fhir_base_url}/Procedure"
    search_parameters = {"patient": patient_id, "_count": self.page_size}
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

search(resource_type='Patient', search_parameters={}, fhirpath=None)

Search the FHIR server and return the combined results.

Parameters:

    resource_type (str, default 'Patient'): FHIR resource type to search (e.g., "Patient").
    search_parameters (dict, default {}): Query parameters per FHIR spec; _count is auto-set to the configured page size if absent.

Returns:

    dict: Combined search results from the FHIR server.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
def search(self, resource_type="Patient", search_parameters={}, fhirpath=None):
    """Search the FHIR server and return the combined results.

    Args:
        resource_type (str): FHIR resource type to search (e.g., "Patient").
        search_parameters (dict): Query parameters per FHIR spec; _count is
            auto-set to the configured page size if absent.

    Returns:
        dict: Combined search results from the FHIR server.
    """

    headers = {"Content-Type": "application/fhir+json"}

    if "_count" not in search_parameters:
        search_parameters["_count"] = self.page_size

    search_url = f"{self.fhir_base_url}/{resource_type}"
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    if fhirpath:
        return evaluate(r.json(), fhirpath, {})
    return r.json()

SmartOnFhirSearch

SMART-on-FHIR backed search helper mirroring DhtiFhirSearch API.

Uses fhirclient's resource model search pattern, e.g.:

    settings = { 'app_id': 'my_web_app', 'api_base': 'https://r4.smarthealthit.org' }
    smart = client.FHIRClient(settings=settings)
    patient = Patient.read('<id>', smart.server)

Each method returns raw JSON like DhtiFhirSearch and optionally applies a FHIRPath expression via fhirpathpy.evaluate.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py
class SmartOnFhirSearch:
    """SMART-on-FHIR backed search helper mirroring DhtiFhirSearch API.

    Uses fhirclient's resource model search pattern, e.g.:

            settings = { 'app_id': 'my_web_app', 'api_base': 'https://r4.smarthealthit.org' }
            smart = client.FHIRClient(settings=settings)
            patient = Patient.read('<id>', smart.server)

    Each method returns raw JSON like DhtiFhirSearch and optionally applies a
    FHIRPath expression via fhirpathpy.evaluate.
    """

    def __init__(self):
        app_id = get_di("fhir_app_id") or "my_web_app"
        base_url = get_di("fhir_base_url") or "http://hapi.fhir.org/baseR4"
        token = get_di("fhir_access_token") or ""
        settings = {
            "app_id": app_id,
            "api_base": base_url,
        }
        if token:
            settings["access_token"] = token

        self.smart = client.FHIRClient(settings=settings)
        self.fhir_base_url = base_url
        self.page_size = get_di("fhir_page_size") or 10
        self.requests_kwargs = get_di("fhir_requests_kwargs") or {}
        self.access_token = token
        # OAuth settings (optional)
        self.oauth_token_url = (
            get_di("fhir_oauth_token_url") or get_di("oauth_token_url") or None
        )
        self.oauth_client_id = (
            get_di("fhir_oauth_client_id") or get_di("oauth_client_id") or None
        )
        self.oauth_client_secret = (
            get_di("fhir_oauth_client_secret") or get_di("oauth_client_secret") or None
        )
        self.oauth_scope = get_di("fhir_oauth_scope") or get_di("oauth_scope") or None
        self.oauth_requests_kwargs = get_di("fhir_oauth_requests_kwargs") or {}
        self._token_expires_at = 0  # epoch seconds
        # Ensure any provided token is applied to the fhirclient session
        self._apply_auth_to_server()

    # ------------------------ utils ------------------------
    def _headers(self) -> dict:
        headers = {
            "Content-Type": "application/fhir+json",
            "Accept": "application/fhir+json",
        }
        if self.access_token and self.access_token.strip():
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    def _apply_auth_to_server(self) -> None:
        """Apply Authorization header to the fhirclient server session if possible."""
        try:
            server = getattr(self.smart, "server", None)
            session = getattr(server, "session", None)
            if session is not None and self.access_token:
                session.headers["Authorization"] = f"Bearer {self.access_token}"
        except Exception:
            pass

    def _fetch_token_client_credentials(self) -> None:
        """Fetch OAuth token using client_credentials flow if configured."""
        if not (
            self.oauth_token_url and self.oauth_client_id and self.oauth_client_secret
        ):
            return
        data = {"grant_type": "client_credentials"}
        if self.oauth_scope:
            data["scope"] = self.oauth_scope
        # Use HTTP Basic auth; many servers also accept in-body client credentials
        auth = (self.oauth_client_id, self.oauth_client_secret)
        r = requests.post(
            self.oauth_token_url,
            data=data,
            auth=auth,
            headers={"Accept": "application/json"},
            **self.oauth_requests_kwargs,
        )
        r.raise_for_status()
        payload = r.json() or {}
        token = payload.get("access_token")
        token_type = payload.get("token_type", "Bearer")
        expires_in = payload.get("expires_in", 0)
        if token:
            self.access_token = token if token_type.lower() == "bearer" else token
            # Set a small safety margin of 30 seconds
            import time

            self._token_expires_at = (
                int(time.time()) + int(expires_in) - 30 if expires_in else 0
            )
            self._apply_auth_to_server()

    def _ensure_token(self) -> None:
        """Ensure a valid access token is available and applied."""
        # If we already have a token and no known expiry, assume valid
        if self.access_token and self._token_expires_at == 0:
            self._apply_auth_to_server()
            return
        # If expired or missing, try to fetch
        import time

        now = int(time.time())
        if not self.access_token or (
            self._token_expires_at and now >= self._token_expires_at
        ):
            self._fetch_token_client_credentials()
            self._apply_auth_to_server()

    def _model_class(self, resource_type: str):
        """Resolve a fhirclient model class for a given resource type name.

        Returns None if the module/class cannot be resolved.
        """
        try:
            module_name = resource_type.lower()
            mod = importlib.import_module(f"fhirclient.models.{module_name}")
            return getattr(mod, resource_type)
        except Exception:
            return None

    def get_patient_id(self, input):
        # Same extraction behavior as DhtiFhirSearch
        try:
            patient_id = (
                input.get("patientId")
                or input.get("patient_id")
                or input.get("id")
                or input.get("PatientId")
                or input.get("patientID")
                or input.get("PatientID")
                or input.get("ID")
                or input.get("Id")
                or input.get("patient")
                or input.get("Patient")
                or input.get("subject")
            )
            return patient_id
        except AttributeError:
            return input

    # ---------------------- operations ---------------------
    def get_everything_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        """Fetch resources related to a patient using $everything operation.

        Returns JSON Bundle like DhtiFhirSearch.
        """
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")

        # Ensure token present for authenticated endpoints
        self._ensure_token()
        # Use explicit HTTP for predictable headers and testing
        path = f"Patient/{patient_id}/$everything"
        url = f"{self.fhir_base_url}/{path}"
        r = requests.get(url, headers=self._headers(), **self.requests_kwargs)
        r.raise_for_status()
        data = r.json()

        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_conditions_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Condition.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_observations_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Observation.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_procedures_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Procedure.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_medication_requests_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = MedicationRequest.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_allergy_intolerances_for_patient(
        self, input: dict | str = {}, fhirpath: str | None = None
    ):
        patient_id = self.get_patient_id(input)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = AllergyIntolerance.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def search(
        self,
        resource_type: str = "Patient",
        search_parameters: dict | None = None,
        fhirpath: str | None = None,
    ):
        """Generic search for any resource type.

        Tries to resolve the appropriate fhirclient model class and perform a
        model-based search; if not possible, falls back to an HTTP GET.
        """
        params = dict(search_parameters or {})
        if "_count" not in params:
            params["_count"] = self.page_size

        self._ensure_token()
        cls = self._model_class(resource_type)
        data = None
        if cls is not None and hasattr(cls, "where"):
            try:
                bundle = cls.where(struct=params).perform(self.smart.server)
                data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
            except Exception:
                data = None

        if data is None:
            # Fallback to HTTP (works for unknown/extension resource types)
            url = f"{self.fhir_base_url}/{resource_type}"
            r = requests.get(
                url, params=params, headers=self._headers(), **self.requests_kwargs
            )
            r.raise_for_status()
            data = r.json()

        return evaluate(data, fhirpath, {}) if fhirpath else data
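
Putting the class together, a rough usage sketch. It assumes the DI container already provides fhir_base_url (and, for protected servers, the fhir_oauth_* settings read in __init__); the import path and patient id are illustrative:

    from dhti_elixir_base.fhir.smart_on_fhir import SmartOnFhirSearch  # assumed import path

    smart_search = SmartOnFhirSearch()

    # Model-based search via fhirclient; a Bearer token is fetched and applied
    # automatically when the OAuth settings are configured.
    conditions = smart_search.get_conditions_for_patient("example-patient-id")

    # The same fhirpath reduction is available on every method.
    allergy_names = smart_search.get_allergy_intolerances_for_patient(
        {"patientId": "example-patient-id"},
        fhirpath="Bundle.entry.resource.code.text",
    )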

get_everything_for_patient(input={}, fhirpath=None)

Fetch resources related to a patient using $everything operation.

Returns JSON Bundle like DhtiFhirSearch.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py (lines 164-184)
def get_everything_for_patient(
    self, input: dict | str = {}, fhirpath: str | None = None
):
    """Fetch resources related to a patient using $everything operation.

    Returns JSON Bundle like DhtiFhirSearch.
    """
    patient_id = self.get_patient_id(input)
    if not patient_id:
        raise ValueError("Patient ID is required.")

    # Ensure token present for authenticated endpoints
    self._ensure_token()
    # Use explicit HTTP for predictable headers and testing
    path = f"Patient/{patient_id}/$everything"
    url = f"{self.fhir_base_url}/{path}"
    r = requests.get(url, headers=self._headers(), **self.requests_kwargs)
    r.raise_for_status()
    data = r.json()

    return evaluate(data, fhirpath, {}) if fhirpath else data
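
Continuing the instantiation sketch above, the $everything call returns the full Bundle unless a reduction is requested; the patient id and FHIRPath expression here are illustrative:

    smart_search = SmartOnFhirSearch()

    # Full Bundle of everything recorded for the patient.
    everything = smart_search.get_everything_for_patient("example-patient-id")

    # Only the resource types present in the Bundle.
    resource_types = smart_search.get_everything_for_patient(
        "example-patient-id",
        fhirpath="Bundle.entry.resource.resourceType",
    )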

search(resource_type='Patient', search_parameters=None, fhirpath=None)

Generic search for any resource type.

Tries to resolve the appropriate fhirclient model class and perform a model-based search; if not possible, falls back to an HTTP GET.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py (lines 256-290)
def search(
    self,
    resource_type: str = "Patient",
    search_parameters: dict | None = None,
    fhirpath: str | None = None,
):
    """Generic search for any resource type.

    Tries to resolve the appropriate fhirclient model class and perform a
    model-based search; if not possible, falls back to an HTTP GET.
    """
    params = dict(search_parameters or {})
    if "_count" not in params:
        params["_count"] = self.page_size

    self._ensure_token()
    cls = self._model_class(resource_type)
    data = None
    if cls is not None and hasattr(cls, "where"):
        try:
            bundle = cls.where(struct=params).perform(self.smart.server)
            data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        except Exception:
            data = None

    if data is None:
        # Fallback to HTTP (works for unknown/extension resource types)
        url = f"{self.fhir_base_url}/{resource_type}"
        r = requests.get(
            url, params=params, headers=self._headers(), **self.requests_kwargs
        )
        r.raise_for_status()
        data = r.json()

    return evaluate(data, fhirpath, {}) if fhirpath else data
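
A final usage sketch for the generic search: a modeled resource type goes through fhirclient, while an unmodeled or custom type exercises the HTTP fallback. Resource names and parameters are illustrative:

    smart_search = SmartOnFhirSearch()

    # Resolved to fhirclient.models.patient.Patient and searched model-first.
    patients = smart_search.search("Patient", {"family": "Smith"})

    # An unknown or extension resource type falls back to a plain HTTP GET
    # against {fhir_base_url}/{resource_type}.
    custom = smart_search.search("CustomResource", {"status": "active"})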