
Modules

Derivative based on the original work here: https://github.com/thehyve/omop-cdm/blob/main/src/omop_cdm/regular/cdm600/tables.py

Modifications made to this file:

  • Removed support for schema.
  • Added new tables

  • Licensed under the Apache License, Version 2.0 (the "License");
  • you may not use this file except in compliance with the License.
  • You may obtain a copy of the License at
  • https://www.apache.org/licenses/LICENSE-2.0
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License.

Derivative based on the original work here: https://github.com/thehyve/omop-cdm/blob/main/src/omop_cdm/regular/cdm54/tables.py

Modifications made to this file:

  • Removed support for schema.
  • Added new tables

  • Licensed under the Apache License, Version 2.0 (the "License");
  • you may not use this file except in compliance with the License.
  • You may obtain a copy of the License at
  • https://www.apache.org/licenses/LICENSE-2.0
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License.

Engine and session factory for OMOP CDM databases.

This module provides an asynchronous SQLAlchemy engine factory with helpers to create/init CDM schemas and obtain async sessions across supported backends (SQLite, MySQL, PostgreSQL).

CdmEngineFactory

Bases: object

Factory to create async SQLAlchemy engines and sessions for OMOP CDM.

Supports SQLite (default), MySQL, and PostgreSQL. Exposes convenience properties for the configured engine and async session maker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| db | | Database type: "sqlite", "mysql", or "pgsql". | 'sqlite' |
| host | | Database host (ignored for SQLite). | 'localhost' |
| port | | Database port (ignored for SQLite). | 5432 |
| user | | Database user (ignored for SQLite). | 'root' |
| pw | | Database password (ignored for SQLite). | 'pass' |
| name | | Database name or SQLite filename. | 'cdm.sqlite' |
| schema | | PostgreSQL schema to use for CDM. | '' |
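
For orientation, a minimal usage sketch (assuming the class is importable as pyomop.CdmEngineFactory and the code runs inside an event loop):

import asyncio
from pyomop import CdmEngineFactory  # assumed import path

async def main():
    factory = CdmEngineFactory()   # defaults to a local SQLite file "cdm.sqlite"
    engine = factory.engine        # the async engine is created on first access
    Session = factory.session     # async_sessionmaker bound to that engine
    async with Session() as session:
        ...                        # use the AsyncSession here

asyncio.run(main())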
Source code in src/pyomop/engine_factory.py
class CdmEngineFactory(object):
    """Factory to create async SQLAlchemy engines and sessions for OMOP CDM.

    Supports SQLite (default), MySQL, and PostgreSQL. Exposes convenience
    properties for the configured engine and async session maker.

    Args:
        db: Database type: "sqlite", "mysql", or "pgsql".
        host: Database host (ignored for SQLite).
        port: Database port (ignored for SQLite).
        user: Database user (ignored for SQLite).
        pw: Database password (ignored for SQLite).
        name: Database name or SQLite filename.
        schema: PostgreSQL schema to use for CDM.
    """

    def __init__(
        self,
        db="sqlite",
        host="localhost",
        port=5432,
        user="root",
        pw="pass",
        name="cdm.sqlite",
        schema="",
    ):
        self._db = db
        self._name = name
        self._host = host
        self._port = port
        self._user = user
        self._pw = pw
        self._schema = schema
        self._engine = None
        self._base = None

    async def init_models(self, metadata):
        """Drop and re-create all tables from provided metadata.

        This is mainly used for tests and quick local setups.

        Args:
            metadata: SQLAlchemy ``MetaData`` containing table definitions.

        Raises:
            ValueError: If the engine has not been initialized.
        """
        if self._engine is None:
            raise ValueError("Database engine is not initialized.")
        async with self._engine.begin() as conn:
            await conn.run_sync(metadata.drop_all)
            await conn.run_sync(metadata.create_all)

    @property
    def db(self):
        """Return the configured database type (sqlite/mysql/pgsql)."""
        return self._db

    @property
    def host(self):
        """Return the configured database host (if applicable)."""
        return self._host

    @property
    def port(self):
        """Return the configured database port (if applicable)."""
        return self._port

    @property
    def name(self):
        """Return the configured database name or SQLite filename."""
        return self._name

    @property
    def user(self):
        """Return the configured database user (if applicable)."""
        return self._user

    @property
    def pw(self):
        """Return the configured database password (if applicable)."""
        return self._pw

    @property
    def schema(self):
        """Return the configured schema (PostgreSQL)."""
        return self._schema

    @property
    def base(self):
        """Return automapped classes when an engine exists, otherwise None."""
        if self.engine is not None:  # use the property, not self._engine, so the engine is created on demand
            Base = automap_base()
            Base.prepare(self.engine, reflect=True)
            return Base.classes
        return None

    @property
    def engine(self):
        """Create or return the async engine for the configured backend.

        Returns:
            Async engine instance bound to the configured database.
        """
        if self._db == "sqlite":
            # Schemas are not supported for SQLite; warn if a non-default schema was provided
            if self._schema and self._schema not in ("", "public"):
                logger.warning(
                    "Schema is not supported for SQLite; ignoring schema='%s'",
                    self._schema,
                )
            self._engine = create_async_engine("sqlite+aiosqlite:///" + self._name)
        elif self._db == "mysql":
            # Schemas are not supported for MySQL in the same way as PostgreSQL; warn and ignore
            if self._schema and self._schema not in ("", "public"):
                logger.warning(
                    "Schema is not supported for MySQL; ignoring schema='%s'",
                    self._schema,
                )
            mysql_url = "mysql://{}:{}@{}:{}/{}"
            mysql_url = mysql_url.format(
                self._user, self._pw, self._host, self._port, self._name
            )
            self._engine = create_async_engine(
                mysql_url, isolation_level="READ UNCOMMITTED"
            )
        elif self._db == "pgsql":
            pgsql_url = "postgresql+asyncpg://{}:{}@{}:{}/{}"
            pgsql_url = pgsql_url.format(
                self._user, self._pw, self._host, self._port, self._name
            )
            connect_args = {}
            # If a schema is provided, set the PostgreSQL search_path so that all
            # operations (reflection, DDL/DML) use this schema by default.
            if self._schema and self._schema != "":
                connect_args = {"server_settings": {"search_path": self._schema}}
            self._engine = create_async_engine(pgsql_url, connect_args=connect_args)
        else:
            # Unknown DB type; create no engine and warn
            logger.warning("Unknown database type '%s'—no engine created.", self._db)
            return None
        return self._engine

    @property
    def session(self):
        """Return an async_sessionmaker for creating AsyncSession objects."""
        if self._engine is not None:
            async_session = async_sessionmaker(
                self._engine, expire_on_commit=False, class_=AsyncSession
            )
            return async_session
        return None

    @property
    def async_session(self):
        """Alias for session to maintain backward compatibility."""
        if self._engine is not None:
            async_session = async_sessionmaker(
                self._engine, expire_on_commit=False, class_=AsyncSession
            )
            return async_session
        return None

    @db.setter
    def db(self, value):
        self._db = value

    @name.setter
    def name(self, value):
        self._name = value

    @port.setter
    def port(self, value):
        self._port = value

    @host.setter
    def host(self, value):
        self._host = value

    @user.setter
    def user(self, value):
        self._user = value

    @pw.setter
    def pw(self, value):
        self._pw = value

    @schema.setter
    def schema(self, value):
        self._schema = value

async_session property

Alias for session to maintain backward compatibility.

base property

Return automapped classes when an engine exists, otherwise None.

db property writable

Return the configured database type (sqlite/mysql/pgsql).

engine property

Create or return the async engine for the configured backend.

Returns:

Async engine instance bound to the configured database.

host property writable

Return the configured database host (if applicable).

name property writable

Return the configured database name or SQLite filename.

port property writable

Return the configured database port (if applicable).

pw property writable

Return the configured database password (if applicable).

schema property writable

Return the configured schema (PostgreSQL).

session property

Return an async_sessionmaker for creating AsyncSession objects.

user property writable

Return the configured database user (if applicable).

init_models(metadata) async

Drop and re-create all tables from provided metadata.

This is mainly used for tests and quick local setups.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| metadata | | SQLAlchemy MetaData containing table definitions. | required |

Raises:

ValueError: If the engine has not been initialized.
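
A minimal sketch of (re)creating the CDM tables, assuming the CDM 5.4 models expose their metadata via pyomop.cdm54.Base:

from pyomop import CdmEngineFactory
from pyomop.cdm54 import Base  # assumed import path for the CDM 5.4 models

async def reset_schema():
    factory = CdmEngineFactory()               # SQLite by default
    _ = factory.engine                         # the engine must exist before init_models
    await factory.init_models(Base.metadata)   # drops and re-creates all tables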

Source code in src/pyomop/engine_factory.py
async def init_models(self, metadata):
    """Drop and re-create all tables from provided metadata.

    This is mainly used for tests and quick local setups.

    Args:
        metadata: SQLAlchemy ``MetaData`` containing table definitions.

    Raises:
        ValueError: If the engine has not been initialized.
    """
    if self._engine is None:
        raise ValueError("Database engine is not initialized.")
    async with self._engine.begin() as conn:
        await conn.run_sync(metadata.drop_all)
        await conn.run_sync(metadata.create_all)

LLM-oriented SQLDatabase wrapper for OMOP CDM.

This module provides a thin wrapper around llama_index.core.SQLDatabase that is aware of the OMOP CDM metadata reflected from this package's SQLAlchemy models. It enables LLM-powered query components to reason about available tables, columns, and foreign keys using the OMOP metadata directly.

This file is import-safe even when the optional LLM extras are not installed; in that case, attempting to instantiate CDMDatabase will raise a clear ImportError directing you to install pyomop[llm].

CDMDatabase

Bases: SQLDatabase

OMOP-aware SQLDatabase for LLM query engines.

This class adapts llama-index's SQLDatabase to use the OMOP CDM SQLAlchemy metadata bundled with this package, making it easy to expose concise schema information to LLM components.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| engine | Engine | SQLAlchemy Engine connected to the OMOP database. | required |
| schema | Optional[str] | Optional database schema name. | None |
| ignore_tables | Optional[List[str]] | Tables to hide from the LLM context. | None |
| include_tables | Optional[List[str]] | Explicit subset of tables to expose. | None |
| sample_rows_in_table_info | int | Kept for API parity (unused here). | 3 |
| indexes_in_table_info | bool | Kept for API parity (unused here). | False |
| custom_table_info | Optional[dict] | Optional overrides for table descriptions. | None |
| view_support | bool | Whether to reflect views as well (unused here). | False |
| max_string_length | int | Max length of generated descriptions. | 300 |
| version | str | OMOP CDM version label ("cdm54" or "cdm6"). | 'cdm54' |
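
A minimal sketch of constructing the wrapper (import paths assumed; requires the pyomop[llm] extra):

from sqlalchemy import create_engine
from pyomop.llm_engine import CDMDatabase  # assumed import path

engine = create_engine("sqlite:///cdm.sqlite")  # a synchronous engine; an AsyncEngine is also accepted
db = CDMDatabase(engine, include_tables=["person", "measurement"], version="cdm54")
print(db.get_single_table_info("person"))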
Source code in src/pyomop/llm_engine.py
class CDMDatabase(SQLDatabase):
    """OMOP-aware SQLDatabase for LLM query engines.

    This class adapts llama-index's ``SQLDatabase`` to use the OMOP CDM
    SQLAlchemy metadata bundled with this package, making it easy to expose
    concise schema information to LLM components.

    Args:
        engine: SQLAlchemy ``Engine`` connected to the OMOP database.
        schema: Optional database schema name.
        ignore_tables: Tables to hide from the LLM context.
        include_tables: Explicit subset of tables to expose.
        sample_rows_in_table_info: Kept for API parity (unused here).
        indexes_in_table_info: Kept for API parity (unused here).
        custom_table_info: Optional overrides for table descriptions.
        view_support: Whether to reflect views as well (unused here).
        max_string_length: Max length of generated descriptions.
        version: OMOP CDM version label ("cdm54" or "cdm6").
    """

    def __init__(
        self,
        engine: Engine,
        schema: Optional[str] = None,
        ignore_tables: Optional[List[str]] = None,
        include_tables: Optional[List[str]] = None,
        sample_rows_in_table_info: int = 3,
        indexes_in_table_info: bool = False,
        custom_table_info: Optional[dict] = None,
        view_support: bool = False,
        max_string_length: int = 300,
        version: str = "cdm54",
    ) -> None:
        if not _LLM_AVAILABLE:  # pragma: no cover - import-safe guard
            raise ImportError("Install 'pyomop[llm]' to use LLM features.")

        # Basic configuration
        self._engine = engine
        self._schema = schema

        if include_tables and ignore_tables:
            raise ValueError("Cannot specify both include_tables and ignore_tables")

        # Load OMOP metadata for the chosen version
        if version == "cdm6":
            from .cdm6 import Base
        else:
            from .cdm54 import Base
        metadata: MetaData = Base.metadata

        # All known tables (no view reflection here)
        self._all_tables = set(metadata.tables.keys())

        # Validate include/ignore lists
        self._include_tables = set(include_tables) if include_tables else set()
        if self._include_tables:
            missing = self._include_tables - self._all_tables
            if missing:
                raise ValueError(f"include_tables {missing} not found in OMOP metadata")

        self._ignore_tables = set(ignore_tables) if ignore_tables else set()
        if self._ignore_tables:
            missing = self._ignore_tables - self._all_tables
            if missing:
                raise ValueError(f"ignore_tables {missing} not found in OMOP metadata")

        if self._include_tables:
            usable = set(self._include_tables)
        elif self._ignore_tables:
            usable = self._all_tables - self._ignore_tables
        else:
            usable = set(self._all_tables)
        self._usable_tables = usable

        if not isinstance(sample_rows_in_table_info, int):
            raise TypeError("sample_rows_in_table_info must be an integer")
        self._sample_rows_in_table_info = sample_rows_in_table_info
        self._indexes_in_table_info = indexes_in_table_info

        # Optional custom descriptions
        self._custom_table_info = custom_table_info
        if self._custom_table_info is not None:
            if not isinstance(self._custom_table_info, dict):
                raise TypeError(
                    "custom_table_info must be a dict of {table_name: description}"
                )
            self._custom_table_info = {
                t: info
                for t, info in self._custom_table_info.items()
                if t in self._all_tables
            }

        self._max_string_length = max_string_length
        self._metadata = metadata

        # Initialize parent so llama-index internals are configured too.
        # llama-index expects a synchronous SQLAlchemy Engine. If an AsyncEngine
        # is provided, create a synchronous engine from the same URL.
        parent_engine: Engine
        if AsyncEngine is not None and isinstance(self._engine, AsyncEngine):
            url_str = str(self._engine.url)
            # Convert common async driver URLs to sync variants.
            # This is conservative and primarily targets SQLite used in tests.
            url_str = (
                url_str.replace("+aiosqlite", "")
                .replace("+asyncpg", "")
                .replace("+psycopg_async", "+psycopg2")
            )
            parent_engine = create_engine(url_str)
        else:
            parent_engine = self._engine  # type: ignore[assignment]

        super().__init__(
            engine=parent_engine,
            schema=schema,
            include_tables=sorted(self._usable_tables) if self._usable_tables else None,
        )

    # --- llama-index compatibility helpers (use OMOP metadata directly) ---
    def get_table_columns(self, table_name: str) -> List[str]:
        """Return list of column names for a table.

        This uses the OMOP SQLAlchemy ``MetaData`` instead of DB inspector.
        """

        return [col.name for col in self._metadata.tables[table_name].columns]

    def get_single_table_info(self, table_name: str) -> str:
        """Return a concise description of columns and foreign keys for a table.

        The format matches what llama-index expects when building table context.
        """

        template = "Table '{table_name}' has columns: {columns}, and foreign keys: {foreign_keys}."
        columns: List[str] = []
        foreign_keys: List[str] = []
        for column in self._metadata.tables[table_name].columns:
            columns.append(f"{column.name} ({column.type!s})")
            for fk in column.foreign_keys:
                foreign_keys.append(
                    f"{column.name} -> {fk.column.table.name}.{fk.column.name}"
                )
        column_str = ", ".join(columns)
        fk_str = ", ".join(foreign_keys)
        return template.format(
            table_name=table_name, columns=column_str, foreign_keys=fk_str
        )

    def usable_tables(self) -> List[str]:
        """Return the sorted list of tables exposed to the LLM.

        This respects include/ignore settings passed at initialization.
        """

        return sorted(self._usable_tables)

    # Backwards/compat helper name used in some code paths
    def get_usable_table_names(self) -> List[str]:  # pragma: no cover - thin wrapper
        return self.usable_tables()

get_single_table_info(table_name)

Return a concise description of columns and foreign keys for a table.

The format matches what llama-index expects when building table context.
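
A hypothetical example of the output shape (column list abbreviated; exact types depend on the model definitions):

info = db.get_single_table_info("person")
# e.g. "Table 'person' has columns: person_id (INTEGER), gender_concept_id (INTEGER), ...,
#       and foreign keys: gender_concept_id -> concept.concept_id, ..."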

Source code in src/pyomop/llm_engine.py
def get_single_table_info(self, table_name: str) -> str:
    """Return a concise description of columns and foreign keys for a table.

    The format matches what llama-index expects when building table context.
    """

    template = "Table '{table_name}' has columns: {columns}, and foreign keys: {foreign_keys}."
    columns: List[str] = []
    foreign_keys: List[str] = []
    for column in self._metadata.tables[table_name].columns:
        columns.append(f"{column.name} ({column.type!s})")
        for fk in column.foreign_keys:
            foreign_keys.append(
                f"{column.name} -> {fk.column.table.name}.{fk.column.name}"
            )
    column_str = ", ".join(columns)
    fk_str = ", ".join(foreign_keys)
    return template.format(
        table_name=table_name, columns=column_str, foreign_keys=fk_str
    )

get_table_columns(table_name)

Return list of column names for a table.

This uses the OMOP SQLAlchemy MetaData instead of DB inspector.
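
A hypothetical call ("person" is a standard OMOP table):

cols = db.get_table_columns("person")
# e.g. ["person_id", "gender_concept_id", "year_of_birth", ...]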

Source code in src/pyomop/llm_engine.py
def get_table_columns(self, table_name: str) -> List[str]:
    """Return list of column names for a table.

    This uses the OMOP SQLAlchemy ``MetaData`` instead of DB inspector.
    """

    return [col.name for col in self._metadata.tables[table_name].columns]

usable_tables()

Return the sorted list of tables exposed to the LLM.

This respects include/ignore settings passed at initialization.

Source code in src/pyomop/llm_engine.py
def usable_tables(self) -> List[str]:
    """Return the sorted list of tables exposed to the LLM.

    This respects include/ignore settings passed at initialization.
    """

    return sorted(self._usable_tables)

LLM query utilities over the OMOP CDM schema.

This module wires llama-index components to an OMOP-aware CDMDatabase so you can build semantic and SQL-first query engines that know about your CDM tables. All LLM-related imports are optional and performed lazily at runtime.

CdmLLMQuery

Helper that prepares an LLM-backed SQL query engine for OMOP.

It constructs an object index of selected CDM tables and exposes a retriever-backed query engine that can generate SQL or run SQL-only queries depending on configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sql_database | CDMDatabase | A CDMDatabase instance connected to the OMOP DB. | required |
| llm | Any | Optional LLM implementation to plug into llama-index settings. | None |
| similarity_top_k | int | Top-k tables to retrieve for each query. | 1 |
| embed_model | str | HuggingFace embedding model name. | 'BAAI/bge-small-en-v1.5' |
| **kwargs | Any | Reserved for future expansion. | {} |
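
A minimal sketch of wiring the pieces together (the LLM instance and import path are assumptions; requires pyomop[llm]):

from pyomop.llm_query import CdmLLMQuery  # assumed import path

# `db` is a CDMDatabase as shown above; `llm` is any llama-index-compatible LLM (assumed)
query = CdmLLMQuery(db, llm=llm, similarity_top_k=2)
response = query.query_engine.query("How many persons are in the database?")
# With sql_only=True (see the source below), the response contains the generated SQL.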
Source code in src/pyomop/llm_query.py
class CdmLLMQuery:
    """Helper that prepares an LLM-backed SQL query engine for OMOP.

    It constructs an object index of selected CDM tables and exposes a
    retriever-backed query engine that can generate SQL or run SQL-only queries
    depending on configuration.

    Args:
        sql_database: A ``CDMDatabase`` instance connected to the OMOP DB.
        llm: Optional LLM implementation to plug into llama-index settings.
        similarity_top_k: Top-k tables to retrieve for each query.
        embed_model: HuggingFace embedding model name.
        **kwargs: Reserved for future expansion.
    """

    def __init__(
        self,
        sql_database: CDMDatabase,
        llm: Any = None,  # FIXME: type
        similarity_top_k: int = 1,
        embed_model: str = "BAAI/bge-small-en-v1.5",
        **kwargs: Any,
    ):
        # Lazy import optional dependencies so the package imports without them
        try:
            sql_query_mod = importlib.import_module(
                "llama_index.core.indices.struct_store.sql_query"
            )
            objects_mod = importlib.import_module("llama_index.core.objects")
            core_mod = importlib.import_module("llama_index.core")
            hf_mod = importlib.import_module("langchain_huggingface")

            SQLTableRetrieverQueryEngine = getattr(
                sql_query_mod, "SQLTableRetrieverQueryEngine"
            )
            SQLTableNodeMapping = getattr(objects_mod, "SQLTableNodeMapping")
            ObjectIndex = getattr(objects_mod, "ObjectIndex")
            SQLTableSchema = getattr(objects_mod, "SQLTableSchema")
            VectorStoreIndex = getattr(core_mod, "VectorStoreIndex")
            Settings = getattr(core_mod, "Settings")
            HuggingFaceEmbeddings = getattr(hf_mod, "HuggingFaceEmbeddings")
        except Exception as e:  # pragma: no cover
            raise ImportError("Install 'pyomop[llm]' to use LLM query features.") from e
        self._sql_database = sql_database
        self._similarity_top_k = similarity_top_k
        self._embed_model = HuggingFaceEmbeddings(model_name=embed_model)
        self._llm = llm
        Settings.llm = llm
        Settings.embed_model = self._embed_model
        self._table_node_mapping = SQLTableNodeMapping(sql_database)
        usable_tables = []
        if hasattr(sql_database, "usable_tables"):
            usable_tables = list(sql_database.usable_tables())  # type: ignore[attr-defined]
        elif hasattr(sql_database, "get_usable_table_names"):
            usable_tables = list(sql_database.get_usable_table_names())  # type: ignore[attr-defined]
        self._table_schema_objs = [
            SQLTableSchema(table_name=t) for t in sorted(set(usable_tables))
        ]

        self._object_index = ObjectIndex.from_objects(
            self._table_schema_objs,
            self._table_node_mapping,
            VectorStoreIndex,  # type: ignore
        )

        self._query_engine = SQLTableRetrieverQueryEngine(
            self._sql_database,
            self._object_index.as_retriever(similarity_top_k=1),
            sql_only=True,
        )

    @property
    def table_node_mapping(self) -> Any:
        """Mapping between tables and nodes used by the object index."""
        return self._table_node_mapping

    @property
    def table_schema_objs(self) -> list[Any]:
        """List of table schema objects indexed for retrieval."""
        return self._table_schema_objs

    @property
    def object_index(self) -> Any:
        """The underlying llama-index object index used for retrieval."""
        return self._object_index

    @property
    def query_engine(self) -> Any:
        """A retriever-backed SQL query engine over the CDM tables."""
        return self._query_engine

object_index property

The underlying llama-index object index used for retrieval.

query_engine property

A retriever-backed SQL query engine over the CDM tables.

table_node_mapping property

Mapping between tables and nodes used by the object index.

table_schema_objs property

List of table schema objects indexed for retrieval.

CSV-to-OMOP loader.

This module implements a flexible CSV loader that can populate multiple OMOP CDM tables according to a JSON mapping file. It also performs helpful cleanup operations like foreign key normalization, birthdate backfilling, gender mapping, and concept code lookups.

CdmCsvLoader

Load a single CSV into multiple OMOP CDM tables using a JSON mapping file.

Mapping file format (JSON):

{
  "csv_key": "patient_id",            # optional, CSV column that contains the patient/person identifier
  "tables": [
    {
      "name": "cohort",               # target table name as in the database
      "filters": [                    # optional row filters applied to CSV before mapping
        {"column": "resourceType", "equals": "Encounter"}
      ],
      "columns": {                    # mapping of target_table_column -> value
        "cohort_definition_id": {"const": 1},     # constant value
        "subject_id": "patient_id",               # copy from CSV column
        "cohort_start_date": "period.start",      # copy from CSV column
        "cohort_end_date": "period.end"           # copy from CSV column
      }
    }
  ]
}

Notes
  • Constants are provided via {"const": value}.
  • If a required column is missing from mapping, it's left as None (DB default or nullable required).
  • Primary keys that are Integer types will autoincrement where supported (SQLite/PostgreSQL typical behavior).
  • Dates/times are converted to proper Python types where possible based on reflected column types.
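
A minimal usage sketch (the CSV path is a placeholder; import location assumed):

from pyomop import CdmEngineFactory
from pyomop.loader import CdmCsvLoader  # assumed import path

async def load_data():
    factory = CdmEngineFactory()                     # SQLite by default
    loader = CdmCsvLoader(factory, version="cdm54")
    # mapping_path=None falls back to the bundled mapping.default.json
    await loader.load("patients.csv", mapping_path=None, chunk_size=500)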
Source code in src/pyomop/loader.py
class CdmCsvLoader:
    """
    Load a single CSV into multiple OMOP CDM tables using a JSON mapping file.

    Mapping file format (JSON):

    {
      "csv_key": "patient_id",            # optional, CSV column that contains the patient/person identifier
      "tables": [
        {
          "name": "cohort",              # target table name as in the database
          "filters": [                     # optional row filters applied to CSV before mapping
            {"column": "resourceType", "equals": "Encounter"}
          ],
          "columns": {                     # mapping of target_table_column -> value
            "cohort_definition_id": {"const": 1},              # constant value
            "subject_id": "patient_id",                         # copy from CSV column
            "cohort_start_date": "period.start",                # copy from CSV column
            "cohort_end_date": "period.end"                     # copy from CSV column
          }
        }
      ]
    }

    Notes:
      - Constants are provided via {"const": value}.
      - If a required column is missing from mapping, it's left as None (DB default or nullable required).
      - Primary keys that are Integer types will autoincrement where supported (SQLite/PostgreSQL typical behavior).
      - Dates/times are converted to proper Python types where possible based on reflected column types.
    """

    def __init__(self, cdm_engine_factory, version: str = "cdm54") -> None:
        """Create a loader bound to a specific database engine.

        Args:
            cdm_engine_factory: An initialized ``CdmEngineFactory``.
            version: OMOP CDM version label ("cdm54" or "cdm6").
        """
        self._cdm = cdm_engine_factory
        self._engine = cdm_engine_factory.engine
        self._maker = async_sessionmaker(self._engine, class_=AsyncSession)
        self._scope = async_scoped_session(self._maker, scopefunc=asyncio.current_task)
        self._version = version

    @asynccontextmanager
    async def _get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a scoped async session bound to the engine."""
        async with self._scope() as session:
            yield session

    async def _prepare_automap(self, conn: AsyncConnection) -> AutomapBase:
        """Reflect the database and return an automapped base."""
        automap: AutomapBase = automap_base()

        def _prepare(sync_conn):
            automap.prepare(autoload_with=sync_conn)

        await conn.run_sync(_prepare)
        return automap

    def _coerce_record_to_table_types(
        self,
        table,
        rec: Dict[str, Any],
        force_text_fields: Optional[Set[str]] = None,
    ) -> Dict[str, Any]:
        """Coerce a record's values to the SQL types defined by the target OMOP table.

        Rules:
        - Strings/Text: cast to str; lists/tuples joined by comma; dicts JSON-serialized; enforce max length if defined.
        - Integers/Numerics: tolerant numeric parsing; None if unparsable.
        - Dates/DateTimes: parsed via pandas; DateTime normalized to UTC-naive.
        - Forced TEXT fields: certain columns are always stringified (e.g., codes arrays). The list comes
          from mapping["force_text_fields"].

        This is applied just before insert to ensure DB type compatibility.
        """
        # Columns that should always be treated as TEXT regardless of inferred type
        force_text: Set[str] = set(force_text_fields or [])

        # Local helper: stringify numbers without trailing .0 (e.g., 1.0 -> "1")
        def _s(v: Any) -> str:
            try:
                if isinstance(v, float):
                    return str(int(v)) if v.is_integer() else str(v)
                if isinstance(v, Decimal):
                    if v == v.to_integral_value():
                        return str(int(v))
                    # Normalize to drop insignificant trailing zeros
                    return format(v.normalize(), "f")
                return "" if v is None else str(v)
            except Exception:
                return str(v)

        for col in table.columns:
            name = col.name
            if name not in rec:
                continue
            val = rec[name]
            if val is None:
                continue

            t = col.type

            # Force certain fields to TEXT
            if name in force_text:
                if isinstance(val, (list, tuple)):
                    sval = ",".join([_s(v) for v in val])
                elif isinstance(val, dict):
                    try:
                        import json as _json

                        sval = _json.dumps(val, ensure_ascii=False)
                    except Exception:
                        sval = _s(val)
                else:
                    sval = _s(val)
                max_len = getattr(t, "length", None)
                rec[name] = sval[: max_len or 255]
                continue

            # Type-driven coercion
            try:
                from pandas import to_datetime as _to_datetime
                from pandas import to_numeric as _to_numeric

                if isinstance(t, Date):
                    dt = _to_datetime(val, errors="coerce")
                    rec[name] = None if pd.isna(dt) else dt.date()
                elif isinstance(t, DateTime):
                    ts = _to_datetime(val, errors="coerce", utc=True)
                    if pd.isna(ts):
                        rec[name] = None
                    else:
                        py = ts.to_pydatetime()
                        rec[name] = py.astimezone(timezone.utc).replace(tzinfo=None)
                elif isinstance(t, (Integer, BigInteger)):
                    num = _to_numeric(val, errors="coerce")
                    rec[name] = None if pd.isna(num) else int(num)
                elif isinstance(t, Numeric):
                    num = _to_numeric(val, errors="coerce")
                    rec[name] = None if pd.isna(num) else Decimal(str(num))
                elif isinstance(t, (String, Text)):
                    if isinstance(val, (list, tuple)):
                        sval = ",".join([_s(v) for v in val])
                    elif isinstance(val, dict):
                        try:
                            import json as _json

                            sval = _json.dumps(val, ensure_ascii=False)
                        except Exception:
                            sval = _s(val)
                    else:
                        sval = _s(val)
                    max_len = getattr(t, "length", None)
                    rec[name] = sval[: max_len or 255]
                else:
                    # Leave other types as-is
                    pass
            except Exception:
                # Last resort, stringify
                try:
                    s = _s(val)
                    max_len = getattr(getattr(col, "type", None), "length", None)
                    rec[name] = s[: max_len or 255]
                except Exception:
                    rec[name] = val

        return rec

    async def _list_tables_with_person_id(self, conn: AsyncConnection) -> List[str]:
        """Return table names (in default schema) that contain a person_id column, excluding 'person'."""

        def _inner(sync_conn):
            from sqlalchemy import inspect as _inspect

            insp = _inspect(sync_conn)
            tables = insp.get_table_names()
            result: List[str] = []
            for t in tables:
                try:
                    cols = insp.get_columns(t)
                except Exception:
                    continue
                if any(c.get("name") == "person_id" for c in cols) and t != "person":
                    result.append(t)
            return result

        return await conn.run_sync(_inner)

    async def _add_person_id_text_columns(self, session: AsyncSession) -> None:
        """Add a temporary person_id_text TEXT column to all non-person tables that have person_id.

        This avoids fragile type-alter and FK juggling. We'll populate this column
        when incoming person identifiers are not numeric, then resolve to integer
        person_id in step 2 and drop the column.
        """
        conn = await session.connection()
        tables = await self._list_tables_with_person_id(conn)
        dialect = self._engine.dialect.name
        for t in tables:
            if dialect == "postgresql":
                ddl = f'ALTER TABLE "{t}" ADD COLUMN IF NOT EXISTS person_id_text TEXT'
            else:
                # SQLite (and others): try without IF NOT EXISTS
                ddl = f'ALTER TABLE "{t}" ADD COLUMN person_id_text TEXT'
            try:
                await session.execute(text(ddl))
            except Exception:
                # Column may already exist or backend may not support IF NOT EXISTS; ignore
                pass

    async def _drop_person_id_text_columns(self, session: AsyncSession) -> None:
        """Drop the temporary person_id_text column from all non-person tables."""
        conn = await session.connection()
        tables = await self._list_tables_with_person_id(conn)
        dialect = self._engine.dialect.name
        for t in tables:
            if dialect == "postgresql":
                ddl = f'ALTER TABLE "{t}" DROP COLUMN IF EXISTS person_id_text'
            else:
                ddl = f'ALTER TABLE "{t}" DROP COLUMN person_id_text'
            try:
                await session.execute(text(ddl))
            except Exception:
                # Some backends or versions (older SQLite) may not support DROP COLUMN; ignore
                pass

    def _load_mapping(self, mapping_path: str) -> Dict[str, Any]:
        """Load a JSON mapping file from disk."""
        with open(mapping_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _apply_filters(self, df: pd.DataFrame, filters: Optional[List[Dict[str, Any]]]):
        """Apply optional row filters to a DataFrame prior to mapping."""
        if not filters:
            return df
        mask = pd.Series([True] * len(df), index=df.index)
        for flt in filters:
            col = flt.get("column")
            if col is None or col not in df.columns:
                continue
            if "equals" in flt:
                mask &= (df[col] == flt["equals"]) | (
                    df[col].astype(str) == str(flt["equals"])
                )
            elif "not_empty" in flt and flt["not_empty"]:
                mask &= df[col].notna() & (df[col].astype(str).str.len() > 0)
        result = df.loc[mask, :].copy()
        return result

    def _convert_value(self, sa_type: Any, value: Any) -> Any:
        """Coerce a CSV value into an appropriate Python type for insert.

        The conversion is guided by the SQLAlchemy column type.
        """
        if pd.isna(value) or value == "":
            return None
        try:
            if isinstance(sa_type, Date):
                # Accept many input formats
                dt = pd.to_datetime(value, errors="coerce")
                return None if pd.isna(dt) else dt.date()
            if isinstance(sa_type, DateTime):
                # Normalize to UTC-naive for Postgres compatibility
                ts = pd.to_datetime(value, errors="coerce", utc=True)
                if pd.isna(ts):
                    return None
                py = ts.to_pydatetime()
                if getattr(py, "tzinfo", None) is not None:
                    py = py.astimezone(timezone.utc).replace(tzinfo=None)
                return py
            if isinstance(sa_type, (Integer, BigInteger)):
                return int(value)
            if isinstance(sa_type, Numeric):
                return Decimal(str(value))
        except Exception:
            return value
        # Trim string values to 50 characters before insert
        if isinstance(value, str):
            return value[:50]
        return value

    async def load(
        self, csv_path: str, mapping_path: str | None = None, chunk_size: int = 1000
    ) -> None:
        """Load a CSV into multiple OMOP tables based on a mapping file.

        Args:
            csv_path: Path to the input CSV file.
            mapping_path: Path to the JSON mapping file. Defaults to the
                package's ``mapping.default.json`` when not provided.
            chunk_size: Batch size for INSERT statements.
        """
        # If mapping path is None, load mapping.default.json from the current directory
        logger.info(f"Loading CSV data from {csv_path}")
        if mapping_path is None:
            mapping_path = str(Path(__file__).parent / "mapping.default.json")
        mapping = self._load_mapping(mapping_path)
        # Use low_memory=False to avoid DtypeWarning for mixed-type columns
        df = pd.read_csv(csv_path, low_memory=False)

        async with self._get_session() as session:
            # Relax constraint enforcement during bulk load on Postgres
            is_pg = False
            try:
                is_pg = str(self._engine.dialect.name).startswith("postgres")
            except Exception:
                is_pg = False
            if is_pg:
                try:
                    await session.execute(
                        text("SET session_replication_role = replica")
                    )
                except Exception:
                    pass

            try:
                conn = await session.connection()
                # Before reflecting, add a temporary person_id_text column to accept non-numeric IDs
                await self._add_person_id_text_columns(session)
                automap = await self._prepare_automap(conn)

                for tbl in mapping.get("tables", []):
                    table_name = tbl.get("name")
                    if not table_name:
                        continue
                    # obtain mapped class
                    try:
                        mapper = getattr(automap.classes, table_name)
                    except AttributeError:
                        raise ValueError(f"Table '{table_name}' not found in database.")

                    # compute filtered dataframe
                    df_tbl = self._apply_filters(df, tbl.get("filters"))
                    if df_tbl.empty:
                        continue

                    col_map: Dict[str, Any] = tbl.get("columns", {})
                    # Gather target SQLA column metadata
                    sa_cols = {c.name: c.type for c in mapper.__table__.columns}
                    sa_col_objs = {c.name: c for c in mapper.__table__.columns}

                    # Build records
                    records: List[Dict[str, Any]] = []
                    for _, row in df_tbl.iterrows():
                        rec: Dict[str, Any] = {}
                        for target_col, src in col_map.items():
                            if isinstance(src, dict) and "const" in src:
                                value = src["const"]
                            elif isinstance(src, str):
                                value = row.get(src)
                            else:
                                value = None

                            # IMPORTANT: keep person_id raw for staging logic below
                            if target_col != "person_id":
                                # Convert based on SA type if available
                                sa_t = sa_cols.get(target_col)
                                if sa_t is not None:
                                    value = self._convert_value(sa_t, value)
                            rec[target_col] = value

                        # If person_id exists in the record, route non-numeric values into person_id_text
                        if "person_id" in rec:
                            pid = rec.get("person_id")
                            if pid is None:
                                # If NOT NULL, use placeholder to avoid constraint errors
                                col = sa_col_objs.get("person_id")
                                if col is not None and not getattr(
                                    col, "nullable", True
                                ):
                                    rec["person_id"] = 0
                            elif isinstance(pid, int):
                                pass
                            elif isinstance(pid, str) and pid.strip().isdigit():
                                try:
                                    rec["person_id"] = int(pid.strip())
                                except Exception:
                                    # If conversion unexpectedly fails, send to text column
                                    if "person_id_text" in sa_cols:
                                        rec["person_id_text"] = str(pid)
                                    # Respect NOT NULL with placeholder when required
                                    col = sa_col_objs.get("person_id")
                                    if col is not None and not getattr(
                                        col, "nullable", True
                                    ):
                                        rec["person_id"] = 0
                                    else:
                                        rec["person_id"] = None
                            else:
                                # Non-numeric content: place into person_id_text if available
                                if "person_id_text" in sa_cols:
                                    rec["person_id_text"] = str(pid)
                                # Respect NOT NULL with placeholder when required
                                col = sa_col_objs.get("person_id")
                                if col is not None and not getattr(
                                    col, "nullable", True
                                ):
                                    rec["person_id"] = 0
                                else:
                                    rec["person_id"] = None
                        # Finally coerce all fields to the table's schema (string lengths, forced TEXT, datetimes)
                        rec = self._coerce_record_to_table_types(
                            mapper.__table__,
                            rec,
                            set(mapping.get("force_text_fields", [])),
                        )
                        records.append(rec)

                    if not records:
                        continue

                    stmt = insert(mapper)
                    # Chunked insert
                    for i in range(0, len(records), chunk_size):
                        batch = records[i : i + chunk_size]
                        await session.execute(stmt, batch)

                # Step 2: Normalize person_id FKs using person.person_id (not person_source_value)
                logger.info("Normalizing person_id foreign keys")
                await self.fix_person_id(session, automap)

                # Drop the temporary person_id_text columns now that person_id has been normalized
                await self._drop_person_id_text_columns(session)

                # Step 3: Backfill year/month/day of birth from birth_datetime where missing or zero
                logger.info("Backfilling person birth fields")
                await self.backfill_person_birth_fields(session, automap)

                # Step 4: Set gender_concept_id from gender_source_value using standard IDs
                logger.info("Setting person.gender_concept_id from gender_source_value")
                await self.update_person_gender_concept_id(session, automap)

                # Step 5: Apply concept mappings defined in the JSON mapping
                logger.info("Applying concept mappings")
                await self.apply_concept_mappings(session, automap, mapping)

                await session.commit()
            finally:
                if is_pg:
                    try:
                        await session.execute(
                            text("SET session_replication_role = origin")
                        )
                    except Exception:
                        pass
                await session.close()

    async def fix_person_id(self, session: AsyncSession, automap: AutomapBase) -> None:
        """
        Update all tables so that person_id foreign keys store the canonical
        person.person_id (integer), replacing any rows where person_id currently
        contains the person_source_value (string/UUID).

        Approach:
        - Build a mapping from person_source_value -> person_id from the person table.
        - For each table (except person) having a person_id column, run updates:
          SET person_id = person.person_id WHERE CAST(person_id AS TEXT) = person_source_value.
        - This is safe for SQLite (used in examples). For stricter RDBMS, ensure types
          are compatible or adjust as needed.
        """
        # Resolve person table from automap
        try:
            person_cls = getattr(automap.classes, "person")
        except AttributeError:
            return  # No person table; nothing to do

        person_table = person_cls.__table__

        # Build mapping of person_source_value -> person_id
        res = await session.execute(
            select(person_table.c.person_source_value, person_table.c.person_id).where(
                person_table.c.person_source_value.isnot(None)
            )
        )
        pairs = res.fetchall()
        if not pairs:
            return

        psv_to_id: Dict[str, int] = {}
        for psv, pid in pairs:
            if psv is None or pid is None:
                continue
            psv_to_id[str(psv)] = int(pid)

        if not psv_to_id:
            return

        # Iterate all tables and update person_id where it matches a known person_source_value
        # Also handle rows that staged a non-numeric value in person_id_text.
        # Avoid metadata.sorted_tables to prevent SAWarning about unresolvable cycles in vocab tables.
        for tbl_name, table in automap.metadata.tables.items():
            if tbl_name == person_table.name:
                continue
            if "person_id" not in table.c:
                continue

            # If person_id_text exists, prefer it for matching
            has_text = "person_id_text" in table.c

            # Run per-psv updates; small and explicit for clarity
            for psv, pid in psv_to_id.items():
                if has_text:
                    # Match on person_id_text
                    stmt = (
                        update(table)
                        .where(table.c.person_id_text == psv)
                        .values(person_id=pid)
                    )
                    await session.execute(stmt)
                # Also try matching where person_id was staged as string
                stmt2 = (
                    update(table)
                    .where(cast(table.c.person_id, String()) == psv)
                    .values(person_id=pid)
                )
                await session.execute(stmt2)

            # Clear person_id_text after normalization when column exists
            if has_text:
                try:
                    await session.execute(
                        update(table)
                        .where(table.c.person_id_text.isnot(None))
                        .values(person_id_text=None)
                    )
                except Exception:
                    pass

    async def update_person_gender_concept_id(
        self, session: AsyncSession, automap: AutomapBase
    ) -> None:
        """
        Update person.gender_concept_id from person.gender_source_value using static mapping:
        - male (or 'm')   -> 8507
        - female (or 'f') -> 8532
        - anything else   -> 0 (unknown)

        Only updates rows where the computed value differs from the current value
        or where gender_concept_id is NULL.
        """
        try:
            person_cls = getattr(automap.classes, "person")
        except AttributeError:
            return

        person_table = person_cls.__table__

        # Fetch rows to evaluate. We consider all rows with a non-null gender_source_value
        res = await session.execute(
            select(
                person_table.c.person_id,
                person_table.c.gender_source_value,
                person_table.c.gender_concept_id,
            ).where(person_table.c.gender_source_value.isnot(None))
        )

        rows = res.fetchall()
        if not rows:
            return

        def map_gender(val: str | None) -> int:
            if val is None:
                return 0
            s = str(val).strip().lower()
            if s in {"male", "m"}:
                return 8507
            if s in {"female", "f"}:
                return 8532
            return 0

        for pid, gsrc, gcid in rows:
            target = map_gender(gsrc)
            # Skip if already correct
            if gcid == target:
                continue
            stmt = (
                update(person_table)
                .where(person_table.c.person_id == pid)
                .values(gender_concept_id=target)
            )
            await session.execute(stmt)

    async def backfill_person_birth_fields(
        self, session: AsyncSession, automap: AutomapBase
    ) -> None:
        """
            In the person table, replace 0 or NULL values in year_of_birth, month_of_birth,
            and day_of_birth with values derived from birth_datetime.

        This runs in Python for portability across backends.
        """
        # Resolve person table from automap
        try:
            person_cls = getattr(automap.classes, "person")
        except AttributeError:
            return

        person_table = person_cls.__table__

        # Fetch necessary columns
        res = await session.execute(
            select(
                person_table.c.person_id,
                person_table.c.birth_datetime,
                person_table.c.year_of_birth,
                person_table.c.month_of_birth,
                person_table.c.day_of_birth,
            ).where(person_table.c.birth_datetime.isnot(None))
        )

        rows = res.fetchall()
        if not rows:
            return

        for pid, birth_dt, y, m, d in rows:
            # Parse birth_dt to a datetime if needed
            bd: Optional[datetime]
            if isinstance(birth_dt, datetime):
                # Normalize timezone-aware to UTC-naive
                if birth_dt.tzinfo is not None:
                    bd = birth_dt.astimezone(timezone.utc).replace(tzinfo=None)
                else:
                    bd = birth_dt
            elif isinstance(birth_dt, date):
                bd = datetime(birth_dt.year, birth_dt.month, birth_dt.day)
            else:
                try:
                    tmp = pd.to_datetime(birth_dt, errors="coerce", utc=True)
                    if pd.isna(tmp):
                        bd = None
                    else:
                        py = tmp.to_pydatetime()
                        bd = py.astimezone(timezone.utc).replace(tzinfo=None)
                except Exception:
                    bd = None

            if bd is None:
                continue

            new_y = y if (y is not None and int(y or 0) != 0) else bd.year
            new_m = m if (m is not None and int(m or 0) != 0) else bd.month
            new_d = d if (d is not None and int(d or 0) != 0) else bd.day

            # Only update when something changes
            if new_y != y or new_m != m or new_d != d:
                stmt = (
                    update(person_table)
                    .where(person_table.c.person_id == pid)
                    .values(
                        year_of_birth=new_y,
                        month_of_birth=new_m,
                        day_of_birth=new_d,
                    )
                )
                await session.execute(stmt)

    async def apply_concept_mappings(
        self,
        session: AsyncSession,
        automap: AutomapBase,
        mapping: Dict[str, Any],
    ) -> None:
        """
            Based on the "concept" key in the mapping JSON, populate target *_concept_id columns
            by looking up concept.concept_id using codes found in the specified source column.

            Rules:
        - If the source value is a comma-separated string, use only the first element for lookup.
        - Find by equality on concept.concept_code.
        - Update the target column with the matching concept.concept_id.
        """
        if not mapping or "concept" not in mapping:
            return

        # Resolve concept table
        try:
            concept_cls = getattr(automap.classes, "concept")
        except AttributeError:
            return

        concept_table = concept_cls.__table__

        # Simple in-memory code to cid mapping
        code_to_cid: Dict[str, Optional[int]] = {}

        async def lookup_concept_id(code: str) -> Optional[int]:
            if code in code_to_cid:
                return code_to_cid[code]
            res = await session.execute(
                select(concept_table.c.concept_id).where(
                    concept_table.c.concept_code == code
                )
            )
            row = res.first()
            cid = int(row[0]) if row and row[0] is not None else None
            code_to_cid[code] = cid
            return cid

        for item in mapping.get("concept", []):
            table_name = item.get("table")
            if not table_name:
                continue
            try:
                mapper = getattr(automap.classes, table_name)
            except AttributeError:
                # Target table not found; skip
                continue

            table = mapper.__table__
            pk_cols = list(table.primary_key.columns)
            if not pk_cols:
                # Cannot safely update without a primary key
                continue

            for m in item.get("mappings", []):
                source_col = m.get("source")
                target_col = m.get("target")
                if not source_col or not target_col:
                    continue
                if source_col not in table.c or target_col not in table.c:
                    continue

                # Fetch candidate rows: target is NULL or 0, and source is not NULL/empty
                res = await session.execute(
                    select(
                        *pk_cols,
                        table.c[source_col].label("_src"),
                        table.c[target_col].label("_tgt"),
                    ).where(
                        or_(
                            table.c[target_col].is_(None),
                            table.c[target_col] == 0,
                        ),
                        table.c[source_col].isnot(None),
                    )
                )

                rows = res.fetchall()
                if not rows:
                    continue

                for row in rows:
                    # row is a tuple: (*pk_vals, _src, _tgt)
                    pk_vals = row[: len(pk_cols)]
                    src_val = row[len(pk_cols)]

                    # Only care about non-empty strings; if comma-separated, take first element
                    code: Optional[str] = None
                    if isinstance(src_val, str):
                        # Split on comma and strip whitespace
                        first = src_val.split(",")[0].strip()
                        code = first if first else None
                    elif isinstance(src_val, list) and src_val:
                        # If a list somehow made it into the DB, use first element's string
                        code = str(src_val[0])
                    else:
                        # Fallback to simple string conversion if it's a scalar
                        code = str(src_val) if src_val is not None else None

                    if not code:
                        continue

                    cid = await lookup_concept_id(code)
                    if cid is None:
                        continue

                    # Build WHERE with PK columns
                    where_clause = and_(
                        *[
                            (pk_col == pk_val)
                            for pk_col, pk_val in zip(pk_cols, pk_vals)
                        ]
                    )

                    stmt = update(table).where(where_clause).values({target_col: cid})
                    await session.execute(stmt)

__init__(cdm_engine_factory, version='cdm54')

Create a loader bound to a specific database engine.

Parameters:

Name Type Description Default
cdm_engine_factory

An initialized CdmEngineFactory.

required
version str

OMOP CDM version label ("cdm54" or "cdm6").

'cdm54'
Source code in src/pyomop/loader.py
def __init__(self, cdm_engine_factory, version: str = "cdm54") -> None:
    """Create a loader bound to a specific database engine.

    Args:
        cdm_engine_factory: An initialized ``CdmEngineFactory``.
        version: OMOP CDM version label ("cdm54" or "cdm6").
    """
    self._cdm = cdm_engine_factory
    self._engine = cdm_engine_factory.engine
    self._maker = async_sessionmaker(self._engine, class_=AsyncSession)
    self._scope = async_scoped_session(self._maker, scopefunc=asyncio.current_task)
    self._version = version
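
The constructor wires an async_sessionmaker into an async_scoped_session keyed by asyncio.current_task, so each task transparently gets its own session. A minimal standalone sketch of that SQLAlchemy pattern (generic, not pyomop-specific; the SQLite URL is only illustrative):

import asyncio

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine("sqlite+aiosqlite:///cdm.sqlite")
maker = async_sessionmaker(engine, class_=AsyncSession)
scope = async_scoped_session(maker, scopefunc=asyncio.current_task)

async def do_work():
    # Each asyncio task gets its own session from the scoped registry.
    async with scope() as session:
        ...  # run queries with `session`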

apply_concept_mappings(session, automap, mapping) async

Based on the "concept" key in the mapping JSON, populate target *_concept_id columns
by looking up concept.concept_id using codes found in the specified source column.

Rules:
  • If the source value is a comma-separated string, use only the first element for lookup.
  • Find by equality on concept.concept_code.
  • Update the target column with the matching concept.concept_id.
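
The shape this method expects under the "concept" key, inferred from the source below; the table and column names are illustrative, not taken from the shipped mapping.default.json:

mapping = {
    "concept": [
        {
            "table": "condition_occurrence",  # target OMOP table (illustrative)
            "mappings": [
                {
                    "source": "condition_source_value",  # column holding the raw code
                    "target": "condition_concept_id",    # *_concept_id column to populate
                },
            ],
        },
    ],
}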
Source code in src/pyomop/loader.py
async def apply_concept_mappings(
    self,
    session: AsyncSession,
    automap: AutomapBase,
    mapping: Dict[str, Any],
) -> None:
    """
        Based on the "concept" key in the mapping JSON, populate target *_concept_id columns
        by looking up concept.concept_id using codes found in the specified source column.

        Rules:
    - If the source value is a comma-separated string, use only the first element for lookup.
    - Find by equality on concept.concept_code.
    - Update the target column with the matching concept.concept_id.
    """
    if not mapping or "concept" not in mapping:
        return

    # Resolve concept table
    try:
        concept_cls = getattr(automap.classes, "concept")
    except AttributeError:
        return

    concept_table = concept_cls.__table__

    # Simple in-memory code to cid mapping
    code_to_cid: Dict[str, Optional[int]] = {}

    async def lookup_concept_id(code: str) -> Optional[int]:
        if code in code_to_cid:
            return code_to_cid[code]
        res = await session.execute(
            select(concept_table.c.concept_id).where(
                concept_table.c.concept_code == code
            )
        )
        row = res.first()
        cid = int(row[0]) if row and row[0] is not None else None
        code_to_cid[code] = cid
        return cid

    for item in mapping.get("concept", []):
        table_name = item.get("table")
        if not table_name:
            continue
        try:
            mapper = getattr(automap.classes, table_name)
        except AttributeError:
            # Target table not found; skip
            continue

        table = mapper.__table__
        pk_cols = list(table.primary_key.columns)
        if not pk_cols:
            # Cannot safely update without a primary key
            continue

        for m in item.get("mappings", []):
            source_col = m.get("source")
            target_col = m.get("target")
            if not source_col or not target_col:
                continue
            if source_col not in table.c or target_col not in table.c:
                continue

            # Fetch candidate rows: target is NULL or 0, and source is not NULL/empty
            res = await session.execute(
                select(
                    *pk_cols,
                    table.c[source_col].label("_src"),
                    table.c[target_col].label("_tgt"),
                ).where(
                    or_(
                        table.c[target_col].is_(None),
                        table.c[target_col] == 0,
                    ),
                    table.c[source_col].isnot(None),
                )
            )

            rows = res.fetchall()
            if not rows:
                continue

            for row in rows:
                # row is a tuple: (*pk_vals, _src, _tgt)
                pk_vals = row[: len(pk_cols)]
                src_val = row[len(pk_cols)]

                # Only care about non-empty strings; if comma-separated, take first element
                code: Optional[str] = None
                if isinstance(src_val, str):
                    # Split on comma and strip whitespace
                    first = src_val.split(",")[0].strip()
                    code = first if first else None
                elif isinstance(src_val, list) and src_val:
                    # If a list somehow made it into the DB, use first element's string
                    code = str(src_val[0])
                else:
                    # Fallback to simple string conversion if it's a scalar
                    code = str(src_val) if src_val is not None else None

                if not code:
                    continue

                cid = await lookup_concept_id(code)
                if cid is None:
                    continue

                # Build WHERE with PK columns
                where_clause = and_(
                    *[
                        (pk_col == pk_val)
                        for pk_col, pk_val in zip(pk_cols, pk_vals)
                    ]
                )

                stmt = update(table).where(where_clause).values({target_col: cid})
                await session.execute(stmt)

backfill_person_birth_fields(session, automap) async

In the person table, replace 0 or NULL values in year_of_birth, month_of_birth,
and day_of_birth with values derived from birth_datetime.

This runs in Python for portability across backends.
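
For example, a person row with birth_datetime 1980-05-17 and year_of_birth = 0 is updated to year 1980, month 5, day 17. The core derivation, which keeps any non-zero values already present, is roughly:

from datetime import datetime

bd = datetime(1980, 5, 17)
year_of_birth, month_of_birth, day_of_birth = 0, None, None

new_year = year_of_birth if year_of_birth not in (None, 0) else bd.year      # 1980
new_month = month_of_birth if month_of_birth not in (None, 0) else bd.month  # 5
new_day = day_of_birth if day_of_birth not in (None, 0) else bd.day          # 17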

Source code in src/pyomop/loader.py
async def backfill_person_birth_fields(
    self, session: AsyncSession, automap: AutomapBase
) -> None:
    """
        In the person table, replace 0 or NULL values in year_of_birth, month_of_birth,
        and day_of_birth with values derived from birth_datetime.

    This runs in Python for portability across backends.
    """
    # Resolve person table from automap
    try:
        person_cls = getattr(automap.classes, "person")
    except AttributeError:
        return

    person_table = person_cls.__table__

    # Fetch necessary columns
    res = await session.execute(
        select(
            person_table.c.person_id,
            person_table.c.birth_datetime,
            person_table.c.year_of_birth,
            person_table.c.month_of_birth,
            person_table.c.day_of_birth,
        ).where(person_table.c.birth_datetime.isnot(None))
    )

    rows = res.fetchall()
    if not rows:
        return

    for pid, birth_dt, y, m, d in rows:
        # Parse birth_dt to a datetime if needed
        bd: Optional[datetime]
        if isinstance(birth_dt, datetime):
            # Normalize timezone-aware to UTC-naive
            if birth_dt.tzinfo is not None:
                bd = birth_dt.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                bd = birth_dt
        elif isinstance(birth_dt, date):
            bd = datetime(birth_dt.year, birth_dt.month, birth_dt.day)
        else:
            try:
                tmp = pd.to_datetime(birth_dt, errors="coerce", utc=True)
                if pd.isna(tmp):
                    bd = None
                else:
                    py = tmp.to_pydatetime()
                    bd = py.astimezone(timezone.utc).replace(tzinfo=None)
            except Exception:
                bd = None

        if bd is None:
            continue

        new_y = y if (y is not None and int(y or 0) != 0) else bd.year
        new_m = m if (m is not None and int(m or 0) != 0) else bd.month
        new_d = d if (d is not None and int(d or 0) != 0) else bd.day

        # Only update when something changes
        if new_y != y or new_m != m or new_d != d:
            stmt = (
                update(person_table)
                .where(person_table.c.person_id == pid)
                .values(
                    year_of_birth=new_y,
                    month_of_birth=new_m,
                    day_of_birth=new_d,
                )
            )
            await session.execute(stmt)

fix_person_id(session, automap) async

Update all tables so that person_id foreign keys store the canonical person.person_id (integer), replacing any rows where person_id currently contains the person_source_value (string/UUID).

Approach:

- Build a mapping from person_source_value -> person_id from the person table.
- For each table (except person) having a person_id column, run updates: SET person_id = person.person_id WHERE CAST(person_id AS TEXT) = person_source_value.
- This is safe for SQLite (used in examples). For stricter RDBMS, ensure types are compatible or adjust as needed.

Source code in src/pyomop/loader.py
async def fix_person_id(self, session: AsyncSession, automap: AutomapBase) -> None:
    """
    Update all tables so that person_id foreign keys store the canonical
    person.person_id (integer), replacing any rows where person_id currently
    contains the person_source_value (string/UUID).

    Approach:
    - Build a mapping from person_source_value -> person_id from the person table.
    - For each table (except person) having a person_id column, run updates:
                SET person_id = person.person_id WHERE CAST(person_id AS TEXT) = person_source_value.
            - This is safe for SQLite (used in examples). For stricter RDBMS, ensure types
                are compatible or adjust as needed.
    """
    # Resolve person table from automap
    try:
        person_cls = getattr(automap.classes, "person")
    except AttributeError:
        return  # No person table; nothing to do

    person_table = person_cls.__table__

    # Build mapping of person_source_value -> person_id
    res = await session.execute(
        select(person_table.c.person_source_value, person_table.c.person_id).where(
            person_table.c.person_source_value.isnot(None)
        )
    )
    pairs = res.fetchall()
    if not pairs:
        return

    psv_to_id: Dict[str, int] = {}
    for psv, pid in pairs:
        if psv is None or pid is None:
            continue
        psv_to_id[str(psv)] = int(pid)

    if not psv_to_id:
        return

    # Iterate all tables and update person_id where it matches a known person_source_value
    # Also handle rows that staged a non-numeric value in person_id_text.
    # Avoid metadata.sorted_tables to prevent SAWarning about unresolvable cycles in vocab tables.
    for tbl_name, table in automap.metadata.tables.items():
        if tbl_name == person_table.name:
            continue
        if "person_id" not in table.c:
            continue

        # If person_id_text exists, prefer it for matching
        has_text = "person_id_text" in table.c

        # Run per-psv updates; small and explicit for clarity
        for psv, pid in psv_to_id.items():
            if has_text:
                # Match on person_id_text
                stmt = (
                    update(table)
                    .where(table.c.person_id_text == psv)
                    .values(person_id=pid)
                )
                await session.execute(stmt)
            # Also try matching where person_id was staged as string
            stmt2 = (
                update(table)
                .where(cast(table.c.person_id, String()) == psv)
                .values(person_id=pid)
            )
            await session.execute(stmt2)

        # Clear person_id_text after normalization when column exists
        if has_text:
            try:
                await session.execute(
                    update(table)
                    .where(table.c.person_id_text.isnot(None))
                    .values(person_id_text=None)
                )
            except Exception:
                pass

load(csv_path, mapping_path=None, chunk_size=1000) async

Load a CSV into multiple OMOP tables based on a mapping file.

Parameters:

Name Type Description Default
csv_path str

Path to the input CSV file.

required
mapping_path str | None

Path to the JSON mapping file. Defaults to the package's mapping.default.json when not provided.

None
chunk_size int

Batch size for INSERT statements.

1000
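
A typical end-to-end call, assuming an initialized CdmEngineFactory. CdmCsvLoader is used here only as a placeholder name for the loader class defined in src/pyomop/loader.py, and the import paths are assumptions; check the package exports for the exact names:

import asyncio

from pyomop import CdmEngineFactory          # import path assumed
from pyomop.loader import CdmCsvLoader       # placeholder class name

async def main():
    cdm = CdmEngineFactory()                 # defaults to a local SQLite file
    loader = CdmCsvLoader(cdm, version="cdm54")

    # Use the bundled mapping.default.json ...
    await loader.load("patients.csv")
    # ... or an explicit mapping and a smaller batch size.
    await loader.load("patients.csv", mapping_path="my_mapping.json", chunk_size=500)

asyncio.run(main())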
Source code in src/pyomop/loader.py
async def load(
    self, csv_path: str, mapping_path: str | None = None, chunk_size: int = 1000
) -> None:
    """Load a CSV into multiple OMOP tables based on a mapping file.

    Args:
        csv_path: Path to the input CSV file.
        mapping_path: Path to the JSON mapping file. Defaults to the
            package's ``mapping.default.json`` when not provided.
        chunk_size: Batch size for INSERT statements.
    """
    # If mapping path is None, load mapping.default.json from the current directory
    logger.info(f"Loading CSV data from {csv_path}")
    if mapping_path is None:
        mapping_path = str(Path(__file__).parent / "mapping.default.json")
    mapping = self._load_mapping(mapping_path)
    # Use low_memory=False to avoid DtypeWarning for mixed-type columns
    df = pd.read_csv(csv_path, low_memory=False)

    async with self._get_session() as session:
        # Relax constraint enforcement during bulk load on Postgres
        is_pg = False
        try:
            is_pg = str(self._engine.dialect.name).startswith("postgres")
        except Exception:
            is_pg = False
        if is_pg:
            try:
                await session.execute(
                    text("SET session_replication_role = replica")
                )
            except Exception:
                pass

        try:
            conn = await session.connection()
            # Before reflecting, add a temporary person_id_text column to accept non-numeric IDs
            await self._add_person_id_text_columns(session)
            automap = await self._prepare_automap(conn)

            for tbl in mapping.get("tables", []):
                table_name = tbl.get("name")
                if not table_name:
                    continue
                # obtain mapped class
                try:
                    mapper = getattr(automap.classes, table_name)
                except AttributeError:
                    raise ValueError(f"Table '{table_name}' not found in database.")

                # compute filtered dataframe
                df_tbl = self._apply_filters(df, tbl.get("filters"))
                if df_tbl.empty:
                    continue

                col_map: Dict[str, Any] = tbl.get("columns", {})
                # Gather target SQLA column metadata
                sa_cols = {c.name: c.type for c in mapper.__table__.columns}
                sa_col_objs = {c.name: c for c in mapper.__table__.columns}

                # Build records
                records: List[Dict[str, Any]] = []
                for _, row in df_tbl.iterrows():
                    rec: Dict[str, Any] = {}
                    for target_col, src in col_map.items():
                        if isinstance(src, dict) and "const" in src:
                            value = src["const"]
                        elif isinstance(src, str):
                            value = row.get(src)
                        else:
                            value = None

                        # IMPORTANT: keep person_id raw for staging logic below
                        if target_col != "person_id":
                            # Convert based on SA type if available
                            sa_t = sa_cols.get(target_col)
                            if sa_t is not None:
                                value = self._convert_value(sa_t, value)
                        rec[target_col] = value

                    # If person_id exists in the record, route non-numeric values into person_id_text
                    if "person_id" in rec:
                        pid = rec.get("person_id")
                        if pid is None:
                            # If NOT NULL, use placeholder to avoid constraint errors
                            col = sa_col_objs.get("person_id")
                            if col is not None and not getattr(
                                col, "nullable", True
                            ):
                                rec["person_id"] = 0
                        elif isinstance(pid, int):
                            pass
                        elif isinstance(pid, str) and pid.strip().isdigit():
                            try:
                                rec["person_id"] = int(pid.strip())
                            except Exception:
                                # If conversion unexpectedly fails, send to text column
                                if "person_id_text" in sa_cols:
                                    rec["person_id_text"] = str(pid)
                                # Respect NOT NULL with placeholder when required
                                col = sa_col_objs.get("person_id")
                                if col is not None and not getattr(
                                    col, "nullable", True
                                ):
                                    rec["person_id"] = 0
                                else:
                                    rec["person_id"] = None
                        else:
                            # Non-numeric content: place into person_id_text if available
                            if "person_id_text" in sa_cols:
                                rec["person_id_text"] = str(pid)
                            # Respect NOT NULL with placeholder when required
                            col = sa_col_objs.get("person_id")
                            if col is not None and not getattr(
                                col, "nullable", True
                            ):
                                rec["person_id"] = 0
                            else:
                                rec["person_id"] = None
                    # Finally coerce all fields to the table's schema (string lengths, forced TEXT, datetimes)
                    rec = self._coerce_record_to_table_types(
                        mapper.__table__,
                        rec,
                        set(mapping.get("force_text_fields", [])),
                    )
                    records.append(rec)

                if not records:
                    continue

                stmt = insert(mapper)
                # Chunked insert
                for i in range(0, len(records), chunk_size):
                    batch = records[i : i + chunk_size]
                    await session.execute(stmt, batch)

            # Step 2: Normalize person_id FKs using person.person_id (not person_source_value)
            logger.info("Normalizing person_id foreign keys")
            await self.fix_person_id(session, automap)

            # Drop the temporary person_id_text columns now that person_id has been normalized
            await self._drop_person_id_text_columns(session)

            # Step 3: Backfill year/month/day of birth from birth_datetime where missing or zero
            logger.info("Backfilling person birth fields")
            await self.backfill_person_birth_fields(session, automap)

            # Step 4: Set gender_concept_id from gender_source_value using standard IDs
            logger.info("Setting person.gender_concept_id from gender_source_value")
            await self.update_person_gender_concept_id(session, automap)

            # Step 5: Apply concept mappings defined in the JSON mapping
            logger.info("Applying concept mappings")
            await self.apply_concept_mappings(session, automap, mapping)

            await session.commit()
        finally:
            if is_pg:
                try:
                    await session.execute(
                        text("SET session_replication_role = origin")
                    )
                except Exception:
                    pass
            await session.close()

update_person_gender_concept_id(session, automap) async

Update person.gender_concept_id from person.gender_source_value using a static mapping:
- male (or 'm')   -> 8507
- female (or 'f') -> 8532
- anything else   -> 0 (unknown)

Only updates rows where the computed value differs from the current value or where gender_concept_id is NULL.
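
The same mapping expressed as a standalone helper, a dict-based variant of the method's internal map_gender logic:

GENDER_CONCEPTS = {"male": 8507, "m": 8507, "female": 8532, "f": 8532}

def map_gender(value):
    # Unknown, empty, or unexpected values fall back to concept_id 0.
    if value is None:
        return 0
    return GENDER_CONCEPTS.get(str(value).strip().lower(), 0)

assert map_gender("F") == 8532
assert map_gender(None) == 0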

Source code in src/pyomop/loader.py
async def update_person_gender_concept_id(
    self, session: AsyncSession, automap: AutomapBase
) -> None:
    """
        Update person.gender_concept_id from person.gender_source_value using static mapping:
        - male (or 'm')   -> 8507
        - female (or 'f') -> 8532
        - anything else   -> 0 (unknown)

    Only updates rows where the computed value differs from the current value
    or where gender_concept_id is NULL.
    """
    try:
        person_cls = getattr(automap.classes, "person")
    except AttributeError:
        return

    person_table = person_cls.__table__

    # Fetch rows to evaluate. We consider all rows with a non-null gender_source_value
    res = await session.execute(
        select(
            person_table.c.person_id,
            person_table.c.gender_source_value,
            person_table.c.gender_concept_id,
        ).where(person_table.c.gender_source_value.isnot(None))
    )

    rows = res.fetchall()
    if not rows:
        return

    def map_gender(val: str | None) -> int:
        if val is None:
            return 0
        s = str(val).strip().lower()
        if s in {"male", "m"}:
            return 8507
        if s in {"female", "f"}:
            return 8532
        return 0

    for pid, gsrc, gcid in rows:
        target = map_gender(gsrc)
        # Skip if already correct
        if gcid == target:
            continue
        stmt = (
            update(person_table)
            .where(person_table.c.person_id == pid)
            .values(gender_concept_id=target)
        )
        await session.execute(stmt)

Command-line interface for pyomop.

Provides commands to create CDM tables, load vocabulary CSVs, and import FHIR Bulk Export data into an OMOP database.

main_routine()

Top-level runner used by python -m pyomop.

Source code in src/pyomop/main.py
def main_routine():
    """Top-level runner used by ``python -m pyomop``."""
    click.echo("_________________________________________")
    click.echo("Pyomop v" + __version__ + " by Bell Eapen ( https://nuchange.ca ) ")
    cli()  # run the main function
    click.echo("Pyomop done.")

Vocabulary utilities for loading and querying OMOP vocab tables.

Provides helpers to import vocabulary CSVs into the database and to look up concepts by id or code. Uses async SQLAlchemy sessions.

CdmVocabulary

Bases: object

Helpers for OMOP Vocabulary management and lookups.

Parameters:

Name Type Description Default
cdm

An initialized CdmEngineFactory instance.

required
version

CDM version string ("cdm54" or "cdm6"). Defaults to "cdm54".

'cdm54'
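
A minimal construction sketch; the from pyomop import path is an assumption, so substitute the actual exports if they differ:

from pyomop import CdmEngineFactory, CdmVocabulary  # import paths assumed

cdm = CdmEngineFactory()                    # SQLite engine by default
vocab = CdmVocabulary(cdm, version="cdm54")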
Source code in src/pyomop/vocabulary.py
class CdmVocabulary(object):
    """Helpers for OMOP Vocabulary management and lookups.

    Args:
        cdm: An initialized ``CdmEngineFactory`` instance.
        version: CDM version string ("cdm54" or "cdm6"). Defaults to "cdm54".
    """

    def __init__(self, cdm, version="cdm54"):
        self._concept_id = 0
        self._concept_name = ""
        self._domain_id = ""
        self._vocabulary_id = ""
        self._concept_class_id = ""
        self._concept_code = ""
        self._cdm = cdm
        self._engine = cdm.engine
        self._maker = async_sessionmaker(self._engine, class_=AsyncSession)
        self._scope = async_scoped_session(self._maker, scopefunc=asyncio.current_task)
        self._version = version

    @property
    def concept_id(self):
        """Current concept_id for this helper (if set)."""
        return self._concept_id

    @property
    def concept_code(self):
        """Current concept_code for this helper (if set)."""
        return self._concept_code

    @property
    def concept_name(self):
        """Current concept_name for this helper (if set)."""
        return self._concept_name

    @property
    def vocabulary_id(self):
        """Current vocabulary_id for this helper (if set)."""
        return self._vocabulary_id

    @property
    def domain_id(self):
        """Current domain_id for this helper (if set)."""
        return self._domain_id

    @concept_id.setter
    def concept_id(self, concept_id):
        """Set the active concept context by concept_id.

        Side effects: populates concept name, domain, vocabulary, class, and code
        on this helper instance for convenience.

        Args:
            concept_id: The concept_id to fetch and set.
        """
        self._concept_id = concept_id
        _concept = asyncio.run(self.get_concept(concept_id))
        self._concept_name = _concept.concept_name
        self._domain_id = _concept.domain_id
        self._vocabulary_id = _concept.vocabulary_id
        self._concept_class_id = _concept.concept_class_id
        self._concept_code = _concept.concept_code

    async def get_concept(self, concept_id):
        """Fetch a concept row by id.

        Args:
            concept_id: Concept identifier.

        Returns:
            The ORM Concept instance.
        """
        if self._version == "cdm6":
            from .cdm6 import Concept
        else:
            from .cdm54 import Concept
        stmt = select(Concept).where(Concept.concept_id == concept_id)
        async with self._cdm.session() as session:
            _concept = await session.execute(stmt)
        return _concept.scalar_one()

    async def get_concept_by_code(self, concept_code, vocabulary_id):
        """Fetch a concept by code within a vocabulary.

        Args:
            concept_code: The vocabulary-specific code string.
            vocabulary_id: Vocabulary identifier (e.g., 'SNOMED', 'LOINC').

        Returns:
            The ORM Concept instance.
        """
        if self._version == "cdm6":
            from .cdm6 import Concept
        else:
            from .cdm54 import Concept
        stmt = (
            select(Concept)
            .where(Concept.concept_code == concept_code)
            .where(Concept.vocabulary_id == vocabulary_id)
        )
        async with self._cdm.session() as session:
            _concept = await session.execute(stmt)
        return _concept.scalar_one()

    def set_concept(self, concept_code, vocabulary_id=None):
        """Set the active concept context by code and vocabulary.

        Args:
            concept_code: The concept code string to resolve.
            vocabulary_id: Vocabulary identifier. Required.

        Notes:
            On success, populates concept fields on this instance. On failure,
            sets ``_vocabulary_id`` and ``_concept_id`` to 0.
        """
        self._concept_code = concept_code
        try:
            if vocabulary_id is not None:
                self._vocabulary_id = vocabulary_id
                _concept = asyncio.run(
                    self.get_concept_by_code(concept_code, vocabulary_id)
                )
            else:
                raise ValueError(
                    "vocabulary_id must be provided when setting concept by code."
                )

            self._concept_name = _concept.concept_name
            self._domain_id = _concept.domain_id
            self._concept_id = _concept.concept_id
            self._concept_class_id = _concept.concept_class_id
            self._concept_code = _concept.concept_code

        except Exception:
            self._vocabulary_id = 0
            self._concept_id = 0

    async def create_vocab(self, folder, sample=None):
        """Load vocabulary CSV files from a folder into the database.

        This imports the standard OMOP vocab tables (drug_strength, concept,
        concept_relationship, concept_ancestor, concept_synonym, vocabulary,
        relationship, concept_class, domain).

        Args:
            folder: Path to the folder containing OMOP vocabulary CSVs.
            sample: Optional number of rows to limit per file during import.
        """
        try:
            # Parents first (for concept FKs): DOMAIN, CONCEPT_CLASS
            df = pd.read_csv(
                folder + "/DOMAIN.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "domain", "replace")

            df = pd.read_csv(
                folder + "/CONCEPT_CLASS.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "concept_class", "replace")

            # Then CONCEPT (uses domain_id and concept_class_id)
            df = pd.read_csv(
                folder + "/CONCEPT.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            df["valid_start_date"] = pd.to_datetime(
                df["valid_start_date"], errors="coerce"
            )
            df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
            await self.write_vocab(df, "concept", "replace")

            # Then VOCABULARY (uses vocabulary_concept_id -> concept)
            df = pd.read_csv(
                folder + "/VOCABULARY.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "vocabulary", "replace")

            # Relationship depends on concept for relationship_concept_id
            df = pd.read_csv(
                folder + "/RELATIONSHIP.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "relationship", "replace")

            # Post-concept tables
            df = pd.read_csv(
                folder + "/CONCEPT_RELATIONSHIP.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            df["valid_start_date"] = pd.to_datetime(
                df["valid_start_date"], errors="coerce"
            )
            df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
            await self.write_vocab(df, "concept_relationship", "replace")

            df = pd.read_csv(
                folder + "/CONCEPT_SYNONYM.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "concept_synonym", "replace")

            df = pd.read_csv(
                folder + "/DRUG_STRENGTH.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            df["valid_start_date"] = pd.to_datetime(
                df["valid_start_date"], errors="coerce"
            )
            df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
            await self.write_vocab(df, "drug_strength", "replace")

            df = pd.read_csv(
                folder + "/CONCEPT_ANCESTOR.csv",
                sep="\t",
                nrows=sample,
                on_bad_lines="skip",
                low_memory=False,
            )
            await self.write_vocab(df, "concept_ancestor", "replace")
        except Exception as e:
            logger.error(f"An error occurred while creating the vocabulary: {e}")

    @asynccontextmanager
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield an async session bound to the current engine.

        Yields:
            AsyncSession: An async SQLAlchemy session.
        """
        async with self._scope() as session:
            yield session

    async def write_vocab(self, df, table, if_exists="replace", chunk_size=1000):
        """Write a DataFrame to a vocabulary table with type-safe defaults.

        Ensures required columns exist with reasonable defaults, coerces types,
        and performs chunked inserts via SQLAlchemy core for performance.

        Args:
            df: Pandas DataFrame with data to insert.
            table: Target table name (e.g., 'concept').
            if_exists: Compatibility only. This method always inserts.
            chunk_size: Number of rows per batch insert.
        """
        async with self.get_session() as session:
            # For PostgreSQL, temporarily relax constraint enforcement during bulk loads
            is_pg = False
            try:
                is_pg = self._engine.dialect.name.startswith("postgres")
            except Exception:
                is_pg = False
            if is_pg:
                logger.info(
                    "Temporarily disabling replication role for bulk load on postgres"
                )
                try:
                    await session.execute(
                        text("SET session_replication_role = replica")
                    )
                except Exception:
                    # Ignore if not permitted or unsupported
                    logger.warning("Failed to set session_replication_role to replica")

            conn = await session.connection()
            automap: AutomapBase = automap_base()

            def prepare_automap(sync_conn):
                automap.prepare(autoload_with=sync_conn)

            await conn.run_sync(prepare_automap)
            mapper = getattr(automap.classes, table)

            # Build defaults for non-nullable columns based on SQL types
            sa_cols = {c.name: c for c in mapper.__table__.columns}

            def default_for(col):
                from sqlalchemy import (
                    BigInteger,
                    Date,
                    DateTime,
                    Integer,
                    Numeric,
                    String,
                    Text,
                )

                t = col.type
                if isinstance(t, (Integer, BigInteger)):
                    return 0
                if isinstance(t, Numeric):
                    return 0
                if isinstance(t, (String, Text)):
                    return "UNKNOWN"
                if isinstance(t, Date):
                    return date(1970, 1, 1)
                if isinstance(t, DateTime):
                    return datetime(1970, 1, 1)
                return None

            # Work on a copy so we can normalize types and fill required fields
            df2 = df.copy()

            for name, col in sa_cols.items():
                # Ensure column exists
                if name not in df2.columns:
                    # For nullable columns, start with None; for required, use default
                    df2[name] = None if col.nullable else default_for(col)
                    continue

                # Coerce types and handle missing values
                if str(df2[name].dtype) == "object":
                    # Treat empty strings as missing
                    df2[name] = df2[name].replace("", np.nan)

                from sqlalchemy import BigInteger
                from sqlalchemy import Date as SA_Date
                from sqlalchemy import DateTime as SA_DateTime
                from sqlalchemy import Integer, Numeric, String, Text

                t = col.type
                if isinstance(t, SA_Date):
                    ser = pd.to_datetime(df2[name], errors="coerce").dt.date
                    df2[name] = (
                        ser.where(pd.notna(ser), None)
                        if col.nullable
                        else ser.fillna(default_for(col))
                    )
                elif isinstance(t, SA_DateTime):
                    # Normalize to UTC-naive to avoid tz-aware vs tz-naive issues in Postgres
                    ser = pd.to_datetime(df2[name], errors="coerce", utc=True)

                    # Convert to Python datetime and drop tzinfo
                    def _to_naive(dt):
                        try:
                            if pd.isna(dt):
                                return None
                        except Exception:
                            pass
                        if hasattr(dt, "to_pydatetime"):
                            py = dt.to_pydatetime()
                        else:
                            py = dt
                        if getattr(py, "tzinfo", None) is not None:
                            py = (
                                py.tz_convert("UTC").tz_localize(None)
                                if hasattr(py, "tz_convert")
                                else py.replace(tzinfo=None)
                            )
                        return py

                    ser = ser.map(_to_naive)
                    df2[name] = (
                        ser.where(pd.notna(ser), None)
                        if col.nullable
                        else ser.fillna(default_for(col))
                    )
                elif isinstance(t, (Integer, BigInteger)):
                    ser = pd.to_numeric(df2[name], errors="coerce")
                    df2[name] = (
                        ser.where(pd.notna(ser), None)
                        if col.nullable
                        else ser.fillna(default_for(col))
                    )
                elif isinstance(t, Numeric):
                    ser = pd.to_numeric(df2[name], errors="coerce")
                    df2[name] = (
                        ser.where(pd.notna(ser), None)
                        if col.nullable
                        else ser.fillna(default_for(col))
                    )
                elif isinstance(t, (String, Text)):
                    # Only cast non-null values to str and trim; keep nulls as None
                    ser = df2[name].astype(object)
                    mask = ser.notna()
                    ser.loc[mask] = ser.loc[mask].astype(str).str.slice(0, 255)
                    if col.nullable:
                        ser = ser.where(pd.notna(ser), None)
                    else:
                        # Required string columns get a default
                        ser = ser.where(pd.notna(ser), default_for(col))
                    df2[name] = ser
                else:
                    # Fallback: ensure NaN/NaT -> None for nullable cols, else fill default
                    df2[name] = (
                        df2[name].where(pd.notna(df2[name]), None)
                        if col.nullable
                        else df2[name].fillna(default_for(col))
                    )

            # Final safety pass: replace any remaining NaN/NaT with None across all columns
            df2 = df2.where(pd.notna(df2), None)

            stmt = insert(mapper)

            try:
                for _, group in df2.groupby(
                    np.arange(df2.shape[0], dtype=int) // chunk_size
                ):
                    records = group.to_dict("records")
                    try:
                        # Fast path: batch insert
                        await session.execute(stmt, records)
                    except Exception:
                        logger.warning(
                            "Batch insert failed, falling back to row-by-row insert."
                        )
                        # Fallback: insert row-by-row, skipping bad rows
                        for row in records:
                            try:
                                await session.execute(stmt, [row])
                            except Exception:
                                # Ignore duplicates/FK issues per row
                                logger.warning(
                                    f"Failed to insert row: {row}. Skipping."
                                )
                                continue
                    # Commit after each group
                    await session.commit()
            finally:
                if is_pg:
                    try:
                        await session.execute(
                            text("SET session_replication_role = origin")
                        )
                    except Exception:
                        pass

concept_code property

Current concept_code for this helper (if set).

concept_id property writable

Current concept_id for this helper (if set).

concept_name property

Current concept_name for this helper (if set).

domain_id property

Current domain_id for this helper (if set).

vocabulary_id property

Current vocabulary_id for this helper (if set).

create_vocab(folder, sample=None) async

Load vocabulary CSV files from a folder into the database.

This imports the standard OMOP vocab tables (drug_strength, concept, concept_relationship, concept_ancestor, concept_synonym, vocabulary, relationship, concept_class, domain).

Parameters:

Name Type Description Default
folder

Path to the folder containing OMOP vocabulary CSVs.

required
sample

Optional number of rows to limit per file during import.

None
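
A usage sketch, assuming the folder holds the tab-separated Athena exports (CONCEPT.csv, DOMAIN.csv, ...) and that CdmVocabulary is importable as shown above:

import asyncio

from pyomop import CdmEngineFactory, CdmVocabulary  # import paths assumed

async def main():
    vocab = CdmVocabulary(CdmEngineFactory(), version="cdm54")
    # Limit each file to 10,000 rows while testing; omit `sample` for a full load.
    await vocab.create_vocab("/path/to/vocab_csvs", sample=10000)

asyncio.run(main())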
Source code in src/pyomop/vocabulary.py
async def create_vocab(self, folder, sample=None):
    """Load vocabulary CSV files from a folder into the database.

    This imports the standard OMOP vocab tables (drug_strength, concept,
    concept_relationship, concept_ancestor, concept_synonym, vocabulary,
    relationship, concept_class, domain).

    Args:
        folder: Path to the folder containing OMOP vocabulary CSVs.
        sample: Optional number of rows to limit per file during import.
    """
    try:
        # Parents first (for concept FKs): DOMAIN, CONCEPT_CLASS
        df = pd.read_csv(
            folder + "/DOMAIN.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "domain", "replace")

        df = pd.read_csv(
            folder + "/CONCEPT_CLASS.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "concept_class", "replace")

        # Then CONCEPT (uses domain_id and concept_class_id)
        df = pd.read_csv(
            folder + "/CONCEPT.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        df["valid_start_date"] = pd.to_datetime(
            df["valid_start_date"], errors="coerce"
        )
        df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
        await self.write_vocab(df, "concept", "replace")

        # Then VOCABULARY (uses vocabulary_concept_id -> concept)
        df = pd.read_csv(
            folder + "/VOCABULARY.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "vocabulary", "replace")

        # Relationship depends on concept for relationship_concept_id
        df = pd.read_csv(
            folder + "/RELATIONSHIP.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "relationship", "replace")

        # Post-concept tables
        df = pd.read_csv(
            folder + "/CONCEPT_RELATIONSHIP.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        df["valid_start_date"] = pd.to_datetime(
            df["valid_start_date"], errors="coerce"
        )
        df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
        await self.write_vocab(df, "concept_relationship", "replace")

        df = pd.read_csv(
            folder + "/CONCEPT_SYNONYM.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "concept_synonym", "replace")

        df = pd.read_csv(
            folder + "/DRUG_STRENGTH.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        df["valid_start_date"] = pd.to_datetime(
            df["valid_start_date"], errors="coerce"
        )
        df["valid_end_date"] = pd.to_datetime(df["valid_end_date"], errors="coerce")
        await self.write_vocab(df, "drug_strength", "replace")

        df = pd.read_csv(
            folder + "/CONCEPT_ANCESTOR.csv",
            sep="\t",
            nrows=sample,
            on_bad_lines="skip",
            low_memory=False,
        )
        await self.write_vocab(df, "concept_ancestor", "replace")
    except Exception as e:
        logger.error(f"An error occurred while creating the vocabulary: {e}")

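For orientation, here is a minimal usage sketch. The vocabulary helper is assumed to be exported as CdmVocabulary and constructed from a CdmEngineFactory, and the CDM tables are assumed to already exist in the target database; adjust names to the actual package exports.

import asyncio
from pyomop import CdmEngineFactory, CdmVocabulary  # CdmVocabulary export name assumed

async def load_vocab():
    cdm = CdmEngineFactory()      # default SQLite database (cdm.sqlite)
    vocab = CdmVocabulary(cdm)    # constructor signature assumed
    # Import only the first 1000 rows of each CSV for a quick smoke test.
    await vocab.create_vocab("/path/to/vocab_csvs", sample=1000)

asyncio.run(load_vocab())
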
get_concept(concept_id) async

Fetch a concept row by id.

Parameters:

Name         Description           Default
concept_id   Concept identifier.   required

Returns:

The ORM Concept instance.

Source code in src/pyomop/vocabulary.py
async def get_concept(self, concept_id):
    """Fetch a concept row by id.

    Args:
        concept_id: Concept identifier.

    Returns:
        The ORM Concept instance.
    """
    if self._version == "cdm6":
        from .cdm6 import Concept
    else:
        from .cdm54 import Concept
    stmt = select(Concept).where(Concept.concept_id == concept_id)
    async with self._cdm.session() as session:
        _concept = await session.execute(stmt)
    return _concept.scalar_one()

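A minimal call sketch, assuming vocab is an instance of the vocabulary helper documented in this module (the concept id below is purely illustrative):

async def show_concept(vocab, concept_id=4182210):  # illustrative id only
    # scalar_one() raises NoResultFound if the id is absent from CONCEPT.
    concept = await vocab.get_concept(concept_id)
    print(concept.concept_id, concept.concept_name, concept.domain_id)
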
get_concept_by_code(concept_code, vocabulary_id) async

Fetch a concept by code within a vocabulary.

Parameters:

Name            Description                                        Default
concept_code    The vocabulary-specific code string.               required
vocabulary_id   Vocabulary identifier (e.g., 'SNOMED', 'LOINC').   required

Returns:

The ORM Concept instance.

Source code in src/pyomop/vocabulary.py
async def get_concept_by_code(self, concept_code, vocabulary_id):
    """Fetch a concept by code within a vocabulary.

    Args:
        concept_code: The vocabulary-specific code string.
        vocabulary_id: Vocabulary identifier (e.g., 'SNOMED', 'LOINC').

    Returns:
        The ORM Concept instance.
    """
    if self._version == "cdm6":
        from .cdm6 import Concept
    else:
        from .cdm54 import Concept
    stmt = (
        select(Concept)
        .where(Concept.concept_code == concept_code)
        .where(Concept.vocabulary_id == vocabulary_id)
    )
    async with self._cdm.session() as session:
        _concept = await session.execute(stmt)
    return _concept.scalar_one()

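Because scalar_one() expects exactly one matching row, a missing (or duplicated) code raises. A defensive sketch, with names assumed as above:

from sqlalchemy.exc import NoResultFound

async def find_by_code(vocab, code, vocabulary_id):
    try:
        return await vocab.get_concept_by_code(code, vocabulary_id)
    except NoResultFound:
        # The code is not in the loaded vocabulary (e.g., a sampled import).
        return None
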
get_session() async

Yield an async session bound to the current engine.

Yields:

Name           Type                                 Description
AsyncSession   AsyncGenerator[AsyncSession, None]   An async SQLAlchemy session.

Source code in src/pyomop/vocabulary.py
@asynccontextmanager
async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
    """Yield an async session bound to the current engine.

    Yields:
        AsyncSession: An async SQLAlchemy session.
    """
    async with self._scope() as session:
        yield session

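Because get_session is an async context manager, it composes directly with ad-hoc statements. A small sketch (the concept table is assumed to exist):

from sqlalchemy import text

async def count_concepts(vocab):
    async with vocab.get_session() as session:
        result = await session.execute(text("SELECT COUNT(*) FROM concept"))
        return result.scalar_one()
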
set_concept(concept_code, vocabulary_id=None)

Set the active concept context by code and vocabulary.

Parameters:

Name            Description                           Default
concept_code    The concept code string to resolve.   required
vocabulary_id   Vocabulary identifier. Required.      None

Notes:

On success, populates concept fields on this instance. On failure, sets _vocabulary_id and _concept_id to 0.

Source code in src/pyomop/vocabulary.py
def set_concept(self, concept_code, vocabulary_id=None):
    """Set the active concept context by code and vocabulary.

    Args:
        concept_code: The concept code string to resolve.
        vocabulary_id: Vocabulary identifier. Required.

    Notes:
        On success, populates concept fields on this instance. On failure,
        sets ``_vocabulary_id`` and ``_concept_id`` to 0.
    """
    self._concept_code = concept_code
    try:
        if vocabulary_id is not None:
            self._vocabulary_id = vocabulary_id
            _concept = asyncio.run(
                self.get_concept_by_code(concept_code, vocabulary_id)
            )
        else:
            raise ValueError(
                "vocabulary_id must be provided when setting concept by code."
            )

        self._concept_name = _concept.concept_name
        self._domain_id = _concept.domain_id
        self._concept_id = _concept.concept_id
        self._concept_class_id = _concept.concept_class_id
        self._concept_code = _concept.concept_code

    except Exception:
        self._vocabulary_id = 0
        self._concept_id = 0

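Note that set_concept is synchronous and drives the lookup with asyncio.run, so it should be called from regular (non-async) code rather than from inside a running event loop. A short sketch (the SNOMED code is illustrative; the underscore attributes are shown only because the Notes above reference them):

def resolve_code(vocab):
    vocab.set_concept("44054006", vocabulary_id="SNOMED")  # illustrative code
    # On failure both _vocabulary_id and _concept_id are reset to 0.
    return vocab._concept_id or None
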
write_vocab(df, table, if_exists='replace', chunk_size=1000) async

Write a DataFrame to a vocabulary table with type-safe defaults.

Ensures required columns exist with reasonable defaults, coerces types, and performs chunked inserts via SQLAlchemy core for performance.

Parameters:

Name         Description                                       Default
df           Pandas DataFrame with data to insert.             required
table        Target table name (e.g., 'concept').              required
if_exists    Compatibility only. This method always inserts.   'replace'
chunk_size   Number of rows per batch insert.                  1000
Source code in src/pyomop/vocabulary.py
async def write_vocab(self, df, table, if_exists="replace", chunk_size=1000):
    """Write a DataFrame to a vocabulary table with type-safe defaults.

    Ensures required columns exist with reasonable defaults, coerces types,
    and performs chunked inserts via SQLAlchemy core for performance.

    Args:
        df: Pandas DataFrame with data to insert.
        table: Target table name (e.g., 'concept').
        if_exists: Compatibility only. This method always inserts.
        chunk_size: Number of rows per batch insert.
    """
    async with self.get_session() as session:
        # For PostgreSQL, temporarily relax constraint enforcement during bulk loads
        is_pg = False
        try:
            is_pg = self._engine.dialect.name.startswith("postgres")
        except Exception:
            is_pg = False
        if is_pg:
            logger.info(
                "Temporarily disabling replication role for bulk load on postgres"
            )
            try:
                await session.execute(
                    text("SET session_replication_role = replica")
                )
            except Exception:
                # Ignore if not permitted or unsupported
                logger.warning("Failed to set session_replication_role to replica")

        conn = await session.connection()
        automap: AutomapBase = automap_base()

        def prepare_automap(sync_conn):
            automap.prepare(autoload_with=sync_conn)

        await conn.run_sync(prepare_automap)
        mapper = getattr(automap.classes, table)

        # Build defaults for non-nullable columns based on SQL types
        sa_cols = {c.name: c for c in mapper.__table__.columns}

        def default_for(col):
            from sqlalchemy import (
                BigInteger,
                Date,
                DateTime,
                Integer,
                Numeric,
                String,
                Text,
            )

            t = col.type
            if isinstance(t, (Integer, BigInteger)):
                return 0
            if isinstance(t, Numeric):
                return 0
            if isinstance(t, (String, Text)):
                return "UNKNOWN"
            if isinstance(t, Date):
                return date(1970, 1, 1)
            if isinstance(t, DateTime):
                return datetime(1970, 1, 1)
            return None

        # Work on a copy so we can normalize types and fill required fields
        df2 = df.copy()

        for name, col in sa_cols.items():
            # Ensure column exists
            if name not in df2.columns:
                # For nullable columns, start with None; for required, use default
                df2[name] = None if col.nullable else default_for(col)
                continue

            # Coerce types and handle missing values
            if str(df2[name].dtype) == "object":
                # Treat empty strings as missing
                df2[name] = df2[name].replace("", np.nan)

            from sqlalchemy import BigInteger
            from sqlalchemy import Date as SA_Date
            from sqlalchemy import DateTime as SA_DateTime
            from sqlalchemy import Integer, Numeric, String, Text

            t = col.type
            if isinstance(t, SA_Date):
                ser = pd.to_datetime(df2[name], errors="coerce").dt.date
                df2[name] = (
                    ser.where(pd.notna(ser), None)
                    if col.nullable
                    else ser.fillna(default_for(col))
                )
            elif isinstance(t, SA_DateTime):
                # Normalize to UTC-naive to avoid tz-aware vs tz-naive issues in Postgres
                ser = pd.to_datetime(df2[name], errors="coerce", utc=True)

                # Convert to Python datetime and drop tzinfo
                def _to_naive(dt):
                    try:
                        if pd.isna(dt):
                            return None
                    except Exception:
                        pass
                    if hasattr(dt, "to_pydatetime"):
                        py = dt.to_pydatetime()
                    else:
                        py = dt
                    if getattr(py, "tzinfo", None) is not None:
                        py = (
                            py.tz_convert("UTC").tz_localize(None)
                            if hasattr(py, "tz_convert")
                            else py.replace(tzinfo=None)
                        )
                    return py

                ser = ser.map(_to_naive)
                df2[name] = (
                    ser.where(pd.notna(ser), None)
                    if col.nullable
                    else ser.fillna(default_for(col))
                )
            elif isinstance(t, (Integer, BigInteger)):
                ser = pd.to_numeric(df2[name], errors="coerce")
                df2[name] = (
                    ser.where(pd.notna(ser), None)
                    if col.nullable
                    else ser.fillna(default_for(col))
                )
            elif isinstance(t, Numeric):
                ser = pd.to_numeric(df2[name], errors="coerce")
                df2[name] = (
                    ser.where(pd.notna(ser), None)
                    if col.nullable
                    else ser.fillna(default_for(col))
                )
            elif isinstance(t, (String, Text)):
                # Only cast non-null values to str and trim; keep nulls as None
                ser = df2[name].astype(object)
                mask = ser.notna()
                ser.loc[mask] = ser.loc[mask].astype(str).str.slice(0, 255)
                if col.nullable:
                    ser = ser.where(pd.notna(ser), None)
                else:
                    # Required string columns get a default
                    ser = ser.where(pd.notna(ser), default_for(col))
                df2[name] = ser
            else:
                # Fallback: ensure NaN/NaT -> None for nullable cols, else fill default
                df2[name] = (
                    df2[name].where(pd.notna(df2[name]), None)
                    if col.nullable
                    else df2[name].fillna(default_for(col))
                )

        # Final safety pass: replace any remaining NaN/NaT with None across all columns
        df2 = df2.where(pd.notna(df2), None)

        stmt = insert(mapper)

        try:
            for _, group in df2.groupby(
                np.arange(df2.shape[0], dtype=int) // chunk_size
            ):
                records = group.to_dict("records")
                try:
                    # Fast path: batch insert
                    await session.execute(stmt, records)
                except Exception:
                    logger.warning(
                        "Batch insert failed, falling back to row-by-row insert."
                    )
                    # Fallback: insert row-by-row, skipping bad rows
                    for row in records:
                        try:
                            await session.execute(stmt, [row])
                        except Exception:
                            # Ignore duplicates/FK issues per row
                            logger.warning(
                                f"Failed to insert row: {row}. Skipping."
                            )
                            continue
                # Commit after each group
                await session.commit()
        finally:
            if is_pg:
                try:
                    await session.execute(
                        text("SET session_replication_role = origin")
                    )
                except Exception:
                    pass

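The method can also be used directly for small custom loads. A sketch writing a two-row DataFrame into the concept table (all column values are illustrative; missing columns are added automatically, with defaults for required ones and None for nullable ones):

import pandas as pd

async def load_two_concepts(vocab):
    df = pd.DataFrame(
        {
            "concept_id": [2000000001, 2000000002],   # illustrative local ids
            "concept_name": ["Example concept A", "Example concept B"],
            "domain_id": ["Observation", "Observation"],
            "vocabulary_id": ["None", "None"],
            "concept_class_id": ["Clinical Finding", "Clinical Finding"],
            "concept_code": ["A-001", "B-002"],
            "valid_start_date": ["1970-01-01", "1970-01-01"],
            "valid_end_date": ["2099-12-31", "2099-12-31"],
        }
    )
    # Date strings are coerced by write_vocab; inserts are batched per chunk_size.
    await vocab.write_vocab(df, "concept", chunk_size=500)
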
Utilities to execute queries and convert results to DataFrames.

Exposes a small helper class around async SQLAlchemy execution and integration with OHDSI QueryLibrary.

CdmVector

Bases: object

Query execution utility for OMOP CDM.

Methods let you run raw SQL or QueryLibrary snippets and turn results into pandas DataFrames.

Source code in src/pyomop/vector.py
class CdmVector(object):
    """Query execution utility for OMOP CDM.

    Methods let you run raw SQL or QueryLibrary snippets and turn results into
    pandas DataFrames.
    """

    async def execute(self, cdm, sqldict=None, query=None, chunksize=1000):
        """Execute a SQL query asynchronously.

        Args:
            cdm: CdmEngineFactory instance.
            sqldict: Optional key from ``CDMSQL`` to pick a canned query.
            query: Raw SQL string (used if provided).
            chunksize: Unused; kept for future streaming support.

        Returns:
            SQLAlchemy AsyncResult.
        """
        if sqldict:
            query = CDMSQL[sqldict]
        if not isinstance(query, str) or not query:
            raise ValueError("Query must be a non-empty string.")
        logger.info(f"Executing query: {query}")
        async with cdm.session() as session:
            result = await session.execute(text(query))
        await session.close()
        return result

    def result_to_df(self, result):
        """Convert a Result to a DataFrame.

        Args:
            result: SQLAlchemy Result or AsyncResult.

        Returns:
            pandas.DataFrame of result mappings.
        """
        list_of_dicts = result.mappings().all()
        """Convert a list of dictionaries to a DataFrame."""
        if not list_of_dicts:
            return pd.DataFrame()
        return pd.DataFrame(list_of_dicts)

    async def query_library(self, cdm, resource="person", query_name="PE02"):
        """Fetch a query from OHDSI QueryLibrary and execute it.

        Args:
            cdm: CdmEngineFactory instance.
            resource: Query resource subfolder (e.g., "person").
            query_name: Query markdown file name (e.g., "PE02").

        Returns:
            SQLAlchemy AsyncResult.
        """
        # Get the markdown from the query library repository: https://github.com/OHDSI/QueryLibrary/blob/master/inst/shinyApps/QueryLibrary/queries/person/PE02.md
        url = f"https://raw.githubusercontent.com/OHDSI/QueryLibrary/master/inst/shinyApps/QueryLibrary/queries/{resource}/{query_name}.md"
        markdown = requests.get(url)
        if markdown.status_code != 200:
            raise ValueError(f"Query {query_name} not found in the Query Library.")
        query = markdown.text.split("```sql")[1].split("```")[0].strip()
        # remove @cdm. and @vocab. references
        query = query.replace("@cdm.", "").replace("@vocab.", "")
        if not query:
            raise ValueError(f"Query {query_name} is empty.")
        return await self.execute(cdm, query=query)

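A minimal end-to-end sketch of CdmVector with a raw SQL query, assuming a CDM database has already been created and populated via CdmEngineFactory:

import asyncio
from pyomop import CdmEngineFactory, CdmVector  # export names assumed to match the classes documented here

async def main():
    cdm = CdmEngineFactory()    # default SQLite database
    vec = CdmVector()
    result = await vec.execute(cdm, query="SELECT COUNT(*) AS n FROM person")
    print(vec.result_to_df(result))

asyncio.run(main())
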
execute(cdm, sqldict=None, query=None, chunksize=1000) async

Execute a SQL query asynchronously.

Parameters:

Name        Description                                        Default
cdm         CdmEngineFactory instance.                         required
sqldict     Optional key from CDMSQL to pick a canned query.   None
query       Raw SQL string (used if provided).                 None
chunksize   Unused; kept for future streaming support.         1000

Returns:

SQLAlchemy AsyncResult.

Source code in src/pyomop/vector.py
async def execute(self, cdm, sqldict=None, query=None, chunksize=1000):
    """Execute a SQL query asynchronously.

    Args:
        cdm: CdmEngineFactory instance.
        sqldict: Optional key from ``CDMSQL`` to pick a canned query.
        query: Raw SQL string (used if provided).
        chunksize: Unused; kept for future streaming support.

    Returns:
        SQLAlchemy AsyncResult.
    """
    if sqldict:
        query = CDMSQL[sqldict]
    if not isinstance(query, str) or not query:
        raise ValueError("Query must be a non-empty string.")
    logger.info(f"Executing query: {query}")
    async with cdm.session() as session:
        result = await session.execute(text(query))
    await session.close()
    return result

query_library(cdm, resource='person', query_name='PE02') async

Fetch a query from OHDSI QueryLibrary and execute it.

Parameters:

Name         Description                                  Default
cdm          CdmEngineFactory instance.                   required
resource     Query resource subfolder (e.g., "person").   'person'
query_name   Query markdown file name (e.g., "PE02").     'PE02'

Returns:

SQLAlchemy AsyncResult.

Source code in src/pyomop/vector.py
async def query_library(self, cdm, resource="person", query_name="PE02"):
    """Fetch a query from OHDSI QueryLibrary and execute it.

    Args:
        cdm: CdmEngineFactory instance.
        resource: Query resource subfolder (e.g., "person").
        query_name: Query markdown file name (e.g., "PE02").

    Returns:
        SQLAlchemy AsyncResult.
    """
    # Get the markdown from the query library repository: https://github.com/OHDSI/QueryLibrary/blob/master/inst/shinyApps/QueryLibrary/queries/person/PE02.md
    url = f"https://raw.githubusercontent.com/OHDSI/QueryLibrary/master/inst/shinyApps/QueryLibrary/queries/{resource}/{query_name}.md"
    markdown = requests.get(url)
    if markdown.status_code != 200:
        raise ValueError(f"Query {query_name} not found in the Query Library.")
    query = markdown.text.split("```sql")[1].split("```")[0].strip()
    # remove @cdm. and @vocab. references
    query = query.replace("@cdm.", "").replace("@vocab.", "")
    if not query:
        raise ValueError(f"Query {query_name} is empty.")
    return await self.execute(cdm, query=query)

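Running a library query end to end requires network access to raw.githubusercontent.com. A sketch using the default person/PE02 query:

async def run_pe02(cdm):
    vec = CdmVector()
    # Downloads PE02.md from the OHDSI QueryLibrary repo and executes its SQL.
    result = await vec.query_library(cdm, resource="person", query_name="PE02")
    return vec.result_to_df(result)
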
result_to_df(result)

Convert a Result to a DataFrame.

Parameters:

Name     Description                         Default
result   SQLAlchemy Result or AsyncResult.   required

Returns:

pandas.DataFrame of result mappings.

Source code in src/pyomop/vector.py
def result_to_df(self, result):
    """Convert a Result to a DataFrame.

    Args:
        result: SQLAlchemy Result or AsyncResult.

    Returns:
        pandas.DataFrame of result mappings.
    """
    list_of_dicts = result.mappings().all()
    """Convert a list of dictionaries to a DataFrame."""
    if not list_of_dicts:
        return pd.DataFrame()
    return pd.DataFrame(list_of_dicts)

Predefined OMOP SQL snippets from the OHDSI Query Library.

This module exposes a small dictionary of named SQL queries that can be used for demos, tests, or quick analytics over a CDM instance.

Source: https://github.com/OHDSI/QueryLibrary/tree/master/inst/shinyApps/QueryLibrary/queries
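Callers pass one of the dictionary's keys as the sqldict argument of CdmVector.execute. The key below is a placeholder, so check the CDMSQL mapping for the names actually defined:

async def run_canned(cdm):
    vec = CdmVector()
    result = await vec.execute(cdm, sqldict="PE02")  # placeholder key; use a real CDMSQL key
    return vec.result_to_df(result)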