Skip to content

Modules

Copyright 2023 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

BaseAgent

Source code in src/dhti_elixir_base/agent.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class BaseAgent:
    """Base agent that wires an LLM, a system prompt and a tool set.

    Tools may be supplied directly or discovered at call time from an
    MCP (Model Context Protocol) server via ``MultiServerMCPClient``.
    """

    class AgentInput(BaseModel):
        """Chat history with the bot."""
        input: str
        model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

    def __init__(
        self,
        name=None,
        description=None,
        llm=None,
        prompt=None,
        input_type: type[BaseModel] | None = None,
        tools: list | None = None,
        mcp=None,
    ):
        """Create the agent.

        Args:
            name: Agent name; defaults to the snake_case class name.
            description: Human-readable description of the agent.
            llm: Chat model; defaults to the DI-provided "function_llm".
            prompt: System prompt; defaults to the DI-provided
                "agent_prompt" or a generic assistant prompt.
            input_type: Pydantic model describing the agent input;
                defaults to :class:`AgentInput`.
            tools: Explicit tool list; when empty, tools are loaded from MCP.
            mcp: MCP server configuration mapping; defaults to a single
                "mcpx" HTTP server.
        """
        if mcp is None:
            # Build a fresh dict per instance. The previous signature used a
            # mutable default argument, so every instance shared (and could
            # mutate) the same configuration dict.
            mcp = {
                "mcpx": {
                    "transport": "http",
                    "url": "http://mcpx:9000/mcp",
                }
            }
        self.llm = llm or get_di("function_llm")
        self.prompt = prompt or get_di("agent_prompt") or "You are a helpful assistant."
        self.tools = tools if tools is not None else []
        self._name = name or camel_to_snake(self.__class__.__name__)
        self._description = description or f"Agent for {self._name}"
        if input_type is None:
            self.input_type = self.AgentInput
        else:
            self.input_type = input_type
        self.client = MultiServerMCPClient(mcp)

    @property
    def name(self):
        return self._name

    @property
    def description(self):
        return self._description

    @name.setter
    def name(self, value):
        self._name = value

    @description.setter
    def description(self, value):
        self._description = value

    def has_tool(self) -> bool:
        """Check if the agent has any tools."""
        # NOTE: asyncio.run() raises if called from a running event loop;
        # prefer the async methods below from async contexts.
        try:
            _tools = asyncio.run(self.client.get_tools())
            return bool(_tools)
        except Exception as e:
            logger.error(f"Error checking tools: {e}")
            return False

    def get_agent_response(self, context: str) -> str:
        """Run the agent synchronously on *context* and return its reply.

        Loads tools from MCP when no explicit tools were configured.
        Returns an error message string instead of raising when tool
        loading or agent execution fails.

        Raises:
            ValueError: If no LLM is configured.
        """
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent.")
        result = "Agent encountered an error while processing your request."
        try:
            # if self.tools is an empty list, load tools from MCP
            if not self.tools:
                _tools = asyncio.run(self.client.get_tools())
            else:
                _tools = self.tools
            _agent = create_agent(model=self.llm, tools=_tools, system_prompt=self.prompt)

            result = asyncio.run(_agent.ainvoke(
                {"messages": [{"role": "user", "content": context}]}
            ))
            ai_message = result["messages"][-1].content
            return str(ai_message)
        except Exception as e:
            logger.error(f"Error in agent processing: {e}")
            return str(result)

    async def get_langgraph_mcp_agent(self):
        """Get the agent executor for async execution."""
        if self.llm is None:
            raise ValueError("llm must not be None when initializing the agent executor.")
        if self.client is None:
            raise ValueError("MCP client must not be None when initializing the agent.")
        tools = await self.get_langgraph_mcp_tools()
        agent = create_agent(
            model=self.llm,
            tools=tools,
            system_prompt=self.prompt,
        )
        return agent

    async def get_langgraph_mcp_tools(self, session_name="dhti"):
        """Load MCP tools within a named client session (async)."""
        if self.client is None:
            raise ValueError("MCP client must not be None when initializing the agent.")
        async with self.client.session(session_name) as session:
            tools = await load_mcp_tools(session)
        return tools

AgentInput

Bases: BaseModel

Chat history with the bot.

Source code in src/dhti_elixir_base/agent.py
31
32
33
34
class AgentInput(BaseModel):
    """Chat history with the bot."""
    # The single user message passed to the agent.
    input: str
    # Ignore unknown keys; allow non-pydantic types in field values.
    model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

get_langgraph_mcp_agent() async

Get the agent executor for async execution.

Source code in src/dhti_elixir_base/agent.py
108
109
110
111
112
113
114
115
116
117
118
119
120
async def get_langgraph_mcp_agent(self):
    """Build and return a LangGraph agent wired to the MCP-provided tools.

    Raises:
        ValueError: When no LLM or no MCP client is configured.
    """
    # Guard clauses: both an LLM and an MCP client are required.
    if self.llm is None:
        raise ValueError("llm must not be None when initializing the agent executor.")
    if self.client is None:
        raise ValueError("MCP client must not be None when initializing the agent.")
    mcp_tools = await self.get_langgraph_mcp_tools()
    return create_agent(model=self.llm, tools=mcp_tools, system_prompt=self.prompt)

get_langgraph_mcp_tools(session_name='dhti') async

Get the agent executor for async execution with session.

Source code in src/dhti_elixir_base/agent.py
122
123
124
125
126
127
128
async def get_langgraph_mcp_tools(self, session_name="dhti"):
    """Load the MCP tools over a short-lived client session (async).

    Args:
        session_name: Name of the MCP session to open (default "dhti").

    Raises:
        ValueError: When no MCP client is configured.
    """
    if self.client is None:
        raise ValueError("MCP client must not be None when initializing the agent.")
    async with self.client.session(session_name) as mcp_session:
        loaded = await load_mcp_tools(mcp_session)
    return loaded

has_tool()

Check if the agent has any tools.

Source code in src/dhti_elixir_base/agent.py
78
79
80
81
82
83
84
85
def has_tool(self) -> bool:
    """Return True when the MCP client currently exposes at least one tool."""
    try:
        available = asyncio.run(self.client.get_tools())
    except Exception as e:
        # Best-effort probe: report failure instead of raising.
        logger.error(f"Error checking tools: {e}")
        return False
    return bool(available)

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

BaseChain

Source code in src/dhti_elixir_base/chain.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@inject
class BaseChain:
    """Base class for LCEL chains.

    Wires a prompt, DI-resolved LLMs, and pydantic input/output types
    into a runnable LangChain sequence exposed through :pyattr:`chain`.
    """

    class ChainInput(BaseModel):
        """
        Input model for BaseChain.

        Attributes:
            input (Any): The input string or CDSHookRequest object for the chain.
        """

        input: Any
        model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

    def __init__(
        self,
        prompt=None,
        name=None,
        description=None,
        main_llm=None,
        clinical_llm=None,
        grounding_llm=None,
        input_type=None,
        output_type=None,
    ):
        """Create the chain; every omitted argument falls back to a DI default."""
        self._prompt = prompt or get_di("main_prompt")
        self._main_llm = main_llm or get_di("base_main_llm")
        self._clinical_llm = clinical_llm or get_di("base_clinical_llm")
        self._grounding_llm = grounding_llm or get_di("base_grounding_llm")
        self._input_type = input_type or self.ChainInput
        self._output_type = output_type
        self._name = name
        self._description = description
        self.init_prompt()

    @property
    def chain(self):
        """Get the runnable chain.

        Example usage of an agent in the chain:
        BaseAgent takes llm, prompt, tools as input. If tools is not provided, it loads tools from MCP. default llm is function_llm from DI.
        Default prompt is "You are a helpful assistant."
        self.my_agent = BaseAgent().get_agent_response # in __init__
        _chain = (
            RunnablePassthrough()
            | get_string_message_to_agent
            | self.my_agent
            | StrOutputParser()
        )

        RunnableParallel / RunnablePassthrough / RunnableSequential / RunnableLambda / RunnableMap / RunnableBranch
        """
        if self.prompt is None:
            raise ValueError("Prompt must not be None when building the chain.")
        _sequential = (
            RunnablePassthrough()
            | get_context  # function to extract context from input # type: ignore
            | self.prompt  # "{input}""
            | self.main_llm
            | StrOutputParser()
            | add_card  # function to wrap output in CDSHookCard
        )
        chain = _sequential.with_types(input_type=self.input_type)
        return chain

    @property
    def prompt(self):
        return self._prompt

    @property
    def main_llm(self):
        # Lazily resolve from DI so a late-configured container still works.
        if self._main_llm is None:
            self._main_llm = get_di("base_main_llm")
        return self._main_llm

    @property
    def clinical_llm(self):
        if self._clinical_llm is None:
            self._clinical_llm = get_di("base_clinical_llm")
        return self._clinical_llm

    @property
    def grounding_llm(self):
        if self._grounding_llm is None:
            self._grounding_llm = get_di("base_grounding_llm")
        return self._grounding_llm

    @property
    def input_type(self):
        if self._input_type is None:
            self._input_type = self.ChainInput
        return self._input_type

    @property
    def output_type(self):
        return self._output_type

    @property
    def name(self):
        # BUGFIX: previously this returned the snake_case fallback only when
        # _name was None and returned None (missing return) when a name was
        # actually set, which also corrupted the default description.
        if self._name is None:
            self._name = camel_to_snake(self.__class__.__name__)
        return self._name

    @property
    def description(self):
        if self._description is None:
            self._description = f"Chain for {self.name}"
        return self._description

    @prompt.setter
    def prompt(self, value):
        self._prompt = value
        self.init_prompt()

    @main_llm.setter
    def main_llm(self, value):
        self._main_llm = value

    @clinical_llm.setter
    def clinical_llm(self, value):
        self._clinical_llm = value

    @grounding_llm.setter
    def grounding_llm(self, value):
        self._grounding_llm = value

    @input_type.setter
    def input_type(self, value):
        self._input_type = value

    @output_type.setter
    def output_type(self, value):
        self._output_type = value

    @name.setter
    def name(self, value):
        self._name = value

    @description.setter
    def description(self, value):
        self._description = value

    def invoke(self, **kwargs):
        """Build the chain once and invoke it with the keyword arguments."""
        # Build the (property-constructed) chain a single time; the previous
        # code accessed self.chain twice, constructing it twice per call.
        _chain = self.chain
        if _chain is None:
            raise ValueError("Chain is not initialized.")
        return _chain.invoke(kwargs)

    def __call__(self, **kwargs):
        return self.invoke(**kwargs)

    def get_runnable(self, **kwargs):
        """Deprecated: use the :pyattr:`chain` property instead."""
        # BUGFIX: this was decorated with `@DeprecationWarning`, which replaced
        # the method with a warning *instance* and made it uncallable. Emit a
        # proper runtime DeprecationWarning instead.
        import warnings

        warnings.warn(
            "get_runnable is deprecated; use the `chain` property instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.chain

    # * Override these methods in subclasses
    def init_prompt(self):
        """Hook for subclasses to build/normalize the prompt; no-op here."""
        pass

    def generate_llm_config(self):
        """
        Generate the configuration schema for the LLM function call.

        Returns:
            dict: A dictionary containing the function schema for the LLM, including name, description, and parameters.
        """
        # Use Pydantic v2 API; `schema()` is deprecated in favor of `model_json_schema()`
        _input_schema = self.input_type.model_json_schema()
        function_schema = {
            "name": (self.name or self.__class__.__name__).lower().replace(" ", "_"),
            "description": self.description,
            "parameters": {
                "type": _input_schema.get("type", "object"),
                "properties": _input_schema.get("properties", {}),
                "required": _input_schema.get("required", []),
            },
        }
        return function_schema

    def get_chain_as_langchain_tool(self):
        """
        Convert the chain to a LangChain StructuredTool.

        Returns:
            StructuredTool: An instance of LangChain StructuredTool wrapping the chain.
        """

        def _run(**kwargs):
            # Invoke the underlying runnable chain with provided kwargs
            return self.chain.invoke(kwargs)  # type: ignore

        return StructuredTool.from_function(
            func=_run,
            name=self.name or self.__class__.__name__,
            description=self.description or f"Chain for {self.name}",
            args_schema=self.input_type,
        )

    def get_chain_as_mcp_tool(self):
        """
        Convert the chain to an MCP tool using the FastMCP adapter.

        Returns:
            Any: An MCP tool instance wrapping the chain.
        """
        _fast_mcp = to_fastmcp(
            self.get_chain_as_langchain_tool(),
        )
        _fast_mcp.title = self.name or self.__class__.__name__
        return _fast_mcp

    def print_log(self, message):
        """Log *message* at INFO level and pass it through unchanged."""
        logger.info(message)
        return message

chain property

Get the runnable chain.

Example usage of an agent in the chain: BaseAgent takes llm, prompt, tools as input. If tools is not provided, it loads tools from MCP. default llm is function_llm from DI. Default prompt is "You are a helpful assistant." self.my_agent = BaseAgent().get_agent_response # in init _chain = ( RunnablePassthrough() | get_string_message_to_agent | self.my_agent | StrOutputParser() )

RunnableParallel / RunnablePassthrough / RunnableSequential / RunnableLambda / RunnableMap / RunnableBranch

ChainInput

Bases: BaseModel

Input model for BaseChain.

Attributes:

Name Type Description
input Any

The input string or CDSHookRequest object for the chain.

Source code in src/dhti_elixir_base/chain.py
36
37
38
39
40
41
42
43
44
45
class ChainInput(BaseModel):
    """
    Input model for BaseChain.

    Attributes:
        input (Any): The input string or CDSHookRequest object for the chain.
    """

    # Accepts either a raw string or a CDSHookRequest; extra keys are ignored.
    input: Any
    model_config = ConfigDict(extra="ignore", arbitrary_types_allowed=True)

generate_llm_config()

Generate the configuration schema for the LLM function call.

Returns:

Name Type Description
dict

A dictionary containing the function schema for the LLM, including name, description, and parameters.

Source code in src/dhti_elixir_base/chain.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def generate_llm_config(self):
    """
    Build the function-call schema that describes this chain to an LLM.

    Returns:
        dict: Function schema with name, description and parameters keys.
    """
    # model_json_schema() is the Pydantic v2 replacement for the
    # deprecated schema() method.
    input_schema = self.input_type.model_json_schema()
    tool_name = (self.name or self.__class__.__name__).lower().replace(" ", "_")
    return {
        "name": tool_name,
        "description": self.description,
        "parameters": {
            "type": input_schema.get("type", "object"),
            "properties": input_schema.get("properties", {}),
            "required": input_schema.get("required", []),
        },
    }

get_chain_as_langchain_tool()

Convert the chain to a LangChain StructuredTool.

Returns:

Name Type Description
StructuredTool

An instance of LangChain StructuredTool wrapping the chain.

Source code in src/dhti_elixir_base/chain.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def get_chain_as_langchain_tool(self):
    """
    Wrap this chain as a LangChain StructuredTool.

    Returns:
        StructuredTool: Tool that delegates each call to the runnable chain.
    """

    def _invoke_chain(**tool_kwargs):
        # self.chain is accessed at call time so the tool always uses the
        # chain built from the current prompt/LLM configuration.
        return self.chain.invoke(tool_kwargs)  # type: ignore

    return StructuredTool.from_function(
        func=_invoke_chain,
        name=self.name or self.__class__.__name__,
        description=self.description or f"Chain for {self.name}",
        args_schema=self.input_type,
    )

get_chain_as_mcp_tool()

Convert the chain to an MCP tool using the FastMCP adapter.

Returns:

Name Type Description
Any

An MCP tool instance wrapping the chain.

Source code in src/dhti_elixir_base/chain.py
229
230
231
232
233
234
235
236
237
238
239
240
def get_chain_as_mcp_tool(self):
    """
    Expose this chain as an MCP tool via the FastMCP adapter.

    Returns:
        Any: The FastMCP tool wrapping this chain's LangChain tool.
    """
    mcp_tool = to_fastmcp(self.get_chain_as_langchain_tool())
    mcp_tool.title = self.name or self.__class__.__name__
    return mcp_tool

Copyright 2024 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

BaseLLM

Bases: LLM

Source code in src/dhti_elixir_base/llm.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class BaseLLM(LLM):
    """LangChain LLM wrapper that POSTs chat-completions-style requests
    to a configurable HTTP endpoint and extracts the assistant reply."""

    base_url: str | None = Field(
        None, alias="base_url"
    )  #! Alias is important when inheriting from LLM
    model: str | None = Field(None, alias="model")
    api_key: str | None = Field(None, alias="api_key")
    params: Mapping[str, Any] = Field(default_factory=dict, alias="params")
    timeout: int = 60
    backend: str | None = "dhti"
    # Sampling / generation defaults, overridable per instance.
    temperature: float | None = 0.1
    top_p: float | None = 0.8
    top_k: int | None = 40
    n_batch: int | None = 8
    n_threads: int | None = 4
    n_predict: int | None = 256
    max_output_tokens: int | None = 512
    repeat_last_n: int | None = 64
    repeat_penalty: float | None = 1.18

    def __init__(self, base_url: str, model: str, **kwargs):
        """Create the LLM wrapper.

        Args:
            base_url: Endpoint URL the chat request is POSTed to.
            model: Model identifier sent in the request payload.
            **kwargs: Extra fields forwarded to the parent LLM and merged
                into ``params`` on top of the generation defaults.
        """
        super().__init__(**kwargs)
        self.base_url = base_url
        self.model = model
        # NOTE(review): kwargs are merged into params verbatim, so
        # non-generation kwargs also end up in params — confirm intended.
        self.params = {**self._get_model_default_parameters, **kwargs}

    @property
    def _get_model_default_parameters(self):
        # Generation options included in each request payload.
        return {
            "max_output_tokens": self.max_output_tokens,
            "n_predict": self.n_predict,
            "top_k": self.top_k,
            "top_p": self.top_p,
            "temperature": self.temperature,
            "n_batch": self.n_batch,
            "repeat_penalty": self.repeat_penalty,
            "repeat_last_n": self.repeat_last_n,
        }

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """
        Get all the identifying parameters
        """
        return {
            "model": self.model,
            "base_url": self.base_url,
            "model_parameters": self._get_model_default_parameters,
        }

    @property
    def _llm_type(self) -> str:
        return "dhti"

    def _prepare_payload(self, prompt: str) -> dict:
        # Basic chat messages wrapper; user prompt placed as single user message
        return {
            "model": self.model,
            "options": self._get_model_default_parameters,
            "messages": [{"role": "user", "content": prompt}],
        }

    def _call(
        self,
        prompt: str,
        stop: list[str] | None = None,
        run_manager: Any | None = None,
        **kwargs,
    ) -> str:
        """
        Args:
            prompt: The prompt to pass into the model.
            stop: A list of strings to stop generation when encountered
            run_manager: Optional run manager for callbacks and tracing

        Returns:
            The string generated by the model
        """
        # NOTE(review): `stop`, `run_manager` and extra kwargs are accepted
        # for LLM interface compatibility but are not forwarded to the API.

        payload = self._prepare_payload(prompt)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        resp = requests.post(
            self.base_url, headers=headers, json=payload, timeout=self.timeout  # type: ignore
        )
        try:
            resp.raise_for_status()
        except Exception as e:
            # Chain the original HTTP error so tracebacks keep the root cause.
            raise RuntimeError(
                f"API request failed: {e}; status={resp.status_code}; body={resp.text}"
            ) from e

        data = resp.json()
        # Expecting structure like: { "choices": [ { "message": { "role":"assistant","content":"..." } } ] }
        # Adapt this path if the API differs
        if "choices" in data and len(data["choices"]) > 0:
            choice = data["choices"][0]
            # support both "message" and direct "text"
            text = None
            if (
                isinstance(choice, dict)
                and "message" in choice
                and isinstance(choice["message"], dict)
            ):
                text = choice["message"].get("content")
            elif "text" in choice:
                text = choice.get("text")
            if text is not None:
                return text
        # Fallback: return raw JSON string for debugging
        return json.dumps(data)

BaseMCPServer

Bases: FastMCP

Base class for MCP servers, extending FastMCP for custom functionality.

Source code in src/dhti_elixir_base/mcp.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class BaseMCPServer(FastMCP):
    """Base class for MCP servers, extending FastMCP for custom functionality."""

    def __init__(self, name: str | None = None):
        # Fall back to a generic server name when none is provided.
        self._name = name or "BaseMCPServer"
        super().__init__(name=self._name)

    @property
    def name(self):
        """Return the name of this MCP server instance."""
        # NOTE(review): this property shadows any `name` attribute that
        # FastMCP.__init__ may set — confirm FastMCP does not rely on
        # assigning `self.name` directly.
        return self._name

name property

Return the name of this MCP server instance.

BaseDhtiModel

Bases: ABC

A model class to load the model and tokenizer

Source code in src/dhti_elixir_base/model.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class BaseDhtiModel(ABC):
    """A model class to load the model and tokenizer."""

    # Class-level handle to the loaded model; shared by the classmethods below.
    model: Any = None

    def __init__(
        self,
        model: Any,
    ) -> None:
        self.model = model

    @classmethod
    @abstractmethod
    def load(cls) -> None:
        """Load the model once; subclasses supply the actual loading code
        in place of the placeholder and may reuse the timing scaffold."""
        if cls.model is None:
            log.info("Loading model")
            t0 = perf_counter()
            # Load the model here
            elapsed = 1000 * (perf_counter() - t0)
            log.info("Model warm-up time: %d ms.", elapsed)
        else:
            log.info("Model is already loaded")

    @classmethod
    @abstractmethod
    def predict(cls, input: Any, **kwargs) -> Any:
        """Run inference; subclasses replace the placeholder with real
        prediction code and return the result instead of None."""
        # NOTE(review): `assert` is stripped under `python -O`; use an
        # explicit check in subclasses if this guard matters in production.
        assert input is not None and cls.model is not None  # Sanity check

        # Make sure the model is loaded.
        cls.load()
        t0 = perf_counter()
        # Predict here
        elapsed = 1000 * (perf_counter() - t0)
        log.info("Model prediction time: %d ms.", elapsed)
        return None

camel_to_snake(name)

Convert CamelCase to snake_case using pre-compiled regex for efficiency.

Source code in src/dhti_elixir_base/mydi.py
 9
10
11
def camel_to_snake(name: str) -> str:
    """Convert CamelCase to snake_case using pre-compiled regex for efficiency."""
    # _CAMEL_TO_SNAKE_PATTERN is a module-level pre-compiled regex (defined
    # above this function in the file) that inserts "_" at word boundaries;
    # lower() then normalizes the case.
    return _CAMEL_TO_SNAKE_PATTERN.sub("_", name).lower()

BaseServer

Bases: ABC

A server class to load the model and tokenizer

Source code in src/dhti_elixir_base/server.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class BaseServer(ABC):
    """A server class to load the model and tokenizer"""

    class RequestSchema(BaseModel):
        # Default request contract; override via the constructor if needed.
        text: str = Field()
        labels: list = Field()
        required: list = Field()

    class ResponseSchema(BaseModel):
        # Default response contract; override via the constructor if needed.
        text: str = Field()

    # Class-level defaults; __init__ overrides these per instance when
    # custom schemas are supplied.
    request_schema = RequestSchema
    response_schema = ResponseSchema

    def __init__(
        self, model: BaseDhtiModel, request_schema: Any = None, response_schema: Any = None
    ) -> None:
        self.model = model
        if request_schema is not None:
            self.request_schema = request_schema
        if response_schema is not None:
            self.response_schema = response_schema

    @property
    def name(self):
        # Derived server name, e.g. MyServer -> my_server.
        return camel_to_snake(self.__class__.__name__)

    def health_check(self) -> Any:
        """Health check endpoint"""
        # Loading here both verifies the model is usable and warms it up.
        self.model.load()
        return {"status": "ok"}

    def get_schema(self) -> Any:
        """Get the request schema"""
        return self.request_schema

    def predict(self, input: Any, **kwargs) -> Any:
        """Validate the input, run the model, and validate the response."""
        _input = self.request_schema(**input)  # type: ignore
        # NOTE(review): assumes model.predict returns a mapping that matches
        # response_schema's fields — confirm against concrete models.
        _result = self.model.predict(_input, **kwargs)
        result = self.response_schema(**_result)  # type: ignore
        return result

get_schema()

Get the request schema

Source code in src/dhti_elixir_base/server.py
48
49
50
def get_schema(self) -> Any:
    """Get the request schema"""
    # Returns the pydantic class (not an instance) used to validate requests.
    return self.request_schema

health_check()

Health check endpoint

Source code in src/dhti_elixir_base/server.py
43
44
45
46
def health_check(self) -> Any:
    """Health check endpoint"""
    # Loading here both verifies the model is usable and warms it up
    # before the first real request.
    self.model.load()
    return {"status": "ok"}

Pydantic Model for CDS Hook Card

Example:

{ "summary": "Patient is at high risk for opioid overdose.", "detail": "According to CDC guidelines, the patient's opioid dosage should be tapered to less than 50 MME. Link to CDC Guideline", "indicator": "warning", "source": { "label": "CDC Opioid Prescribing Guidelines", "url": "https://www.cdc.gov/drugoverdose/prescribing/guidelines.html", "icon": "https://example.org/img/cdc-icon.png" }, "links": [ { "label": "View MME Conversion Table", "url": "https://www.cdc.gov/drugoverdose/prescribing/mme.html" } ] }

CDSHookCard

Bases: BaseModel

CDS Hook Card Model

Source code in src/dhti_elixir_base/cds_hook/card.py
40
41
42
43
44
45
46
class CDSHookCard(BaseModel):
    """CDS Hook Card Model"""
    # Short human-readable message shown to the clinician.
    summary: str
    # Optional longer explanation for the card.
    detail: str | None = None
    # Severity of the card; restricted to the literal values below.
    indicator: Literal["info", "warning", "hard-stop"] | None = None
    # Attribution for where the guidance comes from.
    source: CDSHookCardSource | None = None
    # Optional follow-up links attached to the card.
    links: list[CDSHookCardLink] | None = None

Bases: BaseModel

Link associated with the CDS Hook Card

Source code in src/dhti_elixir_base/cds_hook/card.py
35
36
37
38
class CDSHookCardLink(BaseModel):
    """Link associated with the CDS Hook Card"""
    # Display text for the link.
    label: str
    # Target URL of the link.
    url: str

CDSHookCardSource

Bases: BaseModel

Source of the CDS Hook Card

Source code in src/dhti_elixir_base/cds_hook/card.py
29
30
31
32
33
class CDSHookCardSource(BaseModel):
    """Source of the CDS Hook Card"""
    # Display name of the guidance source.
    label: str
    # Optional URL and icon for the source.
    url: str | None = None
    icon: str | None = None

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

add_card(output, cards=None)

Add a CDSHookCard to the output list.

Source code in src/dhti_elixir_base/cds_hook/generate_cards.py
20
21
22
23
24
25
26
27
28
29
30
def add_card(output: str | CDSHookCard, cards: list | None = None) -> dict:
    """Append *output* to a card list, wrapping plain strings as cards.

    A new list is created when *cards* is omitted; otherwise the caller's
    list is extended in place and returned inside the response dict.
    """
    card_list = [] if cards is None else cards
    if isinstance(output, str):
        card_list.append(CDSHookCard(summary=output))
    elif isinstance(output, CDSHookCard):
        card_list.append(output)
    else:
        raise ValueError("Output must be a string or CDSHookCard")
    return {"cards": card_list}

get_card(output)

Get a CDSHookCard as a dictionary.

Source code in src/dhti_elixir_base/cds_hook/generate_cards.py
32
33
34
35
36
37
38
39
def get_card(output: str | CDSHookCard) -> dict:
    """Render *output* as a plain dictionary.

    A CDSHookCard is dumped directly; a string is wrapped in a one-card
    "cards" envelope first.
    """
    if isinstance(output, str):
        return {"cards": [CDSHookCard(summary=output).model_dump()]}
    if isinstance(output, CDSHookCard):
        return output.model_dump()
    raise ValueError("Output must be a string or CDSHookCard")

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

CDS Hook Request Model

Pydantic Model for CDS Hook Request. Typically context has "patientId" and "input" keys.

Example: { "hookInstance": "d1577c69-dfbe-44ad-ba6d-3e05e953b2ea", "fhirServer": "https://example.com/fhir", "fhirAuthorization": { ... }, "hook": "patient-view", "context": { ... }, "prefetch": { ... } }

CDSHookRequest

Bases: BaseModel

CDS Hook Request Model

Source code in src/dhti_elixir_base/cds_hook/request.py
22
23
24
25
26
27
28
29
class CDSHookRequest(BaseModel):
    """CDS Hook Request Model"""
    # Unique identifier of this hook invocation.
    hookInstance: str | None = None
    # Base URL of the requesting FHIR server.
    fhirServer: HttpUrl | None = None
    # Authorization details for the FHIR server, if any.
    fhirAuthorization: Any | None = None
    hook: str | None = None  # e.g., "patient-view", "order-select", etc.
    # Hook context; typically carries "patientId" and "input" keys.
    context: Any | None = None
    # Prefetched FHIR resources keyed by template name.
    prefetch: Any | None = None

Pydantic models for CDS Hook Service

Example: { "services": [ { "hook": "patient-view", "name": "Static CDS Service Example", "description": "An example of a CDS Service that returns a card with SMART app recommendations.", "id": "static-patient-view", "prefetch": { "patientToGreet": "Patient/{{context.patientId}}" } } ] }

CDSHookService

Bases: BaseModel

CDS Hook Service Model

Source code in src/dhti_elixir_base/cds_hook/service.py
24
25
26
27
28
29
30
class CDSHookService(BaseModel):
    """CDS Hook Service Model"""
    # Hook point this service responds to, e.g. "patient-view".
    hook: str
    name: str
    description: str | None = None
    # Service identifier used in the discovery document.
    id: str
    # Optional prefetch templates keyed by name.
    prefetch: dict | None = None

CDSHookServicesResponse

Bases: BaseModel

Response model containing a list of CDS Hook Services

Source code in src/dhti_elixir_base/cds_hook/service.py
32
33
34
class CDSHookServicesResponse(BaseModel):
    """Response model containing a list of CDS Hook Services.

    Top-level body of the CDS Hooks discovery endpoint:
    ``{"services": [ ... ]}``.
    """
    services: list[CDSHookService]

DhtiFhirSearch

Source code in src/dhti_elixir_base/fhir/fhir_search.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class DhtiFhirSearch:
    """Plain-HTTP search helper for a FHIR R4 server.

    Configuration (base URL, page size, extra ``requests`` kwargs, access
    token) is pulled from the DI container via ``get_di``.  Every search
    method returns the server's JSON response and can optionally post-process
    it with a FHIRPath expression via ``evaluate``.
    """

    def __init__(self):
        # Fall back to the public HAPI test server when DI provides no value.
        self.fhir_base_url = get_di("fhir_base_url") or "http://hapi.fhir.org/baseR4"
        self.page_size = get_di("fhir_page_size") or 10
        # Extra kwargs forwarded verbatim to requests.get (e.g. timeout, verify).
        self.requests_kwargs = get_di("fhir_requests_kwargs") or {}
        self.access_token = get_di("fhir_access_token") or ""

    def get_patient_id(self, input_data):
        """Extract a patient id from *input_data* under any common key spelling.

        Accepted keys: patientId, patient_id, id, PatientId, patientID,
        PatientID, ID, Id, patient, Patient, subject.  If *input_data* is not
        dict-like (e.g. a bare id string), it is returned unchanged.  Returns
        None when a dict contains none of the keys.
        """
        try:
            patient_id = (
                input_data.get("patientId")
                or input_data.get("patient_id")
                or input_data.get("id")
                or input_data.get("PatientId")
                or input_data.get("patientID")
                or input_data.get("PatientID")
                or input_data.get("ID")
                or input_data.get("Id")
                or input_data.get("patient")
                or input_data.get("Patient")
                or input_data.get("subject")
            )
            return patient_id
        except AttributeError:
            # Not a mapping: treat the value itself as the patient id.
            return input_data

    def _search_patient_resources(self, resource_type: str, input_data=None, fhirpath=None):
        """Internal helper to search for patient-related resources.

        This method consolidates the common search logic for patient resources,
        reducing code duplication and improving maintainability.

        Args:
            resource_type (str): FHIR resource type (e.g., "Condition", "Observation").
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Search results from the FHIR server.

        Raises:
            ValueError: If no patient ID can be extracted from *input_data*.
            requests.HTTPError: If the server responds with an error status.
        """
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        headers = {"Content-Type": "application/fhir+json"}
        search_url = f"{self.fhir_base_url}/{resource_type}"
        # Page size is applied via the standard FHIR _count parameter.
        search_parameters = {"patient": patient_id, "_count": self.page_size}
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        data = r.json()
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_everything_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all resources related to a specific patient using the $everything operation.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined resources related to the patient.

        Raises:
            ValueError: If no patient ID can be extracted from *input_data*.
            requests.HTTPError: If the server responds with an error status.
        """
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        # NOTE(review): sends the configured access token with the "Basic"
        # scheme — confirm the target server expects Basic rather than Bearer.
        headers = {
            "Authorization": f"Basic {self.access_token}",
            "Content-Type": "application/fhir+json",
            "Accept": "application/fhir+json",
        }
        everything_url = f"{self.fhir_base_url}/Patient/{patient_id}/$everything"
        r = requests.get(everything_url, headers=headers, **self.requests_kwargs)
        r.raise_for_status()
        data = r.json()
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_conditions_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all Condition resources related to a specific patient.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined Condition resources related to the patient.
        """
        return self._search_patient_resources("Condition", input_data, fhirpath)

    def get_observations_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all Observation resources related to a specific patient.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined Observation resources related to the patient.
        """
        return self._search_patient_resources("Observation", input_data, fhirpath)

    def get_procedures_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all Procedure resources related to a specific patient.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined Procedure resources related to the patient.
        """
        return self._search_patient_resources("Procedure", input_data, fhirpath)

    def get_medication_requests_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all MedicationRequest resources related to a specific patient.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined MedicationRequest resources related to the patient.
        """
        return self._search_patient_resources("MedicationRequest", input_data, fhirpath)

    def get_allergy_intolerances_for_patient(self, input_data=None, fhirpath=None):
        """Fetch all AllergyIntolerance resources related to a specific patient.

        Args:
            input_data (dict or str): Input containing patient ID or the patient ID itself.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined AllergyIntolerance resources related to the patient.
        """
        return self._search_patient_resources("AllergyIntolerance", input_data, fhirpath)

    def search(self, resource_type="Patient", search_parameters=None, fhirpath=None):
        """Search the FHIR server and return the combined results.

        Args:
            resource_type (str): FHIR resource type to search (e.g., "Patient").
            search_parameters (dict): Query parameters per FHIR spec; _count is
                auto-set to the configured page size if absent.
            fhirpath (str, optional): FHIRPath expression to apply to the results.

        Returns:
            dict: Combined search results from the FHIR server.

        Raises:
            requests.HTTPError: If the server responds with an error status.
        """
        # Copy so the caller's dict is never mutated when _count is injected
        # (matches SmartOnFhirSearch.search behavior).
        search_parameters = dict(search_parameters or {})

        headers = {"Content-Type": "application/fhir+json"}

        if "_count" not in search_parameters:
            search_parameters["_count"] = self.page_size

        search_url = f"{self.fhir_base_url}/{resource_type}"
        r = requests.get(
            search_url,
            params=search_parameters,
            headers=headers,
            **self.requests_kwargs,
        )
        r.raise_for_status()
        data = r.json()
        return evaluate(data, fhirpath, {}) if fhirpath else data

get_allergy_intolerances_for_patient(input_data=None, fhirpath=None)

Fetch all AllergyIntolerance resources related to a specific patient. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined AllergyIntolerance resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
131
132
133
134
135
136
137
138
139
def get_allergy_intolerances_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all AllergyIntolerance resources related to a specific patient.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined AllergyIntolerance resources related to the patient.
    """
    return self._search_patient_resources("AllergyIntolerance", input_data, fhirpath)

get_conditions_for_patient(input_data=None, fhirpath=None)

Fetch all Condition resources related to a specific patient. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined Condition resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
91
92
93
94
95
96
97
98
99
def get_conditions_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all Condition resources related to a specific patient.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Condition resources related to the patient.
    """
    return self._search_patient_resources("Condition", input_data, fhirpath)

get_everything_for_patient(input_data=None, fhirpath=None)

Fetch all resources related to a specific patient using the $everything operation. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_everything_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all resources related to a specific patient using the $everything operation.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined resources related to the patient.
    """
    if input_data is None:
        input_data = {}
    patient_id = self.get_patient_id(input_data)
    if not patient_id:
        raise ValueError("Patient ID is required.")
    headers = {
        "Authorization": f"Basic {self.access_token}",
        "Content-Type": "application/fhir+json",
        "Accept": "application/fhir+json",
    }
    everything_url = f"{self.fhir_base_url}/Patient/{patient_id}/$everything"
    r = requests.get(everything_url, headers=headers, **self.requests_kwargs)
    r.raise_for_status()
    data = r.json()
    return evaluate(data, fhirpath, {}) if fhirpath else data

get_medication_requests_for_patient(input_data=None, fhirpath=None)

Fetch all MedicationRequest resources related to a specific patient. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined MedicationRequest resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
121
122
123
124
125
126
127
128
129
def get_medication_requests_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all MedicationRequest resources related to a specific patient.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined MedicationRequest resources related to the patient.
    """
    return self._search_patient_resources("MedicationRequest", input_data, fhirpath)

get_observations_for_patient(input_data=None, fhirpath=None)

Fetch all Observation resources related to a specific patient. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined Observation resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
101
102
103
104
105
106
107
108
109
def get_observations_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all Observation resources related to a specific patient.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Observation resources related to the patient.
    """
    return self._search_patient_resources("Observation", input_data, fhirpath)

get_procedures_for_patient(input_data=None, fhirpath=None)

Fetch all Procedure resources related to a specific patient. Args: input_data (dict or str): Input containing patient ID or the patient ID itself. fhirpath (str, optional): FHIRPath expression to apply to the results. Returns: dict: Combined Procedure resources related to the patient.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
111
112
113
114
115
116
117
118
119
def get_procedures_for_patient(self, input_data=None, fhirpath=None):
    """Fetch all Procedure resources related to a specific patient.
    Args:
        input_data (dict or str): Input containing patient ID or the patient ID itself.
        fhirpath (str, optional): FHIRPath expression to apply to the results.
    Returns:
        dict: Combined Procedure resources related to the patient.
    """
    return self._search_patient_resources("Procedure", input_data, fhirpath)

search(resource_type='Patient', search_parameters=None, fhirpath=None)

Search the FHIR server and return the combined results.

Parameters:

Name Type Description Default
resource_type str

FHIR resource type to search (e.g., "Patient").

'Patient'
search_parameters dict

Query parameters per FHIR spec; _count is auto-set to the configured page size if absent.

None

Returns:

Name Type Description
dict

Combined search results from the FHIR server.

Source code in src/dhti_elixir_base/fhir/fhir_search.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def search(self, resource_type="Patient", search_parameters=None, fhirpath=None):
    """Search the FHIR server and return the combined results.

    Args:
        resource_type (str): FHIR resource type to search (e.g., "Patient").
        search_parameters (dict): Query parameters per FHIR spec; _count is
            auto-set to the configured page size if absent.

    Returns:
        dict: Combined search results from the FHIR server.
    """
    if search_parameters is None:
        search_parameters = {}

    headers = {"Content-Type": "application/fhir+json"}

    if "_count" not in search_parameters:
        search_parameters["_count"] = self.page_size

    search_url = f"{self.fhir_base_url}/{resource_type}"
    r = requests.get(
        search_url,
        params=search_parameters,
        headers=headers,
        **self.requests_kwargs,
    )
    r.raise_for_status()
    data = r.json()
    return evaluate(data, fhirpath, {}) if fhirpath else data

SmartOnFhirSearch

SMART-on-FHIR backed search helper mirroring DhtiFhirSearch API.

Uses fhirclient's resource model search pattern, e.g.:

    settings = { 'app_id': 'my_web_app', 'api_base': 'https://r4.smarthealthit.org' }
    smart = client.FHIRClient(settings=settings)
    patient = Patient.read('<id>', smart.server)

Each method returns raw JSON like DhtiFhirSearch and optionally applies a FHIRPath expression via fhirpathpy.evaluate.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class SmartOnFhirSearch:
    """SMART-on-FHIR backed search helper mirroring DhtiFhirSearch API.

    Uses fhirclient's resource model search pattern, e.g.:

            settings = { 'app_id': 'my_web_app', 'api_base': 'https://r4.smarthealthit.org' }
            smart = client.FHIRClient(settings=settings)
            patient = Patient.read('<id>', smart.server)

    Each method returns raw JSON like DhtiFhirSearch and optionally applies a
    FHIRPath expression via fhirpathpy.evaluate.
    """

    def __init__(self):
        app_id = get_di("fhir_app_id") or "my_web_app"
        base_url = get_di("fhir_base_url") or "http://hapi.fhir.org/baseR4"
        token = get_di("fhir_access_token") or ""
        settings = {
            "app_id": app_id,
            "api_base": base_url,
        }
        if token:
            settings["access_token"] = token

        self.smart = client.FHIRClient(settings=settings)
        self.fhir_base_url = base_url
        self.page_size = get_di("fhir_page_size") or 10
        self.requests_kwargs = get_di("fhir_requests_kwargs") or {}
        self.access_token = token
        # OAuth settings (optional)
        self.oauth_token_url = (
            get_di("fhir_oauth_token_url") or get_di("oauth_token_url") or None
        )
        self.oauth_client_id = (
            get_di("fhir_oauth_client_id") or get_di("oauth_client_id") or None
        )
        self.oauth_client_secret = (
            get_di("fhir_oauth_client_secret") or get_di("oauth_client_secret") or None
        )
        self.oauth_scope = get_di("fhir_oauth_scope") or get_di("oauth_scope") or None
        self.oauth_requests_kwargs = get_di("fhir_oauth_requests_kwargs") or {}
        self._token_expires_at = 0  # epoch seconds
        # Ensure any provided token is applied to the fhirclient session
        self._apply_auth_to_server()

    # ------------------------ utils ------------------------
    def _headers(self) -> dict:
        headers = {
            "Content-Type": "application/fhir+json",
            "Accept": "application/fhir+json",
        }
        if self.access_token and self.access_token.strip():
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    def _apply_auth_to_server(self) -> None:
        """Apply Authorization header to the fhirclient server session if possible."""
        try:
            server = getattr(self.smart, "server", None)
            session = getattr(server, "session", None)
            if session is not None and self.access_token:
                session.headers["Authorization"] = f"Bearer {self.access_token}"
        except Exception:
            pass

    def _fetch_token_client_credentials(self) -> None:
        """Fetch OAuth token using client_credentials flow if configured."""
        if not (
            self.oauth_token_url and self.oauth_client_id and self.oauth_client_secret
        ):
            return
        data = {"grant_type": "client_credentials"}
        if self.oauth_scope:
            data["scope"] = self.oauth_scope
        # Use HTTP Basic auth; many servers also accept in-body client credentials
        auth = (self.oauth_client_id, self.oauth_client_secret)
        r = requests.post(
            self.oauth_token_url,
            data=data,
            auth=auth,
            headers={"Accept": "application/json"},
            **self.oauth_requests_kwargs,
        )
        r.raise_for_status()
        payload = r.json() or {}
        token = payload.get("access_token")
        token_type = payload.get("token_type", "Bearer")
        expires_in = payload.get("expires_in", 0)
        if token:
            self.access_token = token if token_type.lower() == "bearer" else token
            # Set a small safety margin of 30 seconds
            import time

            self._token_expires_at = (
                int(time.time()) + int(expires_in) - 30 if expires_in else 0
            )
            self._apply_auth_to_server()

    def _ensure_token(self) -> None:
        """Ensure a valid access token is available and applied."""
        # If we already have a token and no known expiry, assume valid
        if self.access_token and self._token_expires_at == 0:
            self._apply_auth_to_server()
            return
        # If expired or missing, try to fetch
        import time

        now = int(time.time())
        if not self.access_token or (
            self._token_expires_at and now >= self._token_expires_at
        ):
            self._fetch_token_client_credentials()
            self._apply_auth_to_server()

    def _model_class(self, resource_type: str):
        """Resolve a fhirclient model class for a given resource type name.

        Returns None if the module/class cannot be resolved.
        """
        try:
            module_name = resource_type.lower()
            mod = importlib.import_module(f"fhirclient.models.{module_name}")
            return getattr(mod, resource_type)
        except Exception:
            return None

    def get_patient_id(self, input):
        # Same extraction behavior as DhtiFhirSearch
        try:
            patient_id = (
                input.get("patientId")
                or input.get("patient_id")
                or input.get("id")
                or input.get("PatientId")
                or input.get("patientID")
                or input.get("PatientID")
                or input.get("ID")
                or input.get("Id")
                or input.get("patient")
                or input.get("Patient")
                or input.get("subject")
            )
            return patient_id
        except AttributeError:
            return input

    # ---------------------- operations ---------------------
    def get_everything_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        """Fetch resources related to a patient using $everything operation.

        Returns JSON Bundle like DhtiFhirSearch.
        """
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")

        # Ensure token present for authenticated endpoints
        self._ensure_token()
        # Use explicit HTTP for predictable headers and testing
        path = f"Patient/{patient_id}/$everything"
        url = f"{self.fhir_base_url}/{path}"
        r = requests.get(url, headers=self._headers(), **self.requests_kwargs)
        r.raise_for_status()
        data = r.json()

        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_conditions_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Condition.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_observations_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Observation.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_procedures_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = Procedure.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_medication_requests_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = MedicationRequest.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def get_allergy_intolerances_for_patient(
        self, input_data: dict | str | None = None, fhirpath: str | None = None
    ):
        if input_data is None:
            input_data = {}
        patient_id = self.get_patient_id(input_data)
        if not patient_id:
            raise ValueError("Patient ID is required.")
        self._ensure_token()
        search = AllergyIntolerance.where(
            struct={"patient": patient_id, "_count": self.page_size}
        )
        bundle = search.perform(self.smart.server)
        data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        return evaluate(data, fhirpath, {}) if fhirpath else data

    def search(
        self,
        resource_type: str = "Patient",
        search_parameters: dict | None = None,
        fhirpath: str | None = None,
    ):
        """Generic search for any resource type.

        Tries to resolve the appropriate fhirclient model class and perform a
        model-based search; if not possible, falls back to an HTTP GET.
        """
        params = dict(search_parameters or {})
        if "_count" not in params:
            params["_count"] = self.page_size

        self._ensure_token()
        cls = self._model_class(resource_type)
        data = None
        if cls is not None and hasattr(cls, "where"):
            try:
                bundle = cls.where(struct=params).perform(self.smart.server)
                data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
            except Exception:
                data = None

        if data is None:
            # Fallback to HTTP (works for unknown/extension resource types)
            url = f"{self.fhir_base_url}/{resource_type}"
            r = requests.get(
                url, params=params, headers=self._headers(), **self.requests_kwargs
            )
            r.raise_for_status()
            data = r.json()

        return evaluate(data, fhirpath, {}) if fhirpath else data

get_everything_for_patient(input_data=None, fhirpath=None)

Fetch resources related to a patient using $everything operation.

Returns JSON Bundle like DhtiFhirSearch.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def get_everything_for_patient(
    self, input_data: dict | str | None = None, fhirpath: str | None = None
):
    """Fetch resources related to a patient using $everything operation.

    Returns JSON Bundle like DhtiFhirSearch.
    """
    if input_data is None:
        input_data = {}
    patient_id = self.get_patient_id(input_data)
    if not patient_id:
        raise ValueError("Patient ID is required.")

    # Ensure token present for authenticated endpoints
    self._ensure_token()
    # Use explicit HTTP for predictable headers and testing
    path = f"Patient/{patient_id}/$everything"
    url = f"{self.fhir_base_url}/{path}"
    r = requests.get(url, headers=self._headers(), **self.requests_kwargs)
    r.raise_for_status()
    data = r.json()

    return evaluate(data, fhirpath, {}) if fhirpath else data

search(resource_type='Patient', search_parameters=None, fhirpath=None)

Generic search for any resource type.

Tries to resolve the appropriate fhirclient model class and perform a model-based search; if not possible, falls back to an HTTP GET.

Source code in src/dhti_elixir_base/fhir/smart_on_fhir.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def search(
    self,
    resource_type: str = "Patient",
    search_parameters: dict | None = None,
    fhirpath: str | None = None,
):
    """Generic search for any resource type.

    Tries to resolve the appropriate fhirclient model class and perform a
    model-based search; if not possible, falls back to an HTTP GET.
    """
    params = dict(search_parameters or {})
    if "_count" not in params:
        params["_count"] = self.page_size

    self._ensure_token()
    cls = self._model_class(resource_type)
    data = None
    if cls is not None and hasattr(cls, "where"):
        try:
            bundle = cls.where(struct=params).perform(self.smart.server)
            data = bundle.as_json() if hasattr(bundle, "as_json") else bundle
        except Exception:
            data = None

    if data is None:
        # Fallback to HTTP (works for unknown/extension resource types)
        url = f"{self.fhir_base_url}/{resource_type}"
        r = requests.get(
            url, params=params, headers=self._headers(), **self.requests_kwargs
        )
        r.raise_for_status()
        data = r.json()

    return evaluate(data, fhirpath, {}) if fhirpath else data

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

CrewAIAgentWrapper

Bases: Agent

Wrapper class to make BaseAgent compatible with CrewAI.

This wrapper allows the use of DHTI's BaseAgent within the CrewAI framework by adapting its interface to CrewAI's requirements.

Parameters:

Name Type Description Default
agent BaseAgent

An instance of BaseAgent from dhti_elixir_base

required
role str | None

The role of the agent (optional, uses agent's description if not provided)

None
goal str | None

The goal of the agent (optional, uses agent's name if not provided)

None
backstory str | None

The backstory of the agent (optional)

None
**kwargs Any

Additional keyword arguments passed to the CrewAI Agent

{}
Example
from dhti_elixir_base import BaseAgent
from dhti_elixir_base.crewai import CrewAIAgentWrapper

# Create a DHTI agent instance
dhti_agent = BaseAgent(
    name="medical_assistant",
    description="A medical assistant agent",
    llm=my_llm,
    prompt="You are a helpful medical assistant."
)

# Wrap it for use with CrewAI
crewai_agent = CrewAIAgentWrapper(
    agent=dhti_agent,
    role="Medical Assistant",
    goal="Assist with medical queries"
)
Source code in src/dhti_elixir_base/crewai/agent_wrapper.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class CrewAIAgentWrapper(CrewAIAgent):
    """
    Wrapper class to make BaseAgent compatible with CrewAI.

    This wrapper allows the use of DHTI's BaseAgent within the CrewAI framework
    by adapting its interface to CrewAI's requirements.

    Args:
        agent: An instance of BaseAgent from dhti_elixir_base
        role: The role of the agent (optional, uses agent's description if not provided)
        goal: The goal of the agent (optional, uses agent's name if not provided)
        backstory: The backstory of the agent (optional)
        **kwargs: Additional keyword arguments passed to the CrewAI Agent

    Example:
        ```python
        from dhti_elixir_base import BaseAgent
        from dhti_elixir_base.crewai import CrewAIAgentWrapper

        # Create a DHTI agent instance
        dhti_agent = BaseAgent(
            name="medical_assistant",
            description="A medical assistant agent",
            llm=my_llm,
            prompt="You are a helpful medical assistant."
        )

        # Wrap it for use with CrewAI
        crewai_agent = CrewAIAgentWrapper(
            agent=dhti_agent,
            role="Medical Assistant",
            goal="Assist with medical queries"
        )
        ```
    """

    # Reference to the wrapped DHTI agent; kept private so pydantic does not
    # treat it as a model field.
    _dhti_agent: BaseAgent = PrivateAttr()

    def __init__(
        self,
        agent: BaseAgent,
        role: str | None = None,
        goal: str | None = None,
        backstory: str | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the CrewAI Agent wrapper.

        Args:
            agent: An instance of BaseAgent
            role: The role of the agent
            goal: The goal of the agent
            backstory: The backstory of the agent
            **kwargs: Additional keyword arguments
        """
        # Local import: the module's import block is outside this class and
        # must not be assumed editable from here.
        import logging

        # Extract information from the DHTI agent
        agent_role = role or agent.description or "Assistant"
        agent_goal = goal or f"Execute tasks related to {agent.name}"
        agent_backstory = backstory or f"An agent specialized in {agent.name} tasks"

        # Wrap the LLM if available
        llm = None
        if agent.llm is not None:
            llm = CrewAILLMWrapper(llm=agent.llm)

        # Convert tools if available; explicit kwargs take precedence over the
        # agent's own tool list.
        tools = kwargs.pop("tools", None)
        if tools is None and hasattr(agent, "tools") and agent.tools:
            # Use the agent's tools if available
            tools = agent.tools

        # Validate and wrap tools
        if tools:
            try:
                from crewai.tools import BaseTool as CrewAIBaseTool
            except ImportError:
                from crewai.tools.base_tool import BaseTool as CrewAIBaseTool

            from .langchain_tool_wrapper import CrewAILangChainToolWrapper

            validated_tools = []
            tool_list = tools if isinstance(tools, list) else [tools]
            for tool in tool_list:
                try:
                    # If it's already a BaseTool, use it as is
                    if isinstance(tool, CrewAIBaseTool):
                        validated_tools.append(tool)
                    # Try to wrap as a LangChain tool
                    else:
                        wrapped = CrewAILangChainToolWrapper(langchain_tool=tool)
                        validated_tools.append(wrapped)
                except Exception:
                    # Log and skip tools that can't be wrapped so one bad tool
                    # does not prevent agent construction.  (Previously this
                    # was a silent `pass` despite the comment claiming a log.)
                    logging.getLogger(__name__).warning(
                        "Skipping tool %r: could not wrap for CrewAI",
                        tool,
                        exc_info=True,
                    )
            tools = validated_tools if validated_tools else None

        # Initialize CrewAI Agent
        super().__init__(
            role=agent_role,
            goal=agent_goal,
            backstory=agent_backstory,
            llm=llm,
            tools=tools,
            **kwargs,
        )

        # Store the DHTI agent reference
        self._dhti_agent = agent

    def execute_task(self, task: Any, *args: Any, **kwargs: Any) -> str:
        """
        Execute a task using the underlying DHTI agent.

        Args:
            task: The task to execute
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments

        Returns:
            str: The result of the task execution
        """
        # Extract the task context/input: prefer the CrewAI Task description,
        # fall back to the string form of whatever was passed.
        task_context = (
            str(task) if not hasattr(task, "description") else task.description
        )

        # Use the DHTI agent's response method
        return self._dhti_agent.get_agent_response(task_context)

    def __str__(self) -> str:
        """Return string representation of the wrapper."""
        return f"CrewAIAgentWrapper(agent={self._dhti_agent.name})"

    def __repr__(self) -> str:
        """Return detailed string representation of the wrapper."""
        return self.__str__()

__init__(agent, role=None, goal=None, backstory=None, **kwargs)

Initialize the CrewAI Agent wrapper.

Parameters:

Name Type Description Default
agent BaseAgent

An instance of BaseAgent

required
role str | None

The role of the agent

None
goal str | None

The goal of the agent

None
backstory str | None

The backstory of the agent

None
**kwargs Any

Additional keyword arguments

{}
Source code in src/dhti_elixir_base/crewai/agent_wrapper.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def __init__(
    self,
    agent: BaseAgent,
    role: str | None = None,
    goal: str | None = None,
    backstory: str | None = None,
    **kwargs: Any,
):
    """
    Initialize the CrewAI Agent wrapper.

    Args:
        agent: An instance of BaseAgent
        role: The role of the agent
        goal: The goal of the agent
        backstory: The backstory of the agent
        **kwargs: Additional keyword arguments
    """
    # Extract information from the DHTI agent
    agent_role = role or agent.description or "Assistant"
    agent_goal = goal or f"Execute tasks related to {agent.name}"
    agent_backstory = backstory or f"An agent specialized in {agent.name} tasks"

    # Wrap the LLM if available
    llm = None
    if agent.llm is not None:
        llm = CrewAILLMWrapper(llm=agent.llm)

    # Convert tools if available
    # Explicit kwargs take precedence over the agent's own tool list.
    tools = kwargs.pop("tools", None)
    if tools is None and hasattr(agent, "tools") and agent.tools:
        # Use the agent's tools if available
        tools = agent.tools

    # Validate and wrap tools
    if tools:
        try:
            from crewai.tools import BaseTool as CrewAIBaseTool
        except ImportError:
            from crewai.tools.base_tool import BaseTool as CrewAIBaseTool

        from .langchain_tool_wrapper import CrewAILangChainToolWrapper

        validated_tools = []
        tool_list = tools if isinstance(tools, list) else [tools]
        for tool in tool_list:
            try:
                # If it's already a BaseTool, use it as is
                if isinstance(tool, CrewAIBaseTool):
                    validated_tools.append(tool)
                # Try to wrap as a LangChain tool
                else:
                    wrapped = CrewAILangChainToolWrapper(langchain_tool=tool)
                    validated_tools.append(wrapped)
            except Exception as e:
                # Log but skip tools that can't be wrapped
                # NOTE(review): despite the comment, nothing is logged here and
                # `e` is unused — failures are swallowed silently.
                pass
        tools = validated_tools if validated_tools else None

    # Initialize CrewAI Agent
    super().__init__(
        role=agent_role,
        goal=agent_goal,
        backstory=agent_backstory,
        llm=llm,
        tools=tools,
        **kwargs,
    )

    # Store the DHTI agent reference
    self._dhti_agent = agent

__repr__()

Return detailed string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/agent_wrapper.py
160
161
162
# Delegates to __str__ so repr and str stay in sync.
def __repr__(self) -> str:
    """Return detailed string representation of the wrapper."""
    return self.__str__()

__str__()

Return string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/agent_wrapper.py
156
157
158
# Identifies the wrapper by the wrapped DHTI agent's name.
def __str__(self) -> str:
    """Return string representation of the wrapper."""
    return f"CrewAIAgentWrapper(agent={self._dhti_agent.name})"

execute_task(task, *args, **kwargs)

Execute a task using the underlying DHTI agent.

Parameters:

Name Type Description Default
task Any

The task to execute

required
*args Any

Additional positional arguments

()
**kwargs Any

Additional keyword arguments

{}

Returns:

Name Type Description
str str

The result of the task execution

Source code in src/dhti_elixir_base/crewai/agent_wrapper.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def execute_task(self, task: Any, *args: Any, **kwargs: Any) -> str:
    """
    Execute a task using the underlying DHTI agent.

    Args:
        task: The task to execute
        *args: Additional positional arguments
        **kwargs: Additional keyword arguments

    Returns:
        str: The result of the task execution
    """
    # Extract the task context/input: prefer a Task-like object's
    # `description` attribute, otherwise stringify the argument.
    task_context = (
        str(task) if not hasattr(task, "description") else task.description
    )

    # Use the DHTI agent's response method
    # NOTE(review): *args/**kwargs are accepted for CrewAI compatibility but
    # are not forwarded to the DHTI agent.
    return self._dhti_agent.get_agent_response(task_context)

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

CrewAIChainToolWrapper

Bases: BaseTool

Wrapper class to make BaseChain usable as a Tool within CrewAI.

This wrapper allows the use of DHTI's BaseChain as a tool within the CrewAI framework by adapting its interface to CrewAI's tool requirements.

Parameters:

Name Type Description Default
chain BaseChain

An instance of BaseChain from dhti_elixir_base

required
name str | None

Name of the tool (optional, uses chain's name if not provided)

None
description str | None

Description of the tool (optional, uses chain's description if not provided)

None
**kwargs Any

Additional keyword arguments

{}
Example
from dhti_elixir_base import BaseChain
from dhti_elixir_base.crewai import CrewAIChainToolWrapper

# Create a DHTI chain instance
dhti_chain = BaseChain(
    name="medical_analyzer",
    description="Analyzes medical records"
)

# Wrap it as a CrewAI tool
crewai_tool = CrewAIChainToolWrapper(
    chain=dhti_chain,
    name="Medical Analyzer",
    description="Analyzes medical records and provides insights"
)
Source code in src/dhti_elixir_base/crewai/chain_tool_wrapper.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class CrewAIChainToolWrapper(CrewAIBaseTool):
    """
    Expose a DHTI BaseChain as a CrewAI tool.

    Adapts a ``BaseChain`` instance to CrewAI's ``BaseTool`` interface so a
    chain can be handed to CrewAI agents like any other tool.

    Args:
        chain: An instance of BaseChain from dhti_elixir_base
        name: Name of the tool (optional, uses chain's name if not provided)
        description: Description of the tool (optional, uses chain's description if not provided)
        **kwargs: Additional keyword arguments

    Example:
        ```python
        from dhti_elixir_base import BaseChain
        from dhti_elixir_base.crewai import CrewAIChainToolWrapper

        # Create a DHTI chain instance
        dhti_chain = BaseChain(
            name="medical_analyzer",
            description="Analyzes medical records"
        )

        # Wrap it as a CrewAI tool
        crewai_tool = CrewAIChainToolWrapper(
            chain=dhti_chain,
            name="Medical Analyzer",
            description="Analyzes medical records and provides insights"
        )
        ```
    """

    name: str = "BaseChain Tool"
    description: str = "A tool that wraps a DHTI BaseChain for use in CrewAI"
    # Wrapped chain; PrivateAttr keeps it out of the pydantic field set.
    _dhti_chain: BaseChain = PrivateAttr()

    def __init__(
        self,
        chain: BaseChain,
        name: str | None = None,
        description: str | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the CrewAI Chain Tool wrapper.

        Args:
            chain: An instance of BaseChain
            name: Name of the tool
            description: Description of the tool
            **kwargs: Additional keyword arguments
        """
        # Fall back to the chain's own metadata when not given explicitly.
        resolved_name = name or chain.name or "chain_tool"
        resolved_description = (
            description or chain.description or "A chain tool for processing inputs"
        )

        # Let the CrewAI base tool do its normal setup first.
        super().__init__(
            name=resolved_name,
            description=resolved_description,
            **kwargs,
        )

        # CrewAI's _generate_description rewrites the description during
        # super().__init__; put the plain one back afterwards.
        self.description = resolved_description

        # Keep a handle on the wrapped chain for _run.
        self._dhti_chain = chain

    def _generate_description(self) -> None:
        """Override to prevent automatic description generation."""
        # Intentionally a no-op: the simple description set in __init__ wins.
        pass

    def _run(self, *args: Any, **kwargs: Any) -> str:
        """
        Execute the underlying DHTI chain.

        Args:
            *args: Positional arguments (first arg used as input if no kwargs)
            **kwargs: Keyword arguments passed to the chain

        Returns:
            str: The result of the chain execution

        Raises:
            ValueError: If neither positional nor keyword arguments are provided
            RuntimeError: If chain execution fails
        """
        try:
            # Keyword arguments win; otherwise the first positional argument
            # becomes the chain's `input`.
            if kwargs:
                outcome = self._dhti_chain.invoke(**kwargs)
            elif args:
                outcome = self._dhti_chain.invoke(input=args[0])
            else:
                raise ValueError(
                    "Either provide input as a keyword argument or as a positional argument"
                )

            # Dict results get special handling for known response shapes.
            if isinstance(outcome, dict):
                if "cards" in outcome:
                    # Handle CDS Hook response format
                    return str(outcome.get("cards", []))
                if "output" in outcome:
                    return str(outcome["output"])

            # Anything else (including other dicts) is stringified as-is.
            return str(outcome)
        except ValueError:
            # The missing-input error above propagates unchanged.
            raise
        except Exception as e:
            raise RuntimeError(f"Chain execution failed: {e}") from e

    def __str__(self) -> str:
        """Return string representation of the wrapper."""
        return f"CrewAIChainToolWrapper(chain={self._dhti_chain.name})"

    def __repr__(self) -> str:
        """Return detailed string representation of the wrapper."""
        return self.__str__()

__init__(chain, name=None, description=None, **kwargs)

Initialize the CrewAI Chain Tool wrapper.

Parameters:

Name Type Description Default
chain BaseChain

An instance of BaseChain

required
name str | None

Name of the tool

None
description str | None

Description of the tool

None
**kwargs Any

Additional keyword arguments

{}
Source code in src/dhti_elixir_base/crewai/chain_tool_wrapper.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def __init__(
    self,
    chain: BaseChain,
    name: str | None = None,
    description: str | None = None,
    **kwargs: Any,
):
    """
    Initialize the CrewAI Chain Tool wrapper.

    Args:
        chain: An instance of BaseChain
        name: Name of the tool
        description: Description of the tool
        **kwargs: Additional keyword arguments
    """
    # Set name and description from chain if not provided
    tool_name = name or chain.name or "chain_tool"
    tool_description = (
        description or chain.description or "A chain tool for processing inputs"
    )

    # Initialize the base tool
    super().__init__(
        name=tool_name,
        description=tool_description,
        **kwargs,
    )

    # Restore the original description since CrewAI's _generate_description
    # prepends tool name and arguments to it
    self.description = tool_description

    # Store the DHTI chain reference
    self._dhti_chain = chain

__repr__()

Return detailed string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/chain_tool_wrapper.py
155
156
157
# Delegates to __str__ so repr and str stay in sync.
def __repr__(self) -> str:
    """Return detailed string representation of the wrapper."""
    return self.__str__()

__str__()

Return string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/chain_tool_wrapper.py
151
152
153
# Identifies the wrapper by the wrapped chain's name.
def __str__(self) -> str:
    """Return string representation of the wrapper."""
    return f"CrewAIChainToolWrapper(chain={self._dhti_chain.name})"

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

CrewAILangChainToolWrapper

Bases: BaseTool

Wrapper class to make LangChain Tool usable within CrewAI.

This wrapper allows the use of standard LangChain tools within the CrewAI framework by adapting their interface to CrewAI's tool requirements.

Parameters:

Name Type Description Default
langchain_tool BaseTool | Any

An instance of LangChain BaseTool

required
**kwargs Any

Additional keyword arguments

{}
Example
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from dhti_elixir_base.crewai import CrewAILangChainToolWrapper

# Create a LangChain tool
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

# Wrap it for use with CrewAI
crewai_tool = CrewAILangChainToolWrapper(langchain_tool=wikipedia)
Source code in src/dhti_elixir_base/crewai/langchain_tool_wrapper.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class CrewAILangChainToolWrapper(CrewAIBaseTool):
    """
    Wrapper class to make LangChain Tool usable within CrewAI.

    This wrapper allows the use of standard LangChain tools within the CrewAI framework
    by adapting their interface to CrewAI's tool requirements.

    Args:
        langchain_tool: An instance of LangChain BaseTool
        **kwargs: Additional keyword arguments

    Example:
        ```python
        from langchain_community.tools import WikipediaQueryRun
        from langchain_community.utilities import WikipediaAPIWrapper
        from dhti_elixir_base.crewai import CrewAILangChainToolWrapper

        # Create a LangChain tool
        wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

        # Wrap it for use with CrewAI
        crewai_tool = CrewAILangChainToolWrapper(langchain_tool=wikipedia)
        ```
    """

    name: str = "LangChain Tool"
    description: str = "A tool that wraps a LangChain tool for use in CrewAI"
    # Wrapped LangChain tool; PrivateAttr keeps it out of the pydantic fields.
    _langchain_tool: LangChainBaseTool | Any = PrivateAttr()

    def __init__(
        self,
        langchain_tool: LangChainBaseTool | Any,
        **kwargs: Any,
    ):
        """
        Initialize the CrewAI LangChain Tool wrapper.

        Args:
            langchain_tool: An instance of LangChain BaseTool or compatible tool
            **kwargs: Additional keyword arguments
        """
        # Extract name and description from the LangChain tool
        # (defaults apply when the attributes are missing).
        tool_name = str(getattr(langchain_tool, "name", "langchain_tool"))
        tool_description = str(
            getattr(
                langchain_tool, "description", "A LangChain tool wrapped for CrewAI"
            )
        )

        # Initialize the base tool
        super().__init__(
            name=tool_name,
            description=tool_description,
            **kwargs,
        )

        # Restore the original description since CrewAI's _generate_description
        # prepends tool name and arguments to it
        self.description = tool_description

        # Store the LangChain tool reference
        self._langchain_tool = langchain_tool

    def _generate_description(self) -> None:
        """Override to prevent automatic description generation."""
        # Do nothing - we want to keep the simple description
        pass

    def _run(self, *args: Any, **kwargs: Any) -> str:
        """
        Execute the underlying LangChain tool.

        Tries invocation strategies in a fixed order — run(), invoke(),
        an explicitly-set __call__ (test mocks), then a direct call — and
        returns the first success, stringified.

        Args:
            *args: Positional arguments passed to the tool
            **kwargs: Keyword arguments passed to the tool

        Returns:
            str: The result of the tool execution

        Raises:
            AttributeError: If no valid invocation method is found on the tool
        """
        try:
            # Try using the run method (common in LangChain tools)
            if hasattr(self._langchain_tool, "run"):
                try:
                    if args and not kwargs:
                        result = self._langchain_tool.run(*args)
                    elif kwargs:
                        result = self._langchain_tool.run(**kwargs)
                    else:
                        # No input at all: LangChain's run() requires a
                        # tool_input argument, so pass an empty one.
                        result = self._langchain_tool.run(tool_input={})
                    return str(result)
                except (AttributeError, TypeError):
                    # run method doesn't exist or failed, try next option.
                    # NOTE(review): AttributeError/TypeError raised *inside*
                    # the tool is also swallowed here and masked by the
                    # fallback — confirm this is intended.
                    pass

            # Try using the invoke method (newer LangChain tools)
            if hasattr(self._langchain_tool, "invoke"):
                try:
                    if args and not kwargs:
                        # Single positional arg is passed bare; several are
                        # passed as a tuple.
                        result = self._langchain_tool.invoke(
                            args[0] if len(args) == 1 else args # type: ignore
                        )
                    elif kwargs:
                        result = self._langchain_tool.invoke(kwargs)
                    else:
                        result = self._langchain_tool.invoke({})
                    return str(result)
                except (AttributeError, TypeError):
                    # invoke method doesn't exist or failed, try next option
                    pass

            # Check if __call__ is explicitly set on the object (for test mocks)
            # Instance __dict__ is checked because callable() would also match
            # the type-level __call__ of ordinary classes.
            if "__call__" in self._langchain_tool.__dict__:
                __call_method = self._langchain_tool.__dict__["__call__"]
                if args and not kwargs:
                    result = __call_method(*args)
                elif kwargs:
                    result = __call_method(**kwargs)
                else:
                    result = __call_method()
                return str(result)

            # Try calling the tool directly if it's callable
            if callable(self._langchain_tool):
                try:
                    if args and not kwargs:
                        result = self._langchain_tool(*args)
                    elif kwargs:
                        result = self._langchain_tool(**kwargs)
                    else:
                        result = self._langchain_tool()
                    return str(result)
                except (AttributeError, TypeError):
                    # Direct call didn't work
                    pass

            # If we got here, no valid method was found
            tool_type = type(self._langchain_tool).__name__
            raise AttributeError(
                f"LangChain tool '{tool_type}' does not have any of the following: "
                "run(), invoke(), or __call__() methods"
            )

        except AttributeError:
            # The "no invocation method" error above propagates unchanged.
            raise
        except Exception as e:
            # Any other tool failure is reported as a string result rather
            # than raised, matching CrewAI's tolerant tool-error handling.
            return f"Error executing LangChain tool: {e!s}"

    def __str__(self) -> str:
        """Return string representation of the wrapper."""
        return f"CrewAILangChainToolWrapper(tool={self._langchain_tool.name})"

    def __repr__(self) -> str:
        """Return detailed string representation of the wrapper."""
        return self.__str__()

__init__(langchain_tool, **kwargs)

Initialize the CrewAI LangChain Tool wrapper.

Parameters:

Name Type Description Default
langchain_tool BaseTool | Any

An instance of LangChain BaseTool or compatible tool

required
**kwargs Any

Additional keyword arguments

{}
Source code in src/dhti_elixir_base/crewai/langchain_tool_wrapper.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def __init__(
    self,
    langchain_tool: LangChainBaseTool | Any,
    **kwargs: Any,
):
    """
    Initialize the CrewAI LangChain Tool wrapper.

    Args:
        langchain_tool: An instance of LangChain BaseTool or compatible tool
        **kwargs: Additional keyword arguments
    """
    # Extract name and description from the LangChain tool
    # (defaults apply when the attributes are missing).
    tool_name = str(getattr(langchain_tool, "name", "langchain_tool"))
    tool_description = str(
        getattr(
            langchain_tool, "description", "A LangChain tool wrapped for CrewAI"
        )
    )

    # Initialize the base tool
    super().__init__(
        name=tool_name,
        description=tool_description,
        **kwargs,
    )

    # Restore the original description since CrewAI's _generate_description
    # prepends tool name and arguments to it
    self.description = tool_description

    # Store the LangChain tool reference
    self._langchain_tool = langchain_tool

__repr__()

Return detailed string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/langchain_tool_wrapper.py
183
184
185
# Delegates to __str__ so repr and str stay in sync.
def __repr__(self) -> str:
    """Return detailed string representation of the wrapper."""
    return self.__str__()

__str__()

Return string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/langchain_tool_wrapper.py
179
180
181
# Identifies the wrapper by the wrapped LangChain tool's name.
def __str__(self) -> str:
    """Return string representation of the wrapper."""
    return f"CrewAILangChainToolWrapper(tool={self._langchain_tool.name})"

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

CrewAILLMWrapper

Bases: BaseLLM

Wrapper class to make BaseLLM and BaseChatLLM compatible with CrewAI.

This wrapper allows the use of DHTI's LLM classes within the CrewAI framework by adapting their interfaces to CrewAI's requirements.

Parameters:

Name Type Description Default
llm BaseLLM | BaseChatLLM

An instance of BaseLLM or BaseChatLLM from dhti_elixir_base

required
**kwargs Any

Additional keyword arguments

{}
Example
from dhti_elixir_base import BaseChatLLM
from dhti_elixir_base.crewai import CrewAILLMWrapper

# Create a DHTI LLM instance
dhti_llm = BaseChatLLM(
    base_url="https://api.example.com/chat",
    model="gpt-4",
    api_key="your-api-key"
)

# Wrap it for use with CrewAI
crewai_llm = CrewAILLMWrapper(llm=dhti_llm)
Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class CrewAILLMWrapper(CrewAIBaseLLM):
    """
    Adapter that exposes DHTI's BaseLLM / BaseChatLLM through CrewAI's LLM interface.

    CrewAI drives its agents through a small LLM protocol (``call``, ``model``,
    ``provider``, token accounting, context-window queries). This wrapper maps
    each of those entry points onto the wrapped DHTI LLM instance.

    Args:
        llm: An instance of BaseLLM or BaseChatLLM from dhti_elixir_base
        **kwargs: Additional keyword arguments

    Example:
        ```python
        from dhti_elixir_base import BaseChatLLM
        from dhti_elixir_base.crewai import CrewAILLMWrapper

        # Create a DHTI LLM instance
        dhti_llm = BaseChatLLM(
            base_url="https://api.example.com/chat",
            model="gpt-4",
            api_key="your-api-key"
        )

        # Wrap it for use with CrewAI
        crewai_llm = CrewAILLMWrapper(llm=dhti_llm)
        ```
    """

    # CrewAI inspects this flag to decide whether to route calls through LiteLLM;
    # this wrapper invokes the DHTI LLM directly, so it is always False.
    is_litellm: bool = False

    def __init__(self, llm: BaseLLM | BaseChatLLM, **kwargs: Any):
        """
        Initialize the CrewAI LLM wrapper.

        Args:
            llm: An instance of BaseLLM or BaseChatLLM
            **kwargs: Additional keyword arguments (accepted for signature
                compatibility; not used by this wrapper)
        """
        self._dhti_llm = llm
        # Fall back to a generic name when the wrapped LLM exposes no "model" attr.
        self._model_name = getattr(llm, "model", "custom-model")

    def call(self, messages: list[dict[str, Any]], *args: Any, **kwargs: Any) -> str:
        """
        Call the underlying DHTI LLM with the provided messages.

        Args:
            messages: List of message dictionaries with 'role' and 'content' keys
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments

        Returns:
            str: The generated response from the LLM

        Raises:
            ValueError: If messages list is empty or malformed
            RuntimeError: If LLM invocation fails
        """
        if not messages:
            raise ValueError("Messages list cannot be empty")

        try:
            if isinstance(self._dhti_llm, BaseChatLLM):
                # Chat models take a LangChain message sequence; map each dict's
                # role onto the corresponding message class (unknown roles are
                # treated as user input).
                from langchain_core.messages import (
                    AIMessage,
                    HumanMessage,
                    SystemMessage,
                )

                role_to_cls = {"system": SystemMessage, "assistant": AIMessage}
                converted = [
                    role_to_cls.get(entry.get("role", "user"), HumanMessage)(
                        content=entry.get("content", "")
                    )
                    for entry in messages
                ]

                reply = self._dhti_llm.invoke(converted)
                if hasattr(reply, "content"):
                    return reply.content  # type: ignore
                return str(reply)

            # Completion-style models take a single prompt string; flatten the
            # conversation into "role: content" lines.
            flattened = "\n".join(
                f"{msg.get('role', 'user')}: {msg.get('content', '')}"
                for msg in messages
            )
            return self._dhti_llm.invoke(flattened)
        except Exception as e:
            raise RuntimeError(f"Failed to invoke LLM: {e}") from e

    @property
    def model(self) -> str:
        """Return the model name."""
        return self._model_name

    @property
    def provider(self) -> str:
        """Return the provider name."""
        return "dhti-elixir"

    @provider.setter
    def provider(self, value: str) -> None:
        # Read-only property; setter is a no-op to satisfy base class signature
        pass

    def get_context_window_size(self) -> int:
        """Get the context window size for the model."""
        # Conservative default (8192 tokens); override in subclasses for
        # models with larger windows.
        return 8192

    def get_token_usage_summary(self) -> UsageMetrics:
        """Get token usage summary."""
        # Token accounting is not tracked by the DHTI LLMs, so report zeros.
        return UsageMetrics(
            prompt_tokens=0,
            completion_tokens=0,
            total_tokens=0,
        )

    def supports_stop_words(self) -> bool:
        """Check if the model supports stop words."""
        # Stop-sequence support is not exposed by the DHTI LLM interface.
        return False

    def __str__(self) -> str:
        """Return string representation of the wrapper."""
        return f"CrewAILLMWrapper(llm={self._dhti_llm.__class__.__name__})"

    def __repr__(self) -> str:
        """Return detailed string representation of the wrapper."""
        return str(self)

model property

Return the model name.

provider property writable

Return the provider name.

__init__(llm, **kwargs)

Initialize the CrewAI LLM wrapper.

Parameters:

Name Type Description Default
llm BaseLLM | BaseChatLLM

An instance of BaseLLM or BaseChatLLM

required
**kwargs Any

Additional keyword arguments

{}
Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
54
55
56
57
58
59
60
61
62
63
def __init__(self, llm: BaseLLM | BaseChatLLM, **kwargs: Any):
    """
    Initialize the CrewAI LLM wrapper.

    Args:
        llm: An instance of BaseLLM or BaseChatLLM
        **kwargs: Additional keyword arguments (accepted for signature
            compatibility; not used by this wrapper)
    """
    # Fall back to a generic name when the wrapped LLM exposes no "model" attr.
    model_name = getattr(llm, "model", "custom-model")
    self._dhti_llm = llm
    self._model_name = model_name

__repr__()

Return detailed string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
159
160
161
def __repr__(self) -> str:
    """Return detailed string representation of the wrapper."""
    # Delegate to __str__ so both representations always stay in sync.
    return str(self)

__str__()

Return string representation of the wrapper.

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
155
156
157
def __str__(self) -> str:
    """Return string representation of the wrapper."""
    wrapped_name = self._dhti_llm.__class__.__name__
    return f"CrewAILLMWrapper(llm={wrapped_name})"

call(messages, *args, **kwargs)

Call the underlying DHTI LLM with the provided messages.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of message dictionaries with 'role' and 'content' keys

required
*args Any

Additional positional arguments

()
**kwargs Any

Additional keyword arguments

{}

Returns:

Name Type Description
str str

The generated response from the LLM

Raises:

Type Description
ValueError

If messages list is empty or malformed

RuntimeError

If LLM invocation fails

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def call(self, messages: list[dict[str, Any]], *args: Any, **kwargs: Any) -> str:
    """
    Call the underlying DHTI LLM with the provided messages.

    Args:
        messages: List of message dictionaries with 'role' and 'content' keys
        *args: Additional positional arguments
        **kwargs: Additional keyword arguments

    Returns:
        str: The generated response from the LLM

    Raises:
        ValueError: If messages list is empty or malformed
        RuntimeError: If LLM invocation fails
    """
    if not messages:
        raise ValueError("Messages list cannot be empty")

    try:
        if isinstance(self._dhti_llm, BaseChatLLM):
            # Chat models take a LangChain message sequence; map each dict's
            # role onto the corresponding message class (unknown roles are
            # treated as user input).
            from langchain_core.messages import (
                AIMessage,
                HumanMessage,
                SystemMessage,
            )

            role_to_cls = {"system": SystemMessage, "assistant": AIMessage}
            converted = [
                role_to_cls.get(entry.get("role", "user"), HumanMessage)(
                    content=entry.get("content", "")
                )
                for entry in messages
            ]

            reply = self._dhti_llm.invoke(converted)
            if hasattr(reply, "content"):
                return reply.content  # type: ignore
            return str(reply)

        # Completion-style models take a single prompt string; flatten the
        # conversation into "role: content" lines.
        flattened = "\n".join(
            f"{msg.get('role', 'user')}: {msg.get('content', '')}"
            for msg in messages
        )
        return self._dhti_llm.invoke(flattened)
    except Exception as e:
        raise RuntimeError(f"Failed to invoke LLM: {e}") from e

get_context_window_size()

Get the context window size for the model.

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
137
138
139
140
141
def get_context_window_size(self) -> int:
    """Get the context window size for the model."""
    # Conservative default (8192 tokens); override in subclasses for
    # models with larger windows.
    return 8192

get_token_usage_summary()

Get token usage summary.

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
143
144
145
146
147
148
149
def get_token_usage_summary(self) -> UsageMetrics:
    """Get token usage summary."""
    # Token accounting is not tracked by the DHTI LLMs, so report zeros.
    zeroed = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    return UsageMetrics(**zeroed)

supports_stop_words()

Check if the model supports stop words.

Source code in src/dhti_elixir_base/crewai/llm_wrapper.py
151
152
153
def supports_stop_words(self) -> bool:
    """Check if the model supports stop words."""
    # Stop-sequence support is not exposed by the DHTI LLM interface.
    return False

Copyright 2025 Bell Eapen

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

FileProcessingRequest

Bases: CustomUserType

Request including a base64 encoded file.

Source code in src/dhti_elixir_base/rag/process.py
33
34
35
36
37
38
39
40
41
42
43
44
class FileProcessingRequest(CustomUserType):
    """Request including a base64 encoded file."""

    # json_schema_extra tells the playground UI which widget to render for each
    # field. The "file" field previously used the deprecated pydantic-v2
    # `extra=` Field kwarg; use json_schema_extra consistently like the others.
    file: str = Field(..., json_schema_extra={"widget": {"type": "base64file"}})
    # Original filename of the uploaded document (stored in chunk metadata).
    filename: str = Field(
        default="", json_schema_extra={"widget": {"type": "text"}}
    )
    # Publication year of the document; 0 means unknown.
    year: int = Field(
        default=0,
        json_schema_extra={"widget": {"type": "number"}},
    )

combine_documents(documents, document_separator='\n\n')

Combine documents into a single string.

Source code in src/dhti_elixir_base/rag/process.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def combine_documents(documents: list, document_separator="\n\n") -> str:
    """Combine documents into a single string.

    Each document contributes its page content followed by either a
    "[filename (year)]" citation line (when both metadata fields are present)
    or the plain separator.

    Args:
        documents: List of document objects exposing .page_content and
            .metadata (a dict that may carry "filename" and "year").
        document_separator: Separator used when citation metadata is missing.

    Returns:
        str: The combined text, or a friendly notice when nothing was found.
    """
    # Use list and join for O(n) instead of repeated string concatenation O(n²)
    parts = []
    for document in documents:
        filename = document.metadata.get("filename", "")
        year = document.metadata.get("year", 0)
        # BUG FIX: the separator previously contained the literal placeholder
        # "(unknown)" instead of interpolating the filename it just looked up.
        current_separator = (
            f"[{filename} ({year})]\n\n" if filename and year else document_separator
        )
        # PromptTemplate "{page_content}\n" reduced to plain concatenation.
        parts.append(document.page_content + "\n" + current_separator)
    combined_text = "".join(parts)
    if len(combined_text) < 3:
        return "No information found. The vectorstore may still be indexing. Please try again later."
    return combined_text.strip()

process_file(request)

Extract the text from all pages of the PDF.

Source code in src/dhti_elixir_base/rag/process.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def process_file(request: FileProcessingRequest) -> str:
    """Extract the text from all pages of the PDF.

    Decodes the base64 payload, parses it with PDFMiner, splits the full text
    into chunks, tags every chunk with the request's filename/year metadata,
    and indexes the chunks into the shared vectorstore.

    Args:
        request: FileProcessingRequest carrying the base64 file and metadata.

    Returns:
        str: The first 100 characters of the extracted text on success, or an
            error message if indexing fails.
    """
    content = base64.b64decode(request.file.encode("utf-8"))
    blob = Blob(data=content)
    documents = list(PDFMinerParser().lazy_parse(blob))
    # Use list and join for O(n) instead of repeated string concatenation O(n²)
    pages = "".join(doc.page_content for doc in documents)
    docs = get_di("text_splitter").create_documents([pages])  # type: ignore
    for doc in docs:
        # BUG FIX: give every chunk its OWN metadata dict. Previously one
        # shared dict was assigned to all chunks, so mutating any chunk's
        # metadata later would silently change every other chunk too.
        doc.metadata = {"filename": request.filename, "year": request.year}
    try:
        get_di("vectorstore").add_documents(docs)  # type: ignore
    except Exception as e:
        return f"Error adding documents to vectorstore: {e}"
    # return first 100 characters of the extracted text
    return pages[:100]

search_vectorstore(query)

Search the vectorstore for the given query.

Source code in src/dhti_elixir_base/rag/process.py
88
89
90
91
def search_vectorstore(query: str) -> list:
    """Search the vectorstore for the given query.

    Returns the top-k most similar documents, where k comes from the
    "rag_k" dependency-injection entry (default 5).
    """
    store = get_di("vectorstore")
    top_k = get_di("rag_k", 5)
    return store.similarity_search(query, k=top_k)  # type: ignore