-
[공부] 파이썬 코딩의 기술 책 정리 #7-3파이썬/책 정리 2021. 4. 21. 12:56
58. 동시성과
Queue
를 사용하기 위해 코드를 어떻게 리팩터링 하는지 이해하라.queue
내장 모듈의Queue
클래스를 사용해 파이프라인을 스레드로 실행하게 구현하는 것이다. 일반적인 접근 방법은 필요한 병렬 I/O의 숫자에 맞춰 미리 정해진 작업자 스레드를 만든다. 프로그램은 이를 통해 자원 사용을 제어하고, 새로운 스레드를 자주 시작하면서 생기는 부가 비용을 덜 수 있다.from queue import Queue class ClosableQueue(Queue): ... in_queue = ClosableQueue() out_queue = ClosableQueue()
in_queue
에서 원소를 소비하는 스레드를 여러 개 시작할 수 있다. 각 스레드는game_logic
을 호출해 원소를 처리한 다음out_queue
에 결과를 넣는다. 각 스레드는 동시에 실행되며 병력적으로 I/O를 수행하므로, 필요한 지연 시간이 줄어든다.from threading import Thread class StoppableWorker(Thread): ... def game_logic(state, neighbors): ... # Blocking I/O data = my_socket.recv(100) ... def game_logic_thread(item): y, x, state, neighbors = item try: next_state = game_logic(state, neighbors) except Exception as e: next_state = e return (y, x, next_state) # 스레드를 미리 시작한다. threads = [] for _ in range(5): thread = StoppableWorker( game_logic_thread, in_queue, out_queue) thread.start() threads.append(thread)
이제 큐와 상호작용하면서 상태 전이 정보를 요청하고 응답을 받도록
simulate
함수를 재정의할 수 있다. 원소를in_queue
에 추가하는 과정은 팬아웃 이고,out_queue
가 빈 큐가 될 때까지 원소를 소비하는 과정은 팬인 이다.ALIVE = '*' EMPTY = '-' class SimulationError(Exception): pass class Grid: ... def count_neighbors(y, x, get): .. def simulate_pipeline(grid, in_queue, out_queue): for y in range(grid.height): for x in range(grid.width): state = grid.get(y, x) neighbors = count_neighbors(y, x, grid.get) in_queue.put((y, x, state, neighbors)) in_queue.join() out_queue.close() next_grid = Grid(grid.height, grid.width ) for item in out_queue: y, x, next_state = item if isinstance(next_state, Exception): raise SimulationError(y, x) from next_state next_grid.set(y, x, next_state) return next_grid
Grid.get
과Grid.set
호출은 모두 새로운simulate_pipline
함수 안에서만 일어난다. 이는 동기화를 위해 Lock 인스턴스를 사용하는Grid
구현을 새로 만드는 대신에 기존의 단일 스레드 구현을 쓸 수 있다는 뜻이다.메모리를 폭팔적으로 사용하는 문제, (스레드) 시작 비용, 스레드를 디버깅하는 문제 등은 해결했지만, 여전히 많은 문제가 남아있다.
simulate_pipeline
함수가 따라가기 어렵다.- 코드의 가독성을 개선하려면
ClosableQueue
와StoppableWorker
라는 추가 지원 클래스가 필요하며, 복잡도가 늘어난다. - 병렬성을 활용해 필요에 따라 자동으로 시스템 규모가 확장되지 않는다. 미리 부하는 예측해서 잠재적인 병렬성 수준을 미리 지정해야 한다.
- 디버깅을 활성화하려면 발생한 예외를 작업 스레드에서 수동으로 잡아
Queue
를 통해 전달함으로써 주 스레드에서 다시 발생사켜줘야 한다.
하지만 가장 큰 문제점은 요구 사항이 변경될 떄 드러난다.
count_neighbors
함수에서도 I/O를 수행해야 한다고 가정한다면,def count_neighbors(y, x, get): ... #Blocking I/O data = my_socket.recv(100) ...
이 코드를 병렬화하려면
count_neighbors
를 별도의 스레드에서 실행하는 단계를 파이프라인에 추가해야 한다. 작업자 스레드 사이의 동기화를 위해Grid
클래스에 대해Lock
을 사용해야 한다.def count_neighbors_thread(item): y, x, sate, get = item try: neighbors = count_neighbors(y, x, get) except Exception as e: neighbors = e return (y, x, state, neighbors) def game_logic_thread(item): y, x, state, neighbors = item if isinstance(neighbors, Exception): next_state = neighbors else: try: next_state = game_logic(state, neighbors) except Exception as e: next_state = e return (y, x, next_state) class LockingGrid(Grid): ...
count_neighbors_thread
작업자와 그에 해당하는Thread
인스턴스를 위해 또 다른Queue
인스턴스 집합을 만들어야 한다.in_queue = ClosableQueue() logic_queue = ClosableQueue() out_queue = ClosableQueue() threads = [] for _ in range(5): thread = StoppableWorker( count_neighbors_thread, in_queue, logic_queue) thread.start() threads.append(thread) for _ in range(5): thread = StoppableWorker( game_logic_thread, logic_queue, out_queue) thread.start() threads.append(thread) def simulate_phased_pipeline( grid, in_queue, logic_queue, out_queue): for y in range(grid.height): for x in range(grid.width): state = grid.get(y, x) item = (y, x, state, grid.get) in_queue.put(item) # 팬 아웃 in_queue.join() logic_queue.join() out_queue.close() next_grid = LockingGrid(grid.height, grid.width) for item in out_queue: y, x, next_state = item if isinstance(next_state, Exception): raise SimulationError(y, x) from next_state next_grid.set(y, x, next_state) return next_grid
정리
- 작업자 스레드 수를 고정하고
Queue
와 함께 사용하면 스레드를 사용할 때 팬인과 팬아웃의 규모 확장성을 개선할 수 있다. Queue
를 사용하도록 기존 코드를 리팩터링하려면 상당히 많은 작업이 필요하다. 특히 다단계로 이뤄진 파이프라인이 필요하면 작업량이 더 늘어난다.- 다른 파이썬 내장 기능이나 모듈이 제공하는 병렬 I/O를 가능하게 해주는 다른 기능과 비교하면,
Queue
라는 프로그램이 활용할 수 있는 전체 I/O 병렬성의 정도를 제한한다는 단점이 있다.
59. 동시성을 위해 스레드가 필요한 경우에는 ThreadPoolExecuter를 사용하라
Grid
의 각 셀에 대해 새Thread
인스턴스를 시작하는 대신, 함수를 실행기(executor)에 제출함으로써 팬아웃 할 수 있다. 실행기는 제출받은 함수를 별도의 스레드에서 수행해준다.실행기는 사용할 스레드를 미리 할당한다. 따라서
simulate_pool
을 실행할 때마다 스레드를 시작하는 데 필요한 비용이 들지 않는다. 또한, 스레드 풀에 사용할 스레드의 최대 개수를 지정할 수도 있다.from concurrent.futures import ThreadPoolExecutor def simulate_pool(pool, grid): next_grid = LockingGrid(grid.height, grid.width) futures = [] for y in range(grid.height): for x in range(grid.width): args = (y, x, grid.get, next_grid.set) future = pool.submit(step_cell, *args) # 팬아웃 futures.append(future) for future in futures: future.result() # 팬인 return next_grid with ThreadPoolExecutor(max_workers=10) as pool: for i in range(5): grid = simulate_pool(pool, grid)
ThreadPoolExecutor
클래스에서 가장 좋은 점은 submit 메서드가 반환하는Future
인스턴스에 대해result
메서드를 호출하면 스레드를 실행하는 중에 발생한 예외를 자동으로 전파해준다. 하지만,ThreadPoolExecutor
가 제한된 수의 I/O 병렬성만 제공한다는 큰 문제점이 남아 있다. 비동기적인 해법이 존재하지 않는 상황을 처리할 때는 좋은 방법이다.정리
ThreadPoolExecutor
를 사용하면 한정된 리팩터링만으로 간단한 I/O 병렬성을 활성화할 수 있다. 동시성을 팬아웃해야 하는 경우에 발생하는 스레드 시작 비용을 쉽게 줄일 수 있다.ThreadPoolExecutor
를 사용하면 스레드를 직접 사용할 때 발생할 수 있는 잠재적인 메모리 낭비 문제를 없애주지만max_worker
의 개수를 미리 지정해야 하므로 I/O 병렬성을 제한한다.
60. I/O를 할 때는 코루틴을 사용해 동시성을 높여라
파이썬은 높은 I/O 동시성을 처리하기 위해 코루틴 을 사용한다. 코루틴을 사용하면 파이썬 프로그램 안에서 동시에 실행되는 것처럼 보이는 함수를 아주 많이 쓸 수 있다. 코루틴은 async 와 await 키워드를 사용해 구현되며, 제너레이터를 실행하기 위한 인프라를 사용한다.
코루틴을 시작하는 비용은 함수 호출뿐이다. 활성화된 코루틴은 종료될 떄까지 1KB 미만의 메모리를 사용한다. 스레드와 마찬가지로 코루틴도 환경으로부터 입력을 소비하고 결과를 출력할 수 있는 독립적인 함수다.
코루틴은 매 await 식에서 일시 중단되고 일시 중단된 대기 가능성 이 해결된 다음에 async 함수로부터 실행을 재개한다는 차이점이 있다. 여러 분리된 async 함수가 서로 장단을 맞춰 실행되면 마치 모든 async 함수가 동시에 실행되는 것처럼 보아며, 이를 통해 파이썬 스레드의 동시성 동작을 흉내낼 수 있다.
부가 비용, 시작 비용, 컨텍스트 전환 비용이 들지 않고 복잡한 락과 동기화 코드가 필요하지 않다.코루틴을 가능하게 하는 매커니즘은 이벤트 루프 로 다수의 I/O를 효율적으로 동시에 실행할 수 있다.
코루틴을 사용해 생명 게임을 구현한다.
ALIVE = '*' EMPTY = '-' class Grid: ... def count_neighbors(y, x, get): ... async def game_logic(state, neighbors): ... # 여기서 I/O를 수행 data = await my_socket.read(50) ... async def step_cell(y, x, get, set): state = get(y, x) neighbors = count_neighbors(y, x, get) next_state = await game_logic(state, neighbors) set(y, x, next_state) import asyncio async def simulate(grid): next_grid = Grid(grid.height, grid.width) tasks = [] for y in range(grid.height): for x in range(grid.width): task = step_cell( y, x, grid.get, next_grid.set) # 한번에 처리하기 위해 await을 안씀 tasks.append(task) await asyncio.gather(*task) # 여기서 사용 """ - step_cell을 호출해도 이 함수가 즉시 호출하지 않고 await 식에서 사용될 수 있는 coroutine 인스턴스를 반환 (마치 yield를 사용하는 제너레이터 함수를 호출하면 즉시 실행되지 않고 제너레이터를 반환하는 것과 비슷) 이와같은 실행 연기 메커니즘이 팬아웃을 수행 - asyncio 내장 라이브러리가 제공하는 gather gkatnsms 팬인을 수행 gather에 대해 적용한 await식은 이벤트 루프가 step_cell 코루틴을 동시에 실행하면서 완료될 떄마다 simulate 코루틴 실행을 재개하라고 요청 - 모든 실행이 단일 스레드에서 이뤄지므로 Grid 인스턴스 락을 사용할 필요가 없다. I/O는 asyncio가 제공하는 이벤트 루프의 일부분으로 병렬화된다. """
새로운 코드는 asyncio.run 함수를 사용해 simulate 코루틴을 이벤트 루프상에서 실행하고 각 함수가 의존하는 I/O를 수행한다.
for i in range(5): grid = asyincio.run(simulate(grid))
정리
async
키워드로 정의한 함수를 코루틴이라고 부른다. 코루틴을 호출하는 호출자는await
키워드를 사용해 자신이 의존하는 코루틴의 결과를 받을 수 있다.- 코루틴은 수만 개의 함수가 동시에 실행되는 것처럼 보이게 만드는 효과적인 방법을 제공한다.
- I/O를 병렬화하면서 스레드로 I/O를 수행할 때 발생할 수 있는 문제를 극복하기 위해 팬인과 팬아웃에 코루틴을 사용할 수 있다.
61. 스레드를 사용한 I/O를 어떻게
asyncio
로 포팅할 수 있는지 알아두라정리
- 파이썬은
for
루프,with
문, 제너레이터, 컴프리헨션의 비동기 버전을 제공하고, 코루틴안에서 기존 라이브러리 도우미 함수를 대신해 즉시 사용할 수 있는 대안을 제공한다. asyncio
내장 모듈을 사용하면 스레드와 블로킹 I/O를 사용하는 기존 코드를 코루틴과 비동기 I/O를 사용하는 코드로 쉽게 포팅할 수 있다.
62.
asyncio
로 쉽게 옮겨갈 수 있도록 스레드와 코루틴을 함꼐 사용하라코드베이스를 점진적으로 마이그레이션하면서 필요에 따라 테스트를 함께 갱신하며, 각 단계에서 모든 기능이 제대로 작동하는지 확인해야 한다. 이런 마이그레이션을 가능하려면, 코드베이스에서 블로킹 I/O에 스레드를 사용하는 부분과 비동기 I/O에 코루틴을 사용하는 부분이 서로 호환되면서 공존할 수 있어야한다.
스레드 기반 구현으로부터 코드를 점진적으로
asyncio
와 코루틴 기반으로 바꾸는 방법에는 하향식(Top-Down)과 상향식(Bottom-Up)이라는 두가지 방법이 있다.하향식이란
main
진입점처럼 코드베이스에서 가장 높은 구성 요소로부터 시작해 점차 호출 계층의 잎 부분에 위치한 개별 함수와 클래스로 내려가면서 작업한다.구체적인 단계는
- 최상위 함수가
def
대신async
def
를 사용하게 변경한다. - 최상위 함수가 I/O를 호출하는 모든 부분을
asyncio.run_in_executor
로 감싸라 run_in_executor
호출이 사용하는 자원이나 콜백이 제대로 동기화 됐는지 확인하란다.- 호출 계층의 잎쪽으로 내려가면서 중간에 있는 함수와 메서드를 코루틴으로 변환하고
get_event_loop
와run_in_executor
호출을 없애려고 시도한다.
상향식 접근 방법도 하향식 접근 방법과 비슷한 4단계로 이뤄지지만, 변환 과정에서 호출 계층을 반대 방향으로 옮겨간다.
구체적인 단계는
- 프로그램에서 잎 부분에 있는, 포팅하려는 함수의 비동기 코루틴 버전을 새로 만든다.
- 기존 동기 함수를 변경해서 코루틴 버전을 호출하고 실제 동작을 구현하는 대신 이벤트 루프를 실행하게 한다.
- 호출 계층을 한 단계올려서 다른 코루틴 계층을 만들고, 기존에 동기적 함수를 호출하던 부분을 1단계에서 정의한 코루틴 호출로 바꾼다.
- 이제 비동기 부분을 결합하기 위해 2단계에서 만든 동기적인 래퍼를 삭제한다.
정리
asyncio
이벤트 루프의run_in_executor
를 사용하면 코루틴이ThreadPoolExecutor
스레드 풀을 사용해 동기적인 함수를 호출할 수 있다. 이 기능을 활용하면 코드를 하향식으로asyncio
로 마이그레이션할 수 있다.asyncio
이벤트 루프의run_util_complete
메서드를 사용하면 동기적인 코드가 코루틴을 호출하고 완료를 기다릴 수 있다.asyncio.run_coroutine_threadsafe
도 같은 기능을 제공하지만 스레드 경계에서도 안전하게 작동한다. 이 두 메서드를 활용하면 코드를 상향식으로asyncio
로 마이그레이션할 때 도움이 된다.
63. 응답성을 최대로 높이려면 asyncio 이벤트 루프를 블록하지 말라
출력 파일 핸들에 대한
open
,close
,write
호출이 주 이벤트 루프에서 이루어지는 것은 큰 문제점이다. 프로그램을 실행하는 운영체제의 시스템 콜을 사용해야 하기 때문에 이벤트 루프를 상당히 오랫동안 블록할 수 있으므로 다른 코루틴이 진행하지 못하게 된다. 이로 인해 전체 응답성이 나빠지고, 특히 동시성이 아주 높은 서버에서는 응답 시간이 늘어날 수 있다.문제가 발생하는지 감시하고 싶으면
debug=True
라는 파라미터를asyncio.run
함수에 넘기면 된다.import asyncio import time async def slow_coroutine(): time.sleep(0.5) asyncio.run(slow_coroutine(), debug=True) """ executing <Task finished name='Task-1' coro=<slow_coroutine() done, defined at .../ex.py:5> result=None created at .../asyncio/base_events.py:595> took 0.503 seconds """
응답성을 최대로 높이려면 이벤트 루프 안에서 시스템 콜이 이뤄질 잠재적인 가능성을 최소화해야 한다.
정리
- 시스템 콜(블로킹 I/O와 스레드 시작도 포함)을 코루틴으로 만들면 프로그램의 응답성이 좋아지고 사용자가 느끼는 지연 시간을 줄일 수 있다.
debug=True
파라미터를asyncio.run
에 넘기면 이벤트 루프가 빨리 반응하지 못하게 방해하는 코루틴을 식별할 수 있다.
64. 진정한 병렬성을 살리려면 concurrent.future 를 사용하라
파이썬 전역 인터프리터 락(GIL)으로 인해 파이썬 스레드는 진정한 병렬 실행이 불가능하다. 일반적인 조언은 성능에 가장 결정적인 영향을 미치는 부분을 C 언어를 사용한 확장 모듈로 작성하라는 것이다. 파이썬 보다 빠르개 실행되고, GIL에 대해 신경 쓰지 않고 여러 CPU 코어를 활용할 수 있도록 네이티브 스레드를 시작할 수도 있다. C 확장 API는 문서화가 잘돼 있고, 다중 코어를 활용해야 할 때 마지막 방법이 될 수 있다.
문제는 한 부분만 C로 바꾸면 되는 경우가 드물다는 점이다. 파이썬 프로그럄을 최적화할 때 속도 저하의 원인이 코드의 어느 한 곳에만 있는 경우는 드물다.
concurrent.futures
내장 모듈을 통해 쉽게 사용할 수 있는multiprocessing
내장 모듈이 있다. 이 모듈을 사용하면 자식 프로세서로 다른 파이썬 인터프리터를 실행함으로써 파이썬에서 여러 CPU 코어를 활용할 수 있다. 자식 프로세스의 GIL이 주 인터프리터의 GIL과 분리된다. 각 자식 프록세스는 한 CPU 코어를 완전히 사용할 수 있다.from concurrent.futures import ProcessPoolExecutor def gcd(pair): a, b = pair low = min(a, b) for i in range(low, 0, -1): if a % i == 0 and b % i == 0: return i assert False, '도달할 수 없음' def main(): pool = ProcessPoolExecutor(max_workers=2) results = list(pool.map(gcd, NUMBERS)) NUBMERS = [...] if __name__ == '__main__': main()
ProcessPoolExecutor
클래스가 (multiprocessing
모듈이 제공하는 저수준 요소를 활용해) 실제로 하는 일- (부모) 이 객체(
ProcessPoolExecutor
인스턴스)는 입력 데이터로 들어온map
메서드에 전달된NUMBERS
의 각 원소를 취한다. - (부모) 이 객체는 1번에서 얻은 원소를
pickle
모듈을 사용해 이진 데이터로 직렬화한다. - (부모, 자식) 이 객체는 로컬 소켓을 통해 주 인터프리터 프로세스로부터 자식 인터프리터 프로세스에게 2번에서 직렬화한 데이터를 복사한다.
- (자식) 이 객체는
pickle
을 사용해 데이터를 파이썬 객체로 역직렬화한다. - (자식) 이 객체는
gcd
함수를 실행한다. 이때 다른 자식 인터프리터 프로세스와 병렬로 실행한다. - (자식) 이 캑체는
gcd
함수의 결과를 이진 데이터로 직렬화한다. - (부모, 자식) 이 객체는 로컬 소켓을 통해 자식 인터프리터 프로세스로부터 부모 인터프리터 프로세스에게 6번에서 직렬화한 결과 데이터를 돌려준다.
- (부모) 이 객체는 데이터를 파이썬 객체로 역직렬화한다.
- (부모) 여러 자식 프로세스가 돌려준 결과를 병합해서 한
list
로 만든다.
부모와 자식 프로세스 사이에 데이터가 오고 갈 때마다 항상 직렬화와 역직렬화가 일어나야 하므로 추가 비용이 매우 크다.
서로 잘 격리되고 레버리지가 큰 유형의 작업에는 좋다.
격리란? 프로그램의 다른 부분과 상태를 공유할 필요가 없는 함수를 실행한다는 뜻
레버리지란? 부모와 자식 사이에 주고받아야 하는 데이터 크는 작지만, 이 데이터로 인해 자식 프로세스가 계산해야 하는 연산의 양을 말한다.정리
- CPU 병복 지점을 C 확장 모듈로 옮기면 파이썬에 투자한 비용을 최대한 유지하면서 프로그램 성능을 개선에 효과적일 수 있다. 하지만 C 확장 모듈로 옮기려면 많은 비용이 들고 포팅하는 과정에서 버그가 생겨날 수도 있다.
multiprocessing
모듈을 사용하면 특정 유형의 파이썬 게산을 최소의 노력으로 병렬화할 수 있다.concurrent.futures
내장 모듈이 재공하는 간단한ProcessPoolExecutor
클래스를 호라용하면multiprocessing
의 능력을 최대한 활용할 수 있다.- 사용할 수 있는 모든 방법을 다 써보기 전에는
multiprocessing
이 제공하는 (복잡한) 고급기능을 사용하지 않는 것이 좋다.
'파이썬 > 책 정리' 카테고리의 다른 글
[공부] 파이썬 코딩의 기술 책 정리 #7-2 (0) 2021.04.21 [공부] 파이썬 코딩의 기술 책 정리 #7-1 (0) 2021.04.21 [공부] 파이썬 코딩의 기술 책 정리 #6 (0) 2021.04.07 [공부] 파이썬 코딩의 기술 책 정리 #5 (0) 2021.03.31 [공부] 파이썬 코딩의 기술 책 정리 #4 (0) 2021.03.28