본문 바로가기
함수형 프로그래밍

[ch.13] 비동기 작업

by tjdgus 2024. 8. 2.

동기 & 비동기

동기(Synchronous)

  • 작업을 순차적으로 수행하며, 하나의 작업이 완료된 후에 다음 작업을 시작할 수 있다.
  • 호출한 작업이 완료될 때까지 호출자는 대기 상태에 있다.
  • 작업의 결과를 즉시 반환받는다.

비동기(Asynchronous)

  • 작업의 완료 여부와 관계없이 다음 작업을 시작하는 방식이다.
  • 작업의 완료를 콜백이나 이벤트를 통해 알림 받는다.
동기와 비동기는 데이터의 순서와 결과 처리에 관점을 둔다.

 

Blocking

  • 자신의 작업을 진행하다가 다른 주체의 작업이 시작되면,
    다른 작업이 끝날 때까지 기다렸다가 자신의 작업을 시작한다. 

Non-Blocking

  • 다른 주체의 작업에 관련없이자신의 작업을 하는 것
Blocking과 Non-Blocking은 작업 제어에 관점을 둔다.

 

  • Sync-Blocking: 메인 프로세스에서 제어권이 넘어가 작업이 끝난 후 제어권을 다시 받는다.
  • Sync-NonBlocking: 메인 프로세스에서 제어권을 넘겼다가 바로 받지만 호출된 기능의 완료에 관심을 보인다.
  • Async-Blocking: 메인 프로세스에서 제어권이 넘어갔지만 호출된 기능의 완료에 관심을 갖지 않는다.
  • Async-NonBlocking: 메인 프로세스에서 제어권을 넘겼다가 바로 받고 호출된 기능의 완료에 관심을 갖지 않는다.

 

자바 Future

  • 자바 5에 도입된 비동기 계산의 최종 결과를 담을 컨테이너 타입
  • 별도의 스레드에서 작업을 시작하지만, 즉시 Future 인스턴스를 반환한다.
  • 이 방식을 통해 현재 스레드는 Future 계산의 최종 결과를 기다리지 않고 더 많은 작업을 수행할 수 있다.
public class FutureEx {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        Callable<Integer> expensiveTask = () -> {
            System.out.println("Task start");

            TimeUnit.SECONDS.sleep(2);

            System.out.println("Task done");
            
            return 42;
        };

        System.out.println("before submitting the task");

        // expensiveTask의 계산은 즉시 시작되어 결괏값에 반영된다.
        Future<Integer> future = executor.submit(expensiveTask);

        System.out.println("after submitting the task");

        // 이 시점에서 계산은 아직 완료되지 않았으므로
        // future의 get 메서드를 호출하면 계산이 완료될 때까지 현재의 스레드가 차단된다.
        Integer theAnswer = future.get();

        System.out.println("after the blocking call future.get()");
    }
}
  • Future는 비동기 작업에 대한 결과값을 반환 받을 수 있다.
  • 하지만 계산이 완료되었는지 확인하고 취소하며 결과를 검색하는 간단한 기능만 제공한다.
  • 효과적인 비동기 프로그래밍을 위해 다음과 같은 기능들이 추가적으로 필요할 수 있다.
    • 완료 또는 실패 시 콜백과 같이 결과를 쉽게 확인할 수 있는 기능.
    • 함수적 구성의 원칙에 따라 여러 작업을 연결하고 결합하는 방법.
    • 통합된 오류 처리 및 복구 가능성
    • ExecutorService 없이도 수동으로 작업을 생성하거나 완료할 수 있는 방법
  • 자바 8은 Future의 부족한 기능을 보완하기 위해
    CompletionStage 인터페이스와 그 구현체인 CompletableFuture 를 도입.

 

CompletableFuture

Future의 진화된 형태로써 외부에서 작업을 완료시킬 수 있을 뿐만 아니라 콜백 등록 및 Future 조합 등이 가능하다.

 

생성

Future와 CompletableFuture는 인스턴스 생성 방법이 유사하지만, 
CompletableFuture는 ExecutorService를 필요로 하지 않아 더 간결한 특성을 가진다.

// Future<T>
ForkJoinPool executorService = ForkJoinPool.commonPool(); //

Future<?> futureRunnable = executorService.submit(() -> System.out.println("not returning a value"));

Future<String> submit = executorService.submit(() -> "Hello, Async World!");

// CompletableFuture<T>
CompletableFuture<Void> completableFutureRunnable =
        CompletableFuture.runAsync(() -> System.out.println("not returning a value"));

CompletableFuture<String> completableFutureSupplier =
        CompletableFuture.supplyAsync(() -> "Hello, Async World!");

 

작업 실행

runAsync와 supplyAsync는 ForkJoinPool의 commonPool()을 사용해 
작업을 실행할 스레드를 스레드 풀로부터 얻어 실행한다.

  • runAsync
void runAsync() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
    });

    future.get();
    System.out.println("Thread: " + Thread.currentThread().getName());
}
// Thread: ForkJoinPool.commonPool-worker-1
// Thread: main
  • supplyAsync 는 runAsync와 달리 반환값이 존재한다.
void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    });

    System.out.println(future.get());
    System.out.println("Thread: " + Thread.currentThread().getName());
}

 

 

작업 콜백

  • thenApply : 값을 받아서 다른 값을 반환시켜주는 콜백
void thenApply() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenApply(String::toUpperCase);

    System.out.println(future.get());
}
// THREAD: FORKJOINPOOL.COMMONPOOL-WORKER-1
  • thenAccept : 반환 값을 받아서 사용하고, 값을 반환하지 않는 콜백
void thenAccept() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenAccept(s -> {
        System.out.println(s.toUpperCase());
    });

    future.get();
}
  • thenRun : 반환 값을 받지 않고, 다른 작업을 실행하는 콜백
void thenRun() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenRun(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
    });

    future.get();
}

 

작업 조합

  • thenCompose : 두 작업이 이어서 실행하도록 조합하며, 앞선 작업의 결과를 받아서 사용할 수 있다.
void thenCompose() throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = java.util.concurrent.CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });
    
    CompletableFuture<String> future = hello.thenCompose(this::message);
    System.out.println(future.get());
}

private CompletableFuture<String> message(String message) {
    return CompletableFuture.supplyAsync(() -> {
        return message + " " + "World";
    });
}
  • thenCombine : 각각의 작업들이 독립적으로 실행되고, 얻어진 두 결과를 조합해서 작업을 처리한다.
void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
        return "World";
    });

    CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
    System.out.println(future.get());
}

 

예외 처리

  • exceptionally : 발생한 에러를 받아 예외 처리
    • throw 여부에 따라 실행 결과가 다르게 출력된다.
void exceptionally(boolean doThrow) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (doThrow) {
            throw new IllegalArgumentException("Invalid Argument");
        }

        return "Thread: " + Thread.currentThread().getName();
    }).exceptionally(Throwable::getMessage);

    System.out.println(future.get());
}

exceptionally(true); // java.lang.IllegalArgumentException: Invalid Argument
exceptionally(false); // Thread: ForkJoinPool.commonPool-worker-1
  • handle : 결과값과 에러를 반환받아 에러가 발생한 경우와 아닌 경우 모두를 처리할 수 있다
void handle(boolean doThrow) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (doThrow) {
            throw new IllegalArgumentException("Invalid Argument");
        }

        return "Thread: " + Thread.currentThread().getName();
    }).handle((result, e) -> {
        return e == null
                ? result
                : e.getMessage();
    });

    System.out.println(future.get());
}

'함수형 프로그래밍' 카테고리의 다른 글

[ch.15] 자바를 위한 함수형 접근 방식  (0) 2024.08.13
[ch.14] 함수형 디자인 패턴  (0) 2024.08.02
[ch.12] 재귀  (1) 2024.07.20
[ch.11] 느긋한 계산법 (지연 평가)  (0) 2024.07.11
[ch.10] 예외 처리  (0) 2024.07.04