ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Python - 데이터베이스 비동기 처리 ( SQLAlchemy )
    Python 2021. 11. 25. 17:12
    반응형

    비동기 처리를 위해선 통신뿐만 아니라 데이터 베이스 처리 부분도 비동기 처리가 되어야만 완벽한 비동기 처리가 가능하다. java 에선 R2DBC 가 있지만 아직 정식 버전이 아니고 안정성도 보장돼있지 않다. 찾아보면 아직까지는 시범적으로 사용해보라고 하지만 python에서 사용하는 sqlalchemy 경우 지원하기 시작한 지 기간이 좀 됐고 지속적으로 버전이 올라가면서 잘 지원이 되고 있는 거 같다. 

     

    설정

    일단 사용을 위해서 sqlalchemy 를 설치해줘야 하고 드라이버도 일반 동기 드라이버가 아닌 비동기 드라이버를 사용해야 한다. 

     SQLAlchemy 1.4 버전이상 부터 Async를 지원한다. 

    pip3 install aiomysql
    pip3 install sqlalchemy

     

    기본 설정 및 사용

    여러 블로그들을 뒤지면서 샘플코드를 확인했는데 공식 사이트에 나와있는 구성에서 조금 다른 정도 여서 대부분은 공식 사이트에 있는 샘플들을 가져왔다. 

    import asyncio
    import aiomysql
    
    loop = asyncio.get_event_loop()
    
    
    async def test_example():
        conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                           user='root', password='', db='mysql',
                                           loop=loop)
    
        cur = await conn.cursor()
        await cur.execute("SELECT Host,User FROM user")
        print(cur.description)
        r = await cur.fetchall()
        print(r)
        await cur.close()
        conn.close()
    
    loop.run_until_complete(test_example())

    간단하게 접속해서 low level 쿼리를 사용한 샘플이다. 어떤 구성으로 처리되는지 확인이 가능한데 await를 이용해서 각 비동기 요소를 컨트롤하고 있는 것을 확인할 수 있다. 

    커넥션 풀 

    동기식으로 써야만 사용이 가능하지 않을까 싶었는데 비동기로 하더라도 내부적으로 커넥션 풀을 관리해주는 것으로 추측된다. 샘플 코드에 커넥션 풀을 사용 가능한 방법이 있고 커넥션 풀을 사용하려면 아래 내용을 참고하면 될 거 같다. 

    import asyncio
    import aiomysql
    async def test_example(loop):
      pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                       user='root', password='',
                       db='mysql', loop=loop)
      async with pool.acquire() as conn:
        async with conn.cursor() as cur:
          await cur.execute("SELECT 42;")
          print(cur.description)
          (r,) = await cur.fetchone()
          assert r == 42
      pool.close()
      await pool.wait_closed()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_example(loop))

     

    SQLAlchemy 사용 

    가장 필요한 부분이였는데 low query를 사용해서 코딩하는 건 우아하지 못한 방법 같다. ORM을 사용해서 최대한 코드 다운 코드를 구성할 수 있을까 의문을 가졌는데 너무나도 당연하게 (?) 사용이 가능하다. 

    case 1 : aiomysql 사용하여 SQLAlchemy 사용

    import asyncio
    import sqlalchemy as sa
    from aiomysql.sa import create_engine
    
    metadata = sa.MetaData()
    tbl = sa.Table('tbl', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('val', sa.String(255)))
    
    async def go(loop):
      engine = await create_engine(user='root', db='test_pymysql',
                     host='127.0.0.1', password='', loop=loop)
      async with engine.acquire() as conn:
        await conn.execute(tbl.insert().values(val='abc'))
        await conn.execute(tbl.insert().values(val='xyz'))
        async for row in conn.execute(tbl.select()):
          print(row.id, row.val)
      engine.close()
      await engine.wait_closed()
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(go(loop))

    기본적으로 간단하게 데이터를 입력하는 부분들이 있는데 개인적으로는 await 를 계속 붙여줘야 한다는 점이 불편함이 있을 수 있을 거 같다. 이미 async 선언이 돼있다면 적어도 excute() 호출 부분은 알아서 await 처리를 해주면 좋지 않을까?라는 아쉬움이 있다. 

    case 2 :  Database 모듈로만 접근 하여 SQLAchemy 사용 

    특이한 점이였는데 공식 사이트는 아니지만 다른 블로그를 찾아보니 Database 기본 모듈로만으로도  어느 정도 구현이 가능한 걸 확인했다. 아래 코드는 단순 조회 기능인데 생각보다 위에 사용된 코드보다 간결해 보이긴 하지만 잘 뜯어보면 별반 차이는 없는 것 같다. 

    from sqlalchemy import Boolean, Column, Table, Metadata, String, Text
    
    
    #...
    metadata = Metadata()
    
    memos = Table(
        "memos",
        metadata,
        Column("id", String(120), primary_key=True),
        Column("title", String(80), default='No title', nullable=False, index=True),
        Column("content", Text, nullable=True)
        Column("is_favorite", Boolean, nullable=False)
    )
    
    async def get_todos():
        await database.connect()
        
        # SQLAlchemy core 
        query = memos.select()
        results = await database.fetch_all(query)
        
        await database.disconnect()
        
        return results

    case 3 : SQLAlchemy 1.4 이상 버전 사용시 Alchemy에서 제공하는 함수 사용 

    아래 코드는 SQLAlchemy 기능을 활용한 구성이다. engine 설정을 하여 사용이 가능하고 옵션들을 사용 가능하다. 

    from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
    from sqlalchemy.orm import declarative_base
    from typing import Optional
    
    Base = declarative_base()
    engine = create_async_engine('postgresql+asyncpg://user:pass@server_addr/database', pool_size=20 ,echo=True, pool_pre_ping=True)
    
    
    async def get_db_session() -> AsyncSession:
        sess = AsyncSession(bind=engine)
        try:
            yield sess
        finally:
            await sess.close()
    from sqlalchemy import Boolean, Column, String, Text 
    from sqlalchemy.ext.declarative import declarative_base 
    
    # ...
    
    Base = declarative_base() 
    
    
    class Memo(Base): 
        __tablename__ = 'memos' 
        
        id = Column(String(120), primary_key=True, default=lambda: str(uuid.uuid4())) 
        title = Column(String(80), default='No title', nullable=False, index=True) 
        content = Column(Text, nullable=True) 
        is_favorite = Column(Boolean, nullable=False, default=False)
    
        
    
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    기존 select 사용시 사용되는 방식은 모두 동기 방식이기 때문에 비동기 처리를 하기 위해서는 사용이 불가능 1.4 버전부터 추가된 sqlalchemy.ext.asyncio.select 함수를 사용해야 한다고 한다.

     engine.begin() 인경우 트랜잭션을 시작한단 의미인데 비동기 처리시 트랜잭션은 결국 세션을 사용하는 것과 동일한 역할을 한다고 한다. 

    연관된 Document ( https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html

    from pydantic import BaseModel 
    from typing import Optional 
    
    
    class RequestMemo(BaseModel):
        title: str 
        content: Optional[str] = None 
        is_favorite: Optional[bool] = False 
            
    
    class ResponseMemo(BaseModel): 
        id: str 
        title: str 
        content: Optional[str] = None 
        is_favorite: bool 
            
        class Config: 
            orm_mode = True 
            
            
    @app.post('/memos', response_model=ResponseMemo) 
    async def register_memo(req: RequestMemo, db: AsyncSession = Depends(get_db_session)): 
        memo = Memo(**req.dict()) 
        
        db.add(memo) 
        
        # 이 코드를 쓰지 않으면 DB에 반영되지 않음 
        db.commit()
        db.refresh(memo)
        
        return memo
    
    
    @app.put('/memos/{item_id}', response_model=ResponseMemo) 
    async def mod_memo(item_id: str, req: RequestMemo, db: AsyncSession = Depends(get_db_session)): 
        memo = db.query(Memo).filter_by(id=item_id) 
        req_dict = req.dict() 
        req_dict['id'] = item_id 
        
        req = {k: v for k, v in req_dict.items()} 
        for key, value in req.items(): 
            setattr(memo, key, value) 
            
        db.commit()
        db.refresh(memo)
        
        return memo

    위의 코드는 fast-api 기준으로 작성된 코드이다. flask 같은경우는 거의 동일한 구성이기 때문에 응용하기에 문제는 없어 보인다. commit() , refresh()를 실행시켜서 현재 상태를 반영하지 않으면 실제 적용이 되지 않는다. 

    일단, 파이썬에서 비동기 처리를 할때 데이터베이스 부분을 어떻게 처리할 수 있는지를 조사를 해보았고 sqlalchemy 1.4 버전에서 이미 지원을 하고 있다는 것을 확인했다. 어느 정도 응용하면 구성이 가능한 부분일 것 같다. 

     완전한 비동기의 이점을 가지기위해 데이터베이스와의 연결 처리 방식을 비동기로 처리하고 I/O 부분도 aiohttp를 사용해서 비동기 처리를 할 수 있다면 기본적으로 RDBMS를 쓰고 있어서 비동기 처리하는 것에 대한 걸림돌은 없어지지 않을까란 생각이 든다. 

     내부적으로 트랜잭션 관리가 어떻게 되는지는 좀더 살펴봐야 할부분인 것 같다. 아무리 생각해도 트랜잭션 관리와 비동기식 처리라는 두 분에서 서로 충돌할 수 있는 개념적 문제들이 남아 있기 때문에 롤백에 대한 문제도 테스트를 해보거나 관련 문서를 좀 더 찾아봐야 할거 같다. 어느 정도 데이터베이스를 비동기식으로 처리가 가능하게 되면 외부 통신에 사용될 aiohttp에 대해서도 정리가 필요할 것 같다. 

     

    참고 

    https://aiomysql.readthedocs.io/en/latest/

     

    Welcome to aiomysql’s documentation! — aiomysql 0.0.21 documentation

    aiomysql is a library for accessing a MySQL database from the asyncio (PEP-3156/tulip) framework. It depends and reuses most parts of PyMySQL . aiomysql tries to be like awesome aiopg library and preserve same api, look and feel. Internally aiomysql is cop

    aiomysql.readthedocs.io

    https://blog.neonkid.xyz/269

     

    [FastAPI] 9. Persistence Layer 구간을 비동기 처리 하는 방법

    첫 포스트에서 우리는 FastAPI가 ASGI 기반의 uvicorn을 이용하여 uvloop에 기반한 비동기 처리로 API 요청과 응답을 비동기로 처리할 수 있다는 이야기를 하였습니다. 하지만 공교롭게도 Database Connection

    blog.neonkid.xyz

    https://krcoder.com/p/31686

    반응형
Designed by Tistory.