이 내용은 "Java 언어로 배우는 디자인 패턴 입문 - 멀티쓰레드편" 책의 Chapter 05 부분을 정리한 것이다.
Producer-Consumer 패턴
- OS 수업에서 쓰레드를 배울 때 가장 먼저 접하게 되는 예제! 생산자-소비자 패턴
- 생산자쓰레드는 데이터를 작성하고, 소비자쓰레드는 데이터를 사용한다. 즉 생산자가 소비자에게 안전하게 데이터를 전달하는 것인데, 각각이 별개의 쓰레드로 동작함으로서 양쪽 처리속도에 차이가 있으면 문제가 생기게 된다.
- 생산활동과 소비활동이 올바른 순서대로 일어날 수 있도록 양쪽 쓰레드간에 처리 속도의 차이를 메워주는 중개자가 필요하다.
구현 예제
시나리오
- 요리사(Class MakerThread)는 케익(String cake)을 만들어서 테이블(Class Table)에 놓는다.
- 테이블에는 케익을 3개까지 놓을 수 있다.
- 테이블 위에 이미 3개의 케익이 있는데 요리사가 케익을 또 만들었다면 요리사는 기다려야 한다.
- 손님(Class EaterThread)은 테이블에 놓인 케익을 먹는다.
- 손님은 테이블에 먼저 놓여진 순서대로 케익을 먹는다.
- 테이블 위에 케익이 한 개도 없는데 손님이 케익을 먹고자 한다면 손님은 기다려야 한다.
public class Main { public static void main(String[] args) { Table table = new Table(3); // 케익을 3개까지 놓을 수 있는 테이블 인스턴스 new MakerThread("MakerThread-1", table, 31415).start(); // 숫자는 rand의 seed로 사용할 임의의 값 new MakerThread("MakerThread-2", table, 92653).start(); new MakerThread("MakerThread-3", table, 58979).start(); new EaterThread("EaterThread-1", table, 32384).start(); new EaterThread("EaterThread-2", table, 62643).start(); new EaterThread("EaterThread-3", table, 38327).start(); } }
Main 클래스.
케익을 3개 놓을 수 있는 테이블 인스턴스를 생성.
요리사 3명과 손님 3명을 생성 및 start.
import java.util.Random; public class MakerThread extends Thread { private final Random random; private final Table table; private static int id = 0; // 결과 출력시 알아보기 쉽도록, 케익에 만들어진 순서대로 id를 붙임 public MakerThread(String name, Table table, long seed) { // 생성자 super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { Thread.sleep(random.nextInt(1000)); // 케익을 만드는 시간... String cake = "[ Cake No." + nextId() + " by " + getName() + " ]"; table.put(cake); // 케익을 만들어서 테이블 위에 올림. } } catch (InterruptedException e) { } } private static synchronized int nextId() { return id++; } }
요리사 역할을 하는 MakerThread 클래스. 그리고 케익 역할을 하는 String cake.
이 쓰레드는 케익(String cake)을 만들어서 테이블에 올리는(table.put(cake);) 작업을 영원히 반복하는 "생성자" 클래스이다.
import java.util.Random; public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { // 생성자 super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { String cake = table.take(); // 테이블에 있는 케익 하나를 먹음 Thread.sleep(random.nextInt(1000)); // 케익을 먹는 시간... } } catch (InterruptedException e) { } } }
손님 역할을 하는 EaterThread 클래스.
이 쓰레드는 테이블에서 케익 하나를 집어서(table.take();) 먹는 작업을 영원히 반복하는 "소비자" 클래스이다.
public class Table { private final String[] buffer; private int tail; // 다음에 put(생산)할 위치 private int head; // 다음에 take(소비)할 위치 private int count; // buffer 안에 있는 케익 수 public Table(int count) { // 생성자 this.buffer = new String[count]; this.head = 0; this.tail = 0; this.count = 0; } // 케익을 놓는다 public synchronized void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " puts " + cake); while (count >= buffer.length) { // Guarded Suspension 패턴. 케익을 놓을 장소가 없으면 기다린다 wait(); } buffer[tail] = cake; tail = (tail + 1) % buffer.length; count++; notifyAll(); } // 케익을 먹는다 public synchronized String take() throws InterruptedException { while (count <= 0) { // Guarded Suspension 패턴 먹을 케익이 없으면 기다린다 wait(); } String cake = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll(); System.out.println(Thread.currentThread().getName() + " takes " + cake); return cake; } }
- Table 클래스. FIFO 방식의 Queue로 구현되어 있음.
- put 메소드, take 메소드는 synchronized로 선언. (생산자/소비자간의 간섭을 막기 위한 것이 아니라, 여러개의 생산자쓰레드간의 간섭 및 여러개의 소비자쓰레드간의 간섭을 막기 위함)
- 메소드 선언부의 throws interruptedException 에 유의. "이 메소드는 취소가 가능하다"라는 의미로 일단 이해.
put 메소드
- Guarded Suspension 패턴이 적용되었다.
- 케익이 생산된 이후에는 Table 인스턴스의 상태가 변한 것이므로 notifyAll을 실행하여 wait 하고 있는 쓰레드를 모두 깨움.
take 메소드
- Guarded Suspension 패턴이 적용되었다.
- 케익이 소비된 이후에는 Table 인스턴스의 상태가 변한 것이므로 notifyAll을 실행하여 wait 하고 있는 쓰레드를 모두 깨움
Producer-Consumer 패턴에 필요한 등장 인물
1. Data : 예제의 String cake
- Data는 Producer 역할에 의해 만들어지며 Consumer에 의해 사용된다.
2. Producer : 예제의 MakerThread 클래스
- Producer는 Data를생성하여 Channel에 전달한다.
3. Consumer : 예제의 EaterThread 클래스
- Consumer는 Channel로부터 Data를 받아와서 사용한다.
4. Channel : 예제의 Table 클래스
- Channel은 Producer로부터 Data를 받아서 보관한다. 또한 Consumer의 요청이 들어오면 Data를 전달한다.
- Channel은 안전성을 확보하기 위해 Producer와 Consumer의 접근에 대해 Mutual Exclusion을 수행한다.
- 즉, Channel은 Producer와 Consumer 사이에서 Data를 전달하는 중계 지점, 채널으로서의 역할을 한다.
Safety는 Channel의 책임
이 패턴에서 안전성을 지킬 책임은 Channel에게 있다. Channel은 생산자/소비자 쓰레드 사이에서 Mutual Exclusion을 수행하며 Data를 정확하게 전달한다.
생산자 클래스와 소비자 클래스에는 Channel 클래스의 구현에 의존하지 않는다. 또한 다른 쓰레드의 작동을 전혀 신경쓰지 않고 put, take 메소드를 호출한다. synchronized, wait, notifyAll 등을 사용하여 멀티쓰레딩 환경을 고려하는 코드는 모두 Channel 클래스 안에 감추어져 있다.
Buffer의 크기 (Array vs LinkedList)
예제에서는 Queue를 크기가 3인 배열로 구현했다. 만약 데이터가 소비되는 간격보다 생산하는 간격이 빠른 경우, 언젠가 buffer는 꽉 차게 되고 소비자는 기다릴 수 밖에 없게 된다.
buffer의 사이즈를 늘리거나, 배열 대신 사이즈의 제한이 없는 링크드리스트를 사용한다면 보관할 수 있는 데이터의 수의 제한이 없어진다. 그러나 언젠가는 메모리 부족 현상이 발생할 가능성이 있다.
Queue vs Stack vs Priority Queue
Channel은 생산자로부터 데이터를 받아서 소비자에게 전달한다. 이 때, Channel이 가지고 있는 데이터가 여러개라면 그 중 어떠한 데이터를 전달해야 할까?
1. Queue - 먼저 받은 것부터 전달한다 (FIFO, 선입선출)
- Producer-Consumer 패턴에서 가장 자주 사용된다. 예제의 Table 클래스에서는 Queue를 사용하고 있다. Array로 구현하든 LinkedList로 구현하든 먼저 넣은 것이 먼저 나오면 Queue이다.
2. Stack - 나중에 받은 것을 먼저 전달한다 (LIFO, 후입선출)
- Producer-Consumer 패턴에서는 스택을 그다지 사용하지 않는다.
3. Priority Queue - 우선순위가 높은 것을 먼저 전달한다
- 데이터에 각각 우선순위를 매기고, 우선순위가 높은 것부터 소비자에게 전달하는 방법이다. 우선순위를 매기는 방법은 구현하기 나름이다.
- Queue, Stack 모두 Priority Queue의 특수한 경우라고 볼 수 있다. 오래전에 도착한 Data일수록 우선시되는 것이 Queue이고, 최근에 도착한 Data일수록 우선시되는 것이 Stack이다.
notify vs notifyAll
예제의 Table 클래스 (Channel 역할)의 notifyAll 메소드를 notify( 메소드로 바꾼다면 문제가 생길 수도 있다.
notify 메소드는 wait set 안에서 대기하고 있는 쓰레드들 중에서 한 개만 깨운다. 따라서 관계가 없는 쓰레드가 wait set에 들어가 있었다면, notify로 인해 그 쓰레드가 깨어날 수도 있다. 따라서 이러한 패턴에서는 반드시 notifyAll 메소드를 이용한다.
* 더 중요한 것은, wait set 안에 관계 없는 쓰레드가 섞이지 않도록 하는 것이다. 락을 걸 때 사용되는 object는 로컬 변수나 private 필드에 보관한다.
관련 패턴
Worker Thread 패턴
- Producer-Consumer 패턴에서는 생산자가 소비자에게 데이터를 전달하는 것에 초점이 맞춰진다.
- Worker Thread 패턴에서도 Client가 Worker에게 데이터를 전달한다. 그러나 데이터를 처리하는 쓰레드를 공유함으로써 돌려가면서 사용하고, 그 결과 쓰레드를 start할 때 들어가는 비용을 낮추는 부분에 초점이 맞춰진다.
Command 패턴
- 생산자가 소비자에게 전달하는 데이터 부분에 Command 패턴이 사용될 수 있다.
Strategy 패턴
- 생산자가 소비자에게 건내는 데이터의 순서를 정하는 부분에 Strategy 패턴을 사용할 수 있다.
BlockingQueue 인터페이스
java.util.concurrent 패키지의 Queue
- java.util.concurrent 패키지에는 Producer-Consumer 패턴의 Channel에 해당하는 역할을 하는 BlockingQueue 인터페이스와 이를 구현한 클래스가 있다.
- 적절한 상태가 될 때까지 쓰레드가 블록(wait)하는 큐이다. BlockingQueue 인터페이스는 java.util.Queue 인터페이스의 서브 인터페이스로서 offer 메소드, poll 메소드 또한 가지고 있다. 그러나 실제로 '블록한다'고 하는 기능을 충족하는 메소드는 BlockingQueue 인터페이스 고유의 put 메소드와 take 메소드이다.
- Blocking Queue는 인터페이스이므로 실제 사용하려면 이를 구현한 콘크리트 클래스를 이용하게 된다.
ArrayBlockingQueue 클래스 - Array로 구현한 BlockingQueue
- ArrayBlockingQueue 클래스에는 element 개수에 제한이 있는 BlockingQueue이다. Array가 꽉 찼는데도 데이터를 put하려 했을 때와, Array가 비어있는데 데이터를 가져가려 했을 때 블록한다.
LinkedBlockingQueue 클래스 - LinkedList로 구현한 BlockingQueue
- LinkedBlockingQueue 클래스는 element 개수에 제한이 없는 BlockingQueue이다. 특별한 지정이 없는 한 element 개수에는 제한이 없어서 메모리가 허락하는 한 데이터를 put할 수 있다.
PriorityBlockingQueue 클래스 - 우선순위가 있는 BlockingQueue
- 각 데이터에 우선순위를 지정. 우선순위는 Comparable, Comparator 인터페이스를 사용하여 지정한다.
DelayQueue 클래스 - 일정 시간이 지나지 않으면 take 할 수 없는 BlockingQueue
- DelayQueue 클래스는 java.util.concurrent.Delayed 객체를 보관하는 Queue이다. 이 Queue로부터 take할 때에는 각 element별로 지정한 시간이 지난 것이 아니면 take할 수 없다. 또한 경과시간이 오래된 element부터 우선적으로 take한다.
SynchronousQueue 클래스 - 직접 전달하는 BlockingQueue
- SynchronousQueue 클래스는 생산자로부터 소비자에게 데이터를 직접 전달하는 BlockingQueue이다. 생산자가 먼저 put 하려면 소비자가 take 하고 올 때까지 생산자는 계속 블록하고, 반대로 소비자가 먼저 take 하려면 생산자가 put 하고 올 때까지 계속 블록한다.
ConcurrentLinkedQueue 클래스 - element 개수에 제한이 없는 thread-safety Queue
- ConcurrentLinkedQueue 클래스는 BlockingQueue를 구현한 클래스는 아니지만, element 개수에 제한이 없는 thread-safety한 Queue이다. 이 클래스에서는 간섭하지 않는 쓰레드끼리는 mutual exclusion을 하지 않도록 하기 위해 내부 데이터의 구조를 분할한다. 쓰레드의 상황에 따라서는 수행 능력을 향상시킬 수 있다.
java.util.concurrent.ArrayBlockingQueue를 사용한 경우의 Table 클래스 (Channel 역할)
import java.util.concurrent.ArrayBlockingQueue; public class Table extends ArrayBlockingQueue<String> { public Table(int count) { super(count); } public void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " puts " + cake); super.put(cake); } public String take() throws InterruptedException { String cake = super.take(); System.out.println(Thread.currentThread().getName() + " takes " + cake); return cake; } }
java.util.concurrent.Exchanger 클래스에 의한 버퍼 교환
java.util.concurrent.Exchanger 클래스는 두 개의 쓰레드가 객체를 안전하게 교환하기 위한 클래스이다.이를 Channel로 이용하여 Producer-Consumer 패턴을 구현할 수 있다.
버퍼에 데이터를 채워넣는 생산자클래스와 버퍼로부터 데이터를 꺼내는 소비자클래스가 각자의 버퍼를 서로 교환하도록 구현한다.
ProducerThread는 다음의 처리를 반복한다.
1. 버퍼가 꽉 찰 때까지 문자를 채운다.
2. 가득 찬 버퍼를 exchange 메소드로 ConsumerThread에게 전달한다.
3. 전달한 버퍼와 빈 버퍼를 교환하여 받는다.
ConsumerThread는 다음의 처리를 반복한다.
1. 빈 버퍼를 exchange 메소드로 ProducerThread에게 전달한다.
2. 빈 버퍼와 문자가 꽉 찬 버퍼를 교환한다.
3. 버퍼 안의 문자를 사용한다.
import java.util.concurrent.Exchanger; public class Main { public static void main(String[] args) { Exchanger<char[]> exchanger = new Exchanger<char[]>(); char[] buffer1 = new char[10]; char[] buffer2 = new char[10]; new ProducerThread(exchanger, buffer1, 314159).start(); new ConsumerThread(exchanger, buffer2, 265358).start(); } }
import java.util.Random; import java.util.concurrent.Exchanger; public class ProducerThread extends Thread { private final Exchanger<char[]> exchanger; private char[] buffer = null; private char index = 0; private final Random random; public ProducerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) { super("ProducerThread"); this.exchanger = exchanger; this.buffer = buffer; this.random = new Random(seed); } public void run() { try { while (true) { // 버퍼에 문자를 채워 넣는다 for (int i = 0; i < buffer.length; i++) { buffer[i] = nextChar(); System.out.println(Thread.currentThread().getName() + ": " + buffer[i] + " -> "); } // 버퍼를 교환한다 System.out.println(Thread.currentThread().getName() + ": BEFORE exchange"); buffer = exchanger.exchange(buffer); System.out.println(Thread.currentThread().getName() + ": AFTER exchange"); } } catch (InterruptedException e) { } } // 문자를 생산한다 private char nextChar() throws InterruptedException { char c = (char)('A' + index % 26); index++; Thread.sleep(random.nextInt(1000)); return c; } }
import java.util.Random; import java.util.concurrent.Exchanger; public class ConsumerThread extends Thread { private final Exchanger<char[]> exchanger; private char[] buffer = null; private final Random random; public ConsumerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) { super("ConsumerThread"); this.exchanger = exchanger; this.buffer = buffer; this.random = new Random(seed); } public void run() { try { while (true) { // 버퍼를 교환한다 System.out.println(Thread.currentThread().getName() + ": BEFORE exchange"); buffer = exchanger.exchange(buffer); System.out.println(Thread.currentThread().getName() + ": AFTER exchange"); // 버퍼로부터 문자를 꺼낸다 for (int i = 0; i < buffer.length; i++) { System.out.println(Thread.currentThread().getName() + ": -> " + buffer[i]); Thread.sleep(random.nextInt(1000)); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
댓글