[FastAPI] 9. Persistence Layer 구간을 비동기 처리 하는 방법

반응형

첫 포스트에서 우리는 FastAPI가 ASGI 기반의 uvicorn을 이용하여 uvloop에 기반한 비동기 처리로 API 요청과 응답을 비동기로 처리할 수 있다는 이야기를 하였습니다. 하지만 공교롭게도 Database Connection에서 이러한 기능을 지원해주지 않아 API 요청 단에는 비동기 처리가 가능하여도 DB에 액세스 하는 구간은 비동기 처리가 되지 않기 때문에 Blocking이 발생하고, 다음 요청이 계속 대기 되는 문제점을 가지고 있었습니다.

 

 

 

 

Python DB API

그렇다면 파이썬은 어떤식으로 Database와 연결할 수 있는 것일까요? Java의 경우는 JDBC라는 Database Connector라는 것이 존재하여 애플리케이션과 Database 사이를 연결해주는 게 가능한데요.

 

Python에는 이와 비슷한 것으로 DB API라는 것이 존재합니다. 

 

 

PEP 249 -- Python Database API Specification v2.0

The official home of the Python Programming Language

www.python.org

파이썬 문서 PEP 249에는 Python DB API 2.0이 담겨져 있습니다. 파이썬에서 DB에 액세스할 수 있는 표준 인터페이스로 우리가 이전에 다뤘던 FastAPI CRUD 만들기에서도 이러한 표준 API 기반의 모듈을 사용하여 데이터베이스에 액세스 했던 것입니다.

 

좀 더 이해하기 쉽게 그림으로 보여드리자면, asyncpg라는 PostgreSQL을 위한 DB 라이브러리가 있습니다. 이 라이브러리는 Python DB API를 기본 골격으로 하여 PostgreSQL에 맞춰진 라이브러리이며 커넥션을 만들기 위해 Connection 객체를 사용하고, Query를 작성하기 위해 Cursor 객체를 만들고, connection 위에 execute 메소드를 이용하여 Query를 실행할 수 있도록 되어 있습니다.

 

이러한 구조는 asyncpg 뿐만 아니라 Python에서 지원하는 대부분의 DB 라이브러리들이 이렇게 구현이 되어지고 있는 것입니다. 이것이 바로 표준 Python DB API 입니다. 

 

 

 

 

(Old) Databases + asyncpg + SQLAlchemy 1.3

asyncpg는 PosgreSQL용 비동기 DB 라이브러리로 FastAPI에서는 Databases와 asyncpg 모듈을 조합하여 SQLAlchemy Core에 기반한 문법을 통해 비동기 트랜잭션을 진행할 수 있는 방법이 있습니다.

 

asyncpg는 기본적으로 Python의 asyncio를 기반으로 동작하며 Python 3.5 버전부터 동작하고, PostgreSQL 9.2 ~ 12 버전까지를 지원합니다.

$ poetry add databases[asyncpg]

Databases 디펜던시를 사용할 때는 패키지 종류에 자신이 사용할 Database 이름을 넣고 디펜던시에 추가해야 합니다. 이렇게 디펜던시를 추가하게 되면 SQLAlchemy 1.3 버전의 디펜던시가 같이 추가될 것입니다.

 

공식적으로 SQLAlchemy의 1.3 버전에서는 asyncpg dialect를 지원하지 않습니다. 따라서 FastAPI나 Sanic 같은 비동기 프레임워크에서 Persistence Layer에 비동기 처리를 넣기 위해서는 asyncpgsa나 우리가 현재 다루고 있는 databases와 같은 타사의 어댑터와 같이 사용해야 합니다.

 

Databases 어댑터에 연결하고자 하는 Database의 URI를 넣고, 더불어 SQLAlchemy의 create_engine에도 같이 URI를 넣어줍니다.

 

이렇게 2가지 전부에 URI를 넣어주는 이유는 Databases 어댑터에서 실제 트랜잭션을 맡고, SQLAlchemy의 Core를 이용하여 Query를 생성해주기 위함입니다. 따라서 이러한 비동기 처리는 아래와 같이 동작합니다.

 

SQLAlchemy에서 DB 서버에 접속하여 하는 일은 테이블의 메타데이터를 정의하는 것입니다. 여기서 메타데이터란 테이블을 사용하기 위한 DDL 정의를 이야기 하며 차후 정의한 메타데이터를 DB 서버에 보내어 DDL을 수행해 테이블을 생성하는 역할을 해줍니다.

 

여기에 QueryDSL방식의 SQLAlchemy Core를 사용하여 DB 쿼리를 질의하고, 이 쿼리를 Databases 모듈의 fetch_all이나 execute 와 같은 Python DB API 함수에 넣어주면 트랜잭션이 동작하며 애플리케이션이 받아치는 형식입니다.

 

이렇게 복잡한 프로시저를 거치는 이유는 SQLAlchemy가 자체적으로 asyncpg나 aiopg와 같은 비동기 트랜잭션 모듈을 지원하지 않는 점에 있고, 이를 지원해주는 별도의 모듈을 사용하여 이용하는 방법인데, 만약 자신이 수동으로 Query를 작성하여 사용하고자 하는 경우에는 SQLAlchemy Core 없이도 Databases 모듈로만으로도 비동기 처리를 충분히 진행할 수 있습니다.

 

 

 

 

(New) SQLAlchemy 1.4.x + asyncpg

지난 3월 29일부터 SQLAlchemy 1.4.x 버전이 정식 버전으로 출시되면서 asyncpg를 이제 SQLAlchemy에서도 사용할 수 있게 되었습니다. 따라서 위의 방법처럼 비동기 처리를 위해 별도의 어댑터를 사용하지 않아도 SQLAlchemy와 Connection Pool만 있다면 비동기 처리가 가능한 것이죠.

 

SQLAlchemy에서 asyncpg를 지원하기 때문에 별도로 애플리케이션에서 asyncpg 내 함수나 객체를 선언할 필요 없이 바로 SQLALchemy의 create_async_engine 함수를 사용하여 커넥션을 만들고, DB 처리 함수 또한 Alchemy에서 제공하기 때문에 이를 사용해서 진행할 수 있습니다.

 

기존이랑 다른점은 create_engine이 아닌 create_async_engine을 사용한다는 점이고, PostgreSQL을 사용하신다면 연결 형식을 구분하기 위해 postgresql 뒤에 + asyncpg를 붙인다는 점이 있습니다. 마지막으로 Session 또한 AsyncSession이 별도로 존재하여 ORM을 이용한 DB 처리시 비동기 처리를 한층 강화시킨 점이 눈에 보입니다.

 

ORM으로 정의한 클래스를 DDL 처리할 때에는 동기 처리로 진행해야 하는 점이 눈에 보이는데요. 일반적으로 사용하는 create_all 함수를 사용하면 동기 함수이기 때문에 asyncio와 호환되지 않으므로 run_sync 함수를 이용하여 asyncio와 호환되는 동기 모델로 실행하도록 유도해줘야 합니다.

 

DB에서 데이터를 가져오는 방법도 조금은 다릅니다. 기존에는 sqlalchemy 상위에 있는 select 함수를 가지고 쿼리를 질의하는 방법이나 ORM session에서 query 함수를 이용해 쿼리를 질의하는 방법이 있었습니다. 하지만 이들 모두 동기 방식으로 비동기로 처리하기 위해서는 이 방법은 사용할 수 없습니다.

 

SQLAlchemy 1.4 버전에서 새로이 추가된 sqlalchemy.ext.asyncio.select 함수를 사용하여 select 쿼리를 생성하도록 하고, 이를 이용해 쿼리를 생성이 끝났으면 AsyncSession의 execute 비동기 메소드를 이용하여 쿼리 실행한 결과를 가져오도록 하면 깔끔하게 비동기 처리가 진행됩니다.

 

이 다음 사용한 scalars의 경우는 모델의 메타데이터를 걷고 순수 데이터만을 가져오겠다는 함수이며, fetchall을 통해 가져오면 기존에 사용했던 코드를 그대로 리팩토링 할 수 있습니다.

 

혹시 SQLAlchemy에서 select, where 등의 비ORM 함수를 이용하여 사용하신 분들이라면 session.refresh 함수를 사용해보셨겠지만 ORM 함수에서는 refresh 함수가 사실상 거의 사용하지 않기 때문에 모르실 수 있습니다.

 

SQLAlchemy 1.4에서는 새로운 데이터의 생성이나, 업데이트 발생시 모델 결과를 업데이트해주기 위해 refresh 메소드를 이용하여야 합니다. AsyncSession이 기본적으로 Session에 기반하여 AsyncEngine을 붙인 것이기 때문에 사실상 Session 객체랑 거의 동일하게 동작합니다. 

 

그런데, commit 등과 같이 transaction의 함수들은 AsyncSession에서 greenlet_spawn 함수를 통해 비동기 처리를 진행하도록 재구현되어 있지만 add, expire 등의 함수들은 재구현되어 있지 않아, 사실상 AsyncSession의 멤버 변수에 포함되어 있는 Session 클래스로 동작합니다.

 

코드를 보면, 프록시 패턴을 이용해서 Session 클래스의 메소드를 사용하는 것을 볼 수 있는데, 이 때문에 add 함수나 모델의 업데이트를 진행할 때는 동기 클래스를 사용하므로 refresh 메소드를 이용해서 비동기 처리 후 모델 반환을 진행하셔야 합니다.

 

 

 

 

 

마치며...

DB 구간을 비동기 처리하는 방법은 asyncpg 외에도 aiopg라는 다른 모듈도 존재합니다. 본인이 개발하시는 프로젝트에서 어떤 모듈을 사용하고 그에 대한 성능을 파악하는 것이 비동기 처리 구현을 진행하는 데 선 작업이 되어야 할 것입니다.

 

SQLAlchemy 1.4 버전을 시작으로 앞으로는 JPQL 방식이 아닌 2.x의 QueryDSL 방식으로 바뀔 것으로 보입니다. 따라서 파이썬을 계속 백엔드 개발로 이어나갈 분이시라면 ORM을 SQLAlchemy로 사용하실 때 이 부분을 염두해두는 것이 좋아 보입니다.

 

비록 이번 글에서는 asyncpg라는 PostgreSQL을 주제로 다루었지만 파이썬 DB API에서 비동기를 지원하는 모듈에는 아래와 같이 존재합니다.

 

  • MySQL (aiomysql)
  • PostgreSQL (aiopg, asyncpg)
  • SQLite (aiosqlite)

한 Dialect를 강력하게 지원하는 모듈에는 위의 3가지가 있지만 MSSQL과 같은 다른 Dialect에서도 aioodbc를 이용하여 ODBC 드라이버를 로드한 다음 사용할 수 있습니다.

반응형

Tistory Comments 7

  • hihi

    sqlalchemy 1.3 버전으로 쓸 때 poetry add database[postgresql] 이 아니라 poetry add databases[asyncpg] 인 것 같습니다 !

    글 잘 읽었습니다, 감사해요

  • 모오르겠다

    질문 있습니다. 만약 사용하는 DB가 mysql 이라면, create_async_engine 설정 시 postgresql+asyncpg로 할 수 없나요?

  • fastapi 의 문서에도 나온것처럼 depends 를 사용하여 리퀘스트마다 독립적인 session 을 사용하는 방식이 코드의 간결성은 지킬 수 있으나 로직 내에서 db 트랜잭션 작업 외에 외부 api 호출, 하드한 연산처리 등의 작업을 할 동안에는 session 을 release 해주는게, 즉 db 트랜잭션 로직을 context manger(with..as) 를 통해 감싸주어 자원을 효율적으로관리할 수 있지 않을까? 라는 생각을 해봅니다. 혹시 이 부분에 대해서는 어떻게 생각하시나요?

    • Favicon of https://blog.neonkid.xyz BlogIcon Neon K.I.D

      좋은 말씀이십니다. 실제로 제가 다니고 있는 회사에서도 그렇게 사용하시는 분이 계시고, 저 또한 Repository 패턴을 만들어 사용했을 때는 context manager를 이용했습니다.

      여기에 좀 더 첨삭하면 decorator를 이용해서 코드의 간결성을 좀 더 표현해 볼 수 있을 것 같네요.

    • BlogIcon

      혹시 데코레이터는 어떤식으로 사용하면 간결하게 처리될지 예시좀 부탁드려도 될까요?