본문 바로가기
공부(~2016)/Java

[Java] java.util.concurrent.CountDownLatch, java.util.concurrent.CyclicBarrier 클래스

by soy; 2015. 1. 19.

java.util.concurrent.CountDownLatch 클래스, java.util.concurrent.CyclicBarrier 클래스는 여러개의 쓰레드를 동기시킬 때 사용하면 편리하다.


java.util.concurrent.CountDownLatch 클래스

어떤 쓰레드가 지정한 쓰레드가 종료되기를 기다릴 때에는 java.lang.Thread 클래스의 join 메소드를 이용한다. 하지만 join 메소드로 기다리는 것이 가능한 것은 '쓰레드의 종료'라고 하는 단 한 번의 액션뿐이다. 따라서 '지정한 횟수만큼 어떠한 액션이 일어나는 것을 기다린다'라는 것은 불가능하다.

java.util.concurrent.CountDownLatch 클래스를 사용하면 '지정한 횟수만큼 countdown 메소드가 호출되기를 기다린다'는 기능을 구현할 수 있다.


아래의 예제는 쓰레드에 MyTask라는 일을 10개 처리하도록 하고, 10개의 일이 전부 끝나기를 기다리는 프로그램이다.

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
 
public class Main {
    private static final int TASKS = 10; // 일의 갯수
 
    public static void main(String[] args) {
        System.out.println("BEGIN");
        ExecutorService service = Executors.newFixedThreadPool(5);
        CountDownLatch doneLatch = new CountDownLatch(TASKS);
        try {
            // 일을 시작한다
            for (int t = 0; t < TASKS; t++) {
                service.execute(new MyTask(doneLatch, t));
            }
            System.out.println("AWAIT");
            // 일이 종료되기를 기다린다
            doneLatch.await();
        } catch (InterruptedException e) {
        } finally {
            service.shutdown();
            System.out.println("END");
        }
    }
}

1. 일을 실행하기 위한 ExecutorService 객체를 생성한다. (쓰레드 5개를 가진 쓰레드풀)

2. CountDownLatch 클래스의 인스턴스를 생성한다. 이 때 생성자의 인자로 일의 갯수를 전달한다.

3. execute 메소드를 10번 호출하여, 10개의 일을 시작하도록 한다. 내부에서 10개의 MyTask 쓰레드가 기동되어 일을 처리한다.

4. CountDownLatch 클래스의 await 메소드를 호출하여, 카운트 값이 0이 될 때까지 기다린다.

5. 카운트 값이 0이 되었다면 shutdown 메소드를 호출하여 service를 종료한다.


import java.util.Random;
 
import java.util.concurrent.CountDownLatch;
 
public class MyTask implements Runnable {
    private final CountDownLatch doneLatch;
    private final int context;
    private static final Random random = new Random(314159);
 
    public MyTask(CountDownLatch doneLatch, int context) {
        this.doneLatch = doneLatch;
        this.context = context;
    }
 
    public void run() {
        doTask();
        doneLatch.countDown();
    }
 
    protected void doTask() {
        String name = Thread.currentThread().getName();
        System.out.println(name + ":MyTask:BEGIN:context = " + context);
        try {
            Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
        } finally {
            System.out.println(name + ":MyTask:END:context = " + context);
        }
    }
} 

MyTask 클래스는 일의 내용을 나타내는 클래스이며 Runnable 인터페이스를 implements 한다.

run 메소드에서는 다음 처리를 실행한다.

1. doTask 메소드를 호출하여 '실제 처리'를 실행한다.

2. CountDownLatch 클래스의 countDown 메소드를 호출하여 카운트 값을 1 감소시킨다.


지정된 갯수만큼의 모든 MyTask가 처리 완료되면 카운트 값이 0이 되고, 메인쓰레드는 await 메소드로부터 돌아오게 된다. 이로써 MyTask를 실행하던 쓰레드가 메인쓰레드에게 일 처리가 종료되었음을 알리는 것이다.



java.util.concurrent.CyclicBarrier 클래스

앞에서 살펴본 CountDownLatch 클래스의 인스턴스는 카운트다운밖에 할 수 없다. 즉 한 번 카운트가 0에 도달하면 await 메소드를 호출해도 바로 돌아와버린다. 쓰레드의 동기를 몇 번이고 반복하고 싶을 때에는 java.util.concurrent.CyclicBarrier 클래스를 사용한다.


CyclicBarrier는 주기적으로 (cyclic 하게) 장벽(barrier)을 만든다. 장벽에 부딪힌 쓰레드는 장벽이 사라질 떄까지 다음 단게로 나아갈 수 없다. 장벽이 사라지는 조건은 생성자에서 지정한 개수의 쓰레드가 그 장벽에 도착하는 것이다. 즉, 지정한 개수의 쓰레드가 장벽에 도착하면 장벽이 사라지고 쓰레드들이 모두 실행을 시작하게 된다.


아래의 예제는 0~4단계까지 다섯 단계를 거치는 일을 3개의 쓰레드가 처리하는 프로그램이다.

단, 3개의 쓰레드 모두가 N단계를 종료할 떄까지는 어떤 쓰레드도 다음 단계로 나아갈 수 없다. 즉, 3개의 쓰레드들의 단계 맞추기를 위하여 CyclicBarrier 클래스를 사용한다. 또한 3개의 쓰레드가 모든 단계를 종료했다는 사실을 메인쓰레드에게 알리기 위해 CountDownLatch 클래스를 사용한다.


CyclicBarrier 인스턴스를 생성할 때 Runnable 객체를 인자로 전달한다. 이 객체를 barrier action이라고 한다. barrier action은 장벽이 해제될 때마다 실행된다.


import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.CountDownLatch;
 
public class Main {
    private static final int THREADS = 3; // 쓰레드의 갯수
 
    public static void main(String[] args) {
        System.out.println("BEGIN");
 
        // 일을 실행하는 쓰레드를 제공하는 ExecutorService (쓰레드풀)
        ExecutorService service = Executors.newFixedThreadPool(THREADS);
 
        // barrier가 해제될 때 실행될 액션 (Runnable 객체)
        Runnable barrierAction = new Runnable() {
            public void run() {
                System.out.println("Barrier Action!");
            }
        };
 
        // 쓰레드들 간에 단계를 맞추기 위한 CyclicBarrier
        CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
 
        // 모든 단계가 CountDownLatch
        CountDownLatch doneLatch = new CountDownLatch(THREADS);
 
        try {

            for (int t = 0; t < THREADS; t++) {
                service.execute(new MyTask(phaseBarrier, doneLatch, t));
            }

            System.out.println("AWAIT");
            doneLatch.await();
        } catch (InterruptedException e) {
        } finally {
            service.shutdown();
            System.out.println("END");
        }
    }
}

댓글0