Python에서 asyncio.gather()를 활용한 고성능 비동기 프로그래밍

Python에서 asyncio.gather()를 활용한 고성능 비동기 프로그래밍
Photo by Ferenc Almasi / Unsplash

On this page

소개

현대 소프트웨어 개발에서 효율적인 리소스 사용과 빠른 응답 시간은 매우 중요합니다. Python의 asyncio 라이브러리, 특히 asyncio.gather() 함수는 이러한 요구사항을 충족시키는 강력한 도구입니다. 이 블로그 포스트에서는 asyncio.gather()의 사용법, 장점, 그리고 실제 적용 사례에 대해 자세히 알아보겠습니다.

asyncio.gather()란?

asyncio.gather()는 여러 비동기 작업을 동시에 실행하고 모든 결과를 기다리는 함수입니다. 이 함수는 다음과 같은 특징을 가집니다:

  1. 병렬 실행: 여러 작업을 동시에 시작하고 실행합니다.
  2. 대기 시간 최소화: I/O 바운드 작업의 경우, 한 작업이 대기 중일 때 다른 작업을 진행할 수 있습니다.
  3. 전체 실행 시간 단축: 모든 작업을 순차적으로 실행하는 것보다 훨씬 빠릅니다.

사용 예시

1. 데이터베이스 쿼리

results = await asyncio.gather(*[db.execute(query) for query in queries])

이 코드는 여러 데이터베이스 쿼리를 동시에 실행합니다. 100개의 개별 쿼리를 병렬로 처리할 수 있어, 전체 실행 시간이 크게 단축됩니다.

주의: 데이터베이스의 동시 연결 수 제한을 고려해야 합니다.

2. API 호출

api_results = await asyncio.gather(*[call_api(item) for item in items])

여러 외부 API를 동시에 호출합니다. 네트워크 지연 시간이 겹치므로 전체 대기 시간이 줄어듭니다.

예시: 100개의 API 호출이 각각 1초씩 걸린다면, 순차 실행 시 100초가 걸리지만, 동시 실행 시 약 1-2초 정도로 단축될 수 있습니다.

3. 파일 처리

results = await asyncio.gather(*[process_file(file) for file in files])

여러 파일을 동시에 읽거나 쓸 수 있습니다. I/O 작업이 겹치면서 전체 처리 시간이 단축됩니다.

대량 데이터 처리에서의 이점

  1. 스케일링: 데이터 양이 증가해도 처리 시간이 선형적으로 증가하지 않습니다.
  2. 리소스 활용: CPU와 I/O 리소스를 더 효율적으로 사용합니다.
  3. 응답성: 전체 작업이 완료되기 전에 일부 결과를 얻을 수 있습니다.

주의사항

  1. 메모리 사용: 너무 많은 작업을 동시에 실행하면 메모리 사용량이 급증할 수 있습니다.
  2. 동시성 제한: 시스템 리소스와 외부 서비스의 제한을 고려하여 동시 실행 수를 제한해야 할 수 있습니다.
  3. 오류 처리: 동시 실행 중 발생한 오류를 적절히 처리해야 합니다.

실제 사용 예 (대량 데이터 처리)

async def process_large_dataset(dataset, chunk_size=1000):
    results = []
    for i in range(0, len(dataset), chunk_size):
        chunk = dataset[i:i+chunk_size]
        chunk_results = await asyncio.gather(*[process_item(item) for item in chunk])
        results.extend(chunk_results)
    return results

이 예제는 대량의 데이터셋을 청크로 나누어 처리합니다. 각 청크 내의 항목들은 동시에 처리되지만, 청크 간에는 순차적으로 처리됩니다. 이 방식은 전체 데이터셋을 한 번에 처리하는 것보다 메모리 사용을 효율적으로 관리하면서도 병렬 처리의 이점을 얻을 수 있습니다.

Celery와의 통합

asyncio.gather()는 Celery 태스크 내에서도 사용할 수 있어, 분산 작업 처리의 이점과 비동기 프로그래밍의 효율성을 결합할 수 있습니다.

from celery import Celery
import asyncio

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
async def process_data(data_chunk):
    db = await get_db()
    async with db.begin():
# 데이터베이스 작업
        results = await asyncio.gather(*[process_item(db, item) for item in data_chunk])

# API 호출
    api_results = await asyncio.gather(*[call_api(result) for result in results])

    return api_results

이 예제에서는 Celery 태스크 내에서 데이터베이스 작업과 API 호출을 비동기적으로 처리합니다. 이를 통해 대규모 데이터 처리 작업을 효율적으로 분산하고 실행할 수 있습니다.

결론

asyncio.gather()는 Python에서 고성능 비동기 프로그래밍을 구현하는 데 핵심적인 도구입니다. 대량 데이터 처리, 여러 외부 서비스와의 통합, 복잡한 워크플로우 관리 등에서 큰 성능 향상을 가져올 수 있습니다. 하지만 이를 효과적으로 사용하기 위해서는 비동기 프로그래밍의 개념을 잘 이해하고, 시스템 리소스와 제한 사항을 고려해야 합니다.

적절히 사용된다면, asyncio.gather()는 애플리케이션의 성능을 크게 향상시키고, 리소스 사용을 최적화하며, 사용자 경험을 개선하는 강력한 도구가 될 수 있습니다.

Subscribe to Keun's Story newsletter and stay updated.

Don't miss anything. Get all the latest posts delivered straight to your inbox. It's free!
Great! Check your inbox and click the link to confirm your subscription.
Error! Please enter a valid email address!