Source code for pyomop.vocabulary

import pandas as pd
from .cdm6_tables import Concept
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import insert
import numpy as np
from sqlalchemy.ext.automap import automap_base, AutomapBase
from sqlalchemy import select
[docs]class CdmVocabulary(object): def __init__(self, cdm): self._concept_id = 0 self._concept_name = '' self._domain_id = '' self._vocabulary_id = '' self._concept_class_id = '' self._concept_code = '' self._cdm = cdm self._engine = cdm.engine self._maker = sessionmaker(self._engine, class_=AsyncSession) self._scope = async_scoped_session(self._maker, scopefunc=asyncio.current_task) @property def concept_id(self): return self._concept_id @property def concept_code(self): return self._concept_code @property def concept_name(self): return self._concept_name @property def vocabulary_id(self): return self._vocabulary_id @property def domain_id(self): return self._domain_id @concept_id.setter def concept_id(self, concept_id): self._concept_id = concept_id _concept = asyncio.run(self.get_concept(concept_id)) self._concept_name = _concept.concept_name self._domain_id = _concept.domain_id self._vocabulary_id = _concept.vocabulary_id self._concept_class_id = _concept.concept_class_id self._concept_code = _concept.concept_code
[docs] async def get_concept(self, concept_id): stmt = select(Concept).where(Concept.concept_id == concept_id) async with self._cdm.session() as session: _concept = await session.execute(stmt) return _concept.scalar_one()
[docs] async def get_concept_by_code(self, concept_code, vocabulary_id): stmt = select(Concept).where(Concept.concept_code == concept_code) \ .where(Concept.vocabulary_id == vocabulary_id) async with self._cdm.session() as session: _concept = await session.execute(stmt) return _concept.scalar_one()
[docs] def set_concept(self, concept_code, vocabulary_id=None): self._concept_code = concept_code try: if vocabulary_id is not None: self._vocabulary_id = vocabulary_id _concept = asyncio.run(self.get_concept_by_code(concept_code, vocabulary_id)) else: _concept = asyncio.run(self.get_concept_by_code(concept_code)) self._vocabulary_id = _concept.vocabulary_id self._concept_name = _concept.concept_name self._domain_id = _concept.domain_id self._concept_id = _concept.concept_id self._concept_class_id = _concept.concept_class_id self._concept_code = _concept.concept_code except: self._vocabulary_id = 0 self._concept_id = 0
[docs] def create_vocab(self, folder, sample=None): try: df = pd.read_csv(folder + '/DRUG_STRENGTH.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'drug_strength', 'replace')) # df.to_sql('drug_strength', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/CONCEPT.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'concept', 'replace')) # df.to_sql('concept', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/CONCEPT_RELATIONSHIP.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'concept_relationship', 'replace')) # df.to_sql('concept_relationship', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/CONCEPT_ANCESTOR.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'concept_ancestor', 'replace')) # df.to_sql('concept_ancester', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/CONCEPT_SYNONYM.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'concept_synonym', 'replace')) # df.to_sql('concept_synonym', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/VOCABULARY.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'vocabulary', 'replace')) # df.to_sql('vocabulary', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/RELATIONSHIP.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'relationship', 'replace')) # df.to_sql('relationship', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/CONCEPT_CLASS.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'concept_class', 'replace')) # df.to_sql('concept_class', con=self._engine, if_exists = 'replace') df = pd.read_csv(folder + '/DOMAIN.csv', sep='\t', nrows=sample, on_bad_lines='skip') asyncio.run(self.write_vocab(df, 'domain', 'replace')) # df.to_sql('domain', con=self._engine, if_exists = 'replace') except Exception as e: print(f"An error occurred while creating the vocabulary: {e}")
[docs] @asynccontextmanager async def get_session(self) -> AsyncGenerator[AsyncSession, None]: async with self._scope() as session: yield session
[docs] async def write_vocab(self, df, table, if_exists='replace', chunk_size=1000): async with self.get_session() as session: conn = await session.connection() automap: AutomapBase = automap_base() await conn.run_sync(lambda sync_conn: automap.prepare(autoload_with=sync_conn)) mapper = getattr(automap.classes, table) stmt = insert(mapper) for _, group in df.groupby(np.arange(df.shape[0], dtype=int) // chunk_size): await session.execute(stmt, group.to_dict("records")) await session.commit() await session.close()