동기 & 비동기
동기(Synchronous)
- 작업을 순차적으로 수행하며, 하나의 작업이 완료된 후에 다음 작업을 시작할 수 있다.
- 호출한 작업이 완료될 때까지 호출자는 대기 상태에 있다.
- 작업의 결과를 즉시 반환받는다.
비동기(Asynchronous)
- 작업의 완료 여부와 관계없이 다음 작업을 시작하는 방식이다.
- 작업의 완료를 콜백이나 이벤트를 통해 알림 받는다.
동기와 비동기는 데이터의 순서와 결과 처리에 관점을 둔다.
Blocking
- 자신의 작업을 진행하다가 다른 주체의 작업이 시작되면,
다른 작업이 끝날 때까지 기다렸다가 자신의 작업을 시작한다.
Non-Blocking
- 다른 주체의 작업에 관련없이자신의 작업을 하는 것
Blocking과 Non-Blocking은 작업 제어에 관점을 둔다.
- Sync-Blocking: 메인 프로세스에서 제어권이 넘어가 작업이 끝난 후 제어권을 다시 받는다.
- Sync-NonBlocking: 메인 프로세스에서 제어권을 넘겼다가 바로 받지만 호출된 기능의 완료에 관심을 보인다.
- Async-Blocking: 메인 프로세스에서 제어권이 넘어갔지만 호출된 기능의 완료에 관심을 갖지 않는다.
- Async-NonBlocking: 메인 프로세스에서 제어권을 넘겼다가 바로 받고 호출된 기능의 완료에 관심을 갖지 않는다.
자바 Future
- 자바 5에 도입된 비동기 계산의 최종 결과를 담을 컨테이너 타입
- 별도의 스레드에서 작업을 시작하지만, 즉시 Future 인스턴스를 반환한다.
- 이 방식을 통해 현재 스레드는 Future 계산의 최종 결과를 기다리지 않고 더 많은 작업을 수행할 수 있다.
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<Integer> expensiveTask = () -> {
System.out.println("Task start");
TimeUnit.SECONDS.sleep(2);
System.out.println("Task done");
return 42;
};
System.out.println("before submitting the task");
// expensiveTask의 계산은 즉시 시작되어 결괏값에 반영된다.
Future<Integer> future = executor.submit(expensiveTask);
System.out.println("after submitting the task");
// 이 시점에서 계산은 아직 완료되지 않았으므로
// future의 get 메서드를 호출하면 계산이 완료될 때까지 현재의 스레드가 차단된다.
Integer theAnswer = future.get();
System.out.println("after the blocking call future.get()");
}
}
- Future는 비동기 작업에 대한 결과값을 반환 받을 수 있다.
- 하지만 계산이 완료되었는지 확인하고 취소하며 결과를 검색하는 간단한 기능만 제공한다.
- 효과적인 비동기 프로그래밍을 위해 다음과 같은 기능들이 추가적으로 필요할 수 있다.
- 완료 또는 실패 시 콜백과 같이 결과를 쉽게 확인할 수 있는 기능.
- 함수적 구성의 원칙에 따라 여러 작업을 연결하고 결합하는 방법.
- 통합된 오류 처리 및 복구 가능성
- ExecutorService 없이도 수동으로 작업을 생성하거나 완료할 수 있는 방법
- 자바 8은 Future의 부족한 기능을 보완하기 위해
CompletionStage 인터페이스와 그 구현체인 CompletableFuture 를 도입.
CompletableFuture
Future의 진화된 형태로써 외부에서 작업을 완료시킬 수 있을 뿐만 아니라 콜백 등록 및 Future 조합 등이 가능하다.
생성
Future와 CompletableFuture는 인스턴스 생성 방법이 유사하지만,
CompletableFuture는 ExecutorService를 필요로 하지 않아 더 간결한 특성을 가진다.
// Future<T>
ForkJoinPool executorService = ForkJoinPool.commonPool(); //
Future<?> futureRunnable = executorService.submit(() -> System.out.println("not returning a value"));
Future<String> submit = executorService.submit(() -> "Hello, Async World!");
// CompletableFuture<T>
CompletableFuture<Void> completableFutureRunnable =
CompletableFuture.runAsync(() -> System.out.println("not returning a value"));
CompletableFuture<String> completableFutureSupplier =
CompletableFuture.supplyAsync(() -> "Hello, Async World!");
작업 실행
runAsync와 supplyAsync는 ForkJoinPool의 commonPool()을 사용해
작업을 실행할 스레드를 스레드 풀로부터 얻어 실행한다.
- runAsync
void runAsync() throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
future.get();
System.out.println("Thread: " + Thread.currentThread().getName());
}
// Thread: ForkJoinPool.commonPool-worker-1
// Thread: main
- supplyAsync 는 runAsync와 달리 반환값이 존재한다.
void supplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Thread: " + Thread.currentThread().getName();
});
System.out.println(future.get());
System.out.println("Thread: " + Thread.currentThread().getName());
}
작업 콜백
- thenApply : 값을 받아서 다른 값을 반환시켜주는 콜백
void thenApply() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Thread: " + Thread.currentThread().getName();
}).thenApply(String::toUpperCase);
System.out.println(future.get());
}
// THREAD: FORKJOINPOOL.COMMONPOOL-WORKER-1
- thenAccept : 반환 값을 받아서 사용하고, 값을 반환하지 않는 콜백
void thenAccept() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "Thread: " + Thread.currentThread().getName();
}).thenAccept(s -> {
System.out.println(s.toUpperCase());
});
future.get();
}
- thenRun : 반환 값을 받지 않고, 다른 작업을 실행하는 콜백
void thenRun() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "Thread: " + Thread.currentThread().getName();
}).thenRun(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
future.get();
}
작업 조합
- thenCompose : 두 작업이 이어서 실행하도록 조합하며, 앞선 작업의 결과를 받아서 사용할 수 있다.
void thenCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = java.util.concurrent.CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> future = hello.thenCompose(this::message);
System.out.println(future.get());
}
private CompletableFuture<String> message(String message) {
return CompletableFuture.supplyAsync(() -> {
return message + " " + "World";
});
}
- thenCombine : 각각의 작업들이 독립적으로 실행되고, 얻어진 두 결과를 조합해서 작업을 처리한다.
void thenCombine() throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
return "World";
});
CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(future.get());
}
예외 처리
- exceptionally : 발생한 에러를 받아 예외 처리
- throw 여부에 따라 실행 결과가 다르게 출력된다.
void exceptionally(boolean doThrow) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (doThrow) {
throw new IllegalArgumentException("Invalid Argument");
}
return "Thread: " + Thread.currentThread().getName();
}).exceptionally(Throwable::getMessage);
System.out.println(future.get());
}
exceptionally(true); // java.lang.IllegalArgumentException: Invalid Argument
exceptionally(false); // Thread: ForkJoinPool.commonPool-worker-1
- handle : 결과값과 에러를 반환받아 에러가 발생한 경우와 아닌 경우 모두를 처리할 수 있다
void handle(boolean doThrow) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (doThrow) {
throw new IllegalArgumentException("Invalid Argument");
}
return "Thread: " + Thread.currentThread().getName();
}).handle((result, e) -> {
return e == null
? result
: e.getMessage();
});
System.out.println(future.get());
}
'함수형 프로그래밍' 카테고리의 다른 글
[ch.15] 자바를 위한 함수형 접근 방식 (0) | 2024.08.13 |
---|---|
[ch.14] 함수형 디자인 패턴 (0) | 2024.08.02 |
[ch.12] 재귀 (1) | 2024.07.20 |
[ch.11] 느긋한 계산법 (지연 평가) (0) | 2024.07.11 |
[ch.10] 예외 처리 (0) | 2024.07.04 |