Source code for pyomop.engine_factory

# from sqlalchemy import create_engine
# from sqlalchemy.orm import Session
# from sqlalchemy.ext.automap import automap_base

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base

[docs]class CdmEngineFactory(object): def __init__(self, db = 'sqlite', host = 'localhost', port = 5432, user = 'root', pw='pass', name = 'cdm6.sqlite', schema = 'public'): self._db = db self._name = name self._host = host self._port = port self._user = user self._pw = pw self._schema = schema self._engine = None self._base = None
[docs] async def init_models(self, metadata): async with self._engine.begin() as conn: await conn.run_sync(metadata.drop_all) await conn.run_sync(metadata.create_all)
@property def db(self): return self._db @property def host(self): return self._host @property def port(self): return self._port @property def name(self): return self._name @property def user(self): return self._user @property def pw(self): return self._pw @property def schema(self): return self._schema @property def engine(self): return self.engine @property def base(self): if self.engine is not None: # Not self_engine Base = automap_base() Base.prepare(self.engine, reflect=True) return Base.classes return None @property def engine(self): if self._db == 'sqlite': self._engine = create_async_engine("sqlite+aiosqlite:///"+self._name) if self._db == 'mysql': mysql_url = 'mysql://{}:{}@{}:{}/{}' mysql_url = mysql_url.format(self._user, self._pw, self._host, self._port, self._name) self._engine = create_async_engine(mysql_url, isolation_level="READ UNCOMMITTED") if self._db == 'pgsql': # https://stackoverflow.com/questions/9298296/sqlalchemy-support-of-postgres-schemas dbschema = '{},public' # Searches left-to-right dbschema = dbschema.format(self._schema) pgsql_url = 'postgresql+psycopg2://{}:{}@{}:{}/{}' pgsql_url = pgsql_url.format(self._user, self._pw, self._host, self._port, self._name) self._engine = create_async_engine( pgsql_url, connect_args={'options': '-csearch_path={}'.format(dbschema)}) return self._engine @property def session(self): if self._engine is not None: async_session = sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession) return async_session return None @property def async_session(self): if self._engine is not None: async_session = sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession) return async_session return None @db.setter def db(self, value): self._db = value @name.setter def name(self, value): self._name = value @port.setter def port(self, value): self._port = value @host.setter def host(self, value): self._host = value @user.setter def user(self, value): self._user = value @pw.setter def pw(self, value): self._pw = value @schema.setter def schema(self, value): self._schema = value