⚡ concurrent.futures로 쓰레드·프로세스 동시 처리 간편하게!
concurrent.futures는 Python의 고수준 병렬·동시성 모듈로, 스레드 기반 또는 프로세스 기반 작업을 `Executor` 추상화로 간단하게 처리할 수 있습니다. 복잡한 스레드 제어 없이 Future 객체만으로 비동기 작업을 직관적으로 관리할 수 있어요.
✅ 1. ThreadPoolExecutor 기본 사용 예
import concurrent.futures
import time
def io_task(n):
time.sleep(1)
return f"I/O 결과 {n}"
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(io_task, i) for i in range(10)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
- I/O 바운드 작업에서 쓰레드를 효과적으로 활용. `as_completed()`로 완료 순서대로 결과 처리 가능.
✅ 2. ProcessPoolExecutor로 CPU 집약 작업 병렬화
import concurrent.futures
def cpu_task(n):
return sum(i*i for i in range(n))
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_task, [1000000, 2000000, 3000000]))
print(results)
- CPU 바운드 작업 시 `ProcessPoolExecutor`를 사용하면 GIL 제약 없이 병렬 속도 높일 수 있음.
✅ 3. timeout 설정, 취소, 예외 처리
import concurrent.futures, time
def slow(n):
time.sleep(n)
return n
with concurrent.futures.ThreadPoolExecutor() as exec:
f = exec.submit(slow, 5)
try:
print(f.result(timeout=2))
except concurrent.futures.TimeoutError:
print("타임아웃 발생")
f.cancel()
- Future의 `result(timeout=…)`로 타임아웃 제어 가능하며, `cancel()`을 사용해 작업 중단도 가능합니다.
✅ 4. `executor.map()` vs `submit()` 차이 및 예외 처리 전략
with concurrent.futures.ProcessPoolExecutor() as executor:
for result in executor.map(cpu_task, [1000000, 2000000, 3000000]):
print(result)
# submit + as_completed 사용 시:
futures = [executor.submit(cpu_task, n) for n in [1000000,2000000,3000000]]
for f in concurrent.futures.as_completed(futures):
try:
print(f.result())
except Exception as e:
print("에러:", e)
- `map()`은 순서 보장, 예외 발생 시 자동 전파. `submit()`+`as_completed()`는 유연하지만 예외를 직접 처리해야 함.
✅ 5. 실제 벤치마크 비교 – 싱글 vs Thread vs Process
import timeit
setup = """
def cpu(x):
return sum(i*i for i in range(x))
data = [1000000]*4
"""
baseline = timeit.timeit("list(map(cpu, data))", setup=setup, number=5)
threaded = timeit.timeit("""
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e:
list(e.map(cpu, data))
""", setup=setup + "\nimport concurrent.futures", number=5)
processed = timeit.timeit("""
import concurrent.futures
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as e:
list(e.map(cpu, data))
""", setup=setup + "\nimport concurrent.futures", number=5)
print("싱글:", baseline)
print("Thread:", threaded)
print("Process:", processed)
- CPU 작업에선 `ProcessPoolExecutor`가 가장 빠르지만, 작은 작업일 경우 스레드가 더 유리할 수 있음.
✨ 요약 & 활용 팁 요약
- I/O 작업엔 **ThreadPoolExecutor**, CPU 집중 작업엔 **ProcessPoolExecutor** 추천
- `submit()` + `as_completed()`는 유동적인 결과 처리에, `map()`은 순차적 결과에 적합
- 타임아웃과 취소 기능으로 안정성 고려
- 작업 크기에 따라 Executor 유형 선택 및 `max_workers` 조절 필요
댓글
댓글 쓰기