ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [공부] 파이썬 코딩의 기술 책 정리 #7-2
    파이썬/책 정리 2021. 4. 21. 12:38

    55. Queue를 사용해 스레드 사이에서 작업을 조율하라

    동시성 작업을 처리할 때 가장 유용한 방식은 함수 파이프라인이다.

    파이프라인은 순차적으로 실행해야 하는 여러 단계가 있고, 각 단계마다 실행할 구체적인 함수가 정해진다.작업은 매 단계 함수가 완료될 때마다 다음 단계로 전달되며, 더 이상 실행할 단계가 없을 때 끝난다.

    queue 내장 모듈에 있는 Queue는 새로운 데이터가 나타날 때까지 get 메서드가 블록되게 만들어서 작업자의 바쁜 대기 문제를 해결한다.

     

    큐에 입력 데이터가 들어오기를 기다리는 스레드를 하나 시작한다.

    from queue import Queue
    
    my_queue = Queue()
    
    
    def consumer():
        print('소비자 대기')
        my_queue.get()  # 다음에 보여줄 put()이 실행된 다음에 실행된다.
        print('소비자 완료')
    
    
    thread = Thread(target=consumer)
    thread.start()
    
    print('생산자 데이터 추가')
    my_queue.put(object())  # get()이 실행되기전에 실행된다.
    print('생산자 완료')
    thread.join()
    

    파이프라인 중간에 막히는 경우를 해결하기 위해 Queue 클래스에서는 두 단계 사이에 허용할 수 있는 미완성 작업의 최대 개수를 지정할 수 있다. 버퍼의 크기를 정하면 큐가 이미 가득 찬 경우 put이 블록된다.

    from queue import Queue
    from threading import Thread
    import time
    
    my_queue = Queue(1)
    
    
    def consumer():
        time.sleep(0.1)  # 대기
        my_queue.get()  # 두 번째로 실행
        print('소비자 1')
        my_queue.get()  # 네 번째로 실행
        print('소비자 2')
        print('소비자 완료')
    
    
    thread = Thread(target=consumer)
    thread.start()
    
    my_queue.put(object())  # 첫 번째로 실행
    print('생산자 1')
    my_queue.put(object())  # 세 번쨰로 실행
    print('생산자 2')
    print('생산자 완료')
    thread.join()

     

    Queue 클래스의 task_done 메서드를 통해 작업의 진행을 추적할 수 있다. 이 메서드를 사용하면 어떤 단게의 입력 큐가 다 소진될 때까지 기다릴 수 있고, 파이프라인의 마지막 단계를 폴링할 필요도 없어진다.

    in_queue = Queue()
    def consumer():
        print('소비자 대기')
        work = in_queue.get() # 두 번째로 실행됨
        print('소비자 작업 중')
        ...
        print('소비자 완료')
        in_queue.task_done() # 세 번째로 실행됨
    
    thread = Thread(target=consumer)
    thread.start()
    """
    생산자 코드가 소비자 스레드를 조인하거나 폴링할 필요가 없다. 생산자는 Queue 인스턴스의 join 메서드를 호출함으로써 in_queue가 끝나기를 기다릴 수 있다. in_queue가 비어 있더라도 지금까지 이 큐에 들어간 모든 원소에 대해 task_done이 호출되기 전까지는 join이 끝나지 않는다.
    """
    print('생산자 데이터 추가')
    in_queue.put(object()) # 첫 번째로 실행됨
    print('생산자 대기')
    in_queue.join() # 네 번째로 실행됨
    print('생산자 완료')
    thread.join()
    

    이 모든 동작을 Queue 하위 클래스에 넣고, 처리가 끝났음을 작업자 스레드에 알리는 기능을 추가할 수 있다. 큐에 더 이상 다른 입력이 없음을 표시하는 특별한 센티넬 원소를 추가하는 close 메서드를 정의한다.

    class ClosableQueue(Queue):
        SENTINEL = object()
    
    
        def close(self):
            self.put(self.SENTINEL)
    
        def __iter__(self):
            while True:
                item = self.get()
                try:
                    if item is self.SENTINEL:
                        return
                    yield item
                finally:
                    self.task_done()

    작업자 스레드가 ClosableQueue 클래스의 동작을 활용하게 할 수 있다.

    class StoppableWorker(Thread):
        def __init__(self, func, in_queue, out_queue):
            self.func = func
            self.in_queue = in_queue
            self.out_queue = out_queue
    
        def run(self):
            for item in self.in_queue:
                result = self.func(item)
                self.out_queue.put(result)

    새 작업자 클래스를 사용해 작업자 스레드를 새로 정의한다.

    download_queue = ClosableQueue()
    resize_queue = ClosableQueue()
    upload_queue = ClosableQueue()
    done_queue = ClosableQueue()
    
    thread = [
        StoppableWorker(download, download_queue, resize_queue),
        StoppableWorker(resize, resize_queue, upload_queue),
        StoppableWorker(uplaod, upload_queue, done_queue),
    ]
    
    for thread in threads:
        thread.start()
    for _ in range(1000):
        download_queue.put(object())
    
    download_queue.close()
    download_queue.join()
    resize_queue.close()
    resize_queue.join()
    upload_queue.close()
    upload_queue.join()
    print(done_queue.qsize())
    
    for thread in threads:
        thread.join()

    정리

    • 순차적인 작업을 동시에 여러 파이썬 스레드에서 실행되도록 조직하고 싶을 때, 특히 I/O 위주의 프로그램인 경우라면 파이프라인이 매우 유용하다.
    • 동시성 파이프라인을 만들 때 발생할 수 있는 여러 가지 문제를 잘 알아준다.
    • Queue 클래스는 튼튼한 파이프라인을 구축할 때 필요한 기능인 블로킹 연산, 버퍼 크기 지정, join을 통한 완료 대기를 모두 제공한다.

    56. 언제 동시성이 필요할지 인식하는 방법을 알아두라

     프로그램이 다루는 영역이 커짐에 따라 복잡도도 증가한다. 프로그램의 명확성, 테스트 가능성, 효율성을 유지하면서 늘어나는 요구 조건을 만족시키는 것은 프로그래밍에서 가장 어려운 부분이다. 가장 처리하기 어려운 일은 단일 스레드 프로그래밍을 동시 실행되는 여러 흐름으로 이뤄진 프로그램으로 바꾸는 경우일 것이다.

     

     만약 기존의 프로그램에서 요구 사항이 바뀌어서 I/O(소켓 통신 등)가 필요하다면, I/O를 병렬로 수행해서 해결할 수 있다.
    각 작업 단위에 대해 동시 실행되는 여러 실행 흐름을 만들어내는 과정을 팬아웃 이라고 한다.

    전체를 조율하는 프로세스 안에서 다음 단계로 진행하기 전에 동시 작업 단위의 작업이 모두 끝날 때까지 기다리는 과정을 팬인 이라고 한다.

     

    파이썬은 팬아웃과 팬인을 지원하는 여러 가지 내장 도구를 제공한다.

    정리

    • 프로그램이 커지면서 범위와 복잡도가 증가함에 따라 동시에 실행되는 여러 실행 흐름이 필요해지는 경우가 많다.
    • 동시성을 조율하는 가장 일반적인 방법으로는 팬아웃(새로운 동시성 단위들을 만들어냄)과 팬인(기존 동시성 단위들의 실행이 끝나기를 기다림)이 있다.
    • 파이썬은 팬아웃과 팬인을 구현하는 다양한 방법을 제공한다.

    57. 요구에 따라 팬아웃을 진행하려면 새로운 스레드를 생성하지 말라.

    병렬 I/O를 실행하고 싶을 때는 자연스레 스레드를 가장 먼저 고려하게 된다. 그러나, 여러 동시 실행흐름을 만들어내는 팬아웃을 수행하고자 스레드를 사용할 경우 중요한 단점과 마주하게 된다.

    여기서는 game_logic 안에서 I/O를 수행함으로써 생기는 지연시간을 해결한다.

    from threading import Lock
    
    
    ALIVE = '*'
    EMPTY = '-'
    
    
    class Grid:
        ...
    
    
    class LockingGrid(Grid):
        def __init__(self, height, width):
            super().__init__(height, width)
            self.lock = Lock()
    
        def __str__(self):
            with self.lock:
                return super().__str__()
    
        def get(self, y, x):
            with self.lock:
                return super().__get__(y, x)
    
        def set(self, y, x, state):
            with self.lock:
                return super().set(y, x, state)

    step_cell 호출마다 스레드를 정의해 팬아웃 하도록 simulate 함수를 정의한다. 스레드를 병렬로 실행되며, 다른 I/O가 끝날 때까지 기다리지 않아도 된다. 다음 세대로 진행하기 전에 모든 스레드가 작업을 마칠 때까지 기다리므로 팬인 할 수 있다.

    from threading import Thread
    
    
    def count_neighbors(y, x, get):
        ...
    
    def game_logic(state, neighbors):
        ...
        # Blocking I/O
        data = my_socket.recv(100)
        ...
    
    def step_cell(y, x, get, set):
        state = get(y, x)
        neighbors = count_neighbors(y, x, get)
        next_state = game_logic(state, neighbors)
        set(y, x, next_state)
    
    def simulate_threaded(grid):
        next_grid = LockingGrid(grid.height, grid.width)
    
        threads = []
        for y in range(grid.height):
            for x in range(grid.width):
                args = (y, x, grid.get, next_grid.set)
                thread  = Thread(target=step_cell), args=args)
                thread.start()
                threads.append(thread)
    
        for thread in threads:
            thread.join()
    
        return next_grad
    
    

    스레드 사이에 I/O가 병렬화됐다. 하지만 이 코드에는 세 가지 문제점이 있다.

    1. Thread 인스턴스를 서로 안전하게 조율하려면 특별한 도구가 필요하다. 이로 인해 순차적인 단일 스레드보다 스레드를 사용하는 코드가 읽기 어렵다. 복잡도 때문에 시간이 지남에 따라 스레드를 사용한 코드를 확장하고 유지 보수하기도 더 어렵다.
    2. 스레드는 메모리를 많이 사용하며, 스레드 하나당 약 8MB가 더 필요하다.
    3. 스레드를 시작하는 비용이 비싸며, 컨텍스트 전환에 비용이 들기 때문에 스레드는 성능에 부정적인 영향을 미친다.

    지속적으로 새로운 동시성 함수를 시작하고 끝내야 하는 경우 스레드는 적절한 해법이 아니다. 파이썬은 이런 경우에 더 적합한 다른 해법을 제공한다.

    정리

    • 스레드는 많은 단점이 있다. 스레드를 시작하고 실행하는 데 비용이 들기 때문에 스레드가 많이 필요하면 상당히 많은 메모리를 사용한다. 그리고 스레드 사이를 조율하기 위해 Lock 과 같은 특별한 도구를 사용해야 한다.
    • 스레드를 시작하거나 스레드가 종료하기를 기다리는 코드에게 스레드 실행중에 발행한 예외를 돌려주는 파이썬 내장 기능은 없다. 이로 인해 스레드 디버깅이 어렵다.

    댓글