CompletableFuture là gì? Code ví dụ Java CompletableFuture (Java 8)
(Xem lại: Code ví dụ Callable, Future, Executors trong Java)
(Xem lại: So sánh Future và CompletableFuture trong Java)
Trong bài trước chúng ta đã tìm hiểu cách xử lý bất đồng bộ trong Java với Future tuy nhiên nó vẫn có rất nhiều hạn chế. Từ Java 8, ta có thêm class CompletableFuture
để khác phúc các vấn đề đó.
(CompletableFuture là tính năng mới của Java 8 nên nó sẽ sử dụng cú pháp, functional interface của Java 8. Nếu chưa nắm rõ thì các bạn có thể đọc lại: Functional Interface là gì? )
CompletableFuture là gì
CompletableFuture
là kết quả trả về của phép tính / method không đồng bộ, cho phép kiểm tra trạng thái của phép tính (đã thực hiện xong chưa, kết quả trả về là gì…), bắt sự kiện khi method hoàn thành…
Code ví dụ Java CompletableFuture
Ví dụ 1: chạy bất đồng bộ với CompletableFuture.supplyAsync
Mình muốn thực hiện nhiều phép tính tổng 2 số nguyên cùng lúc thì mình sẽ tạo class Calculator.java
và cung cấp method add()
package stackjava.com.tutorial; public class Calculator { public static int add(int a, int b) { int sum = a + b; System.out.println("result: " + a + " + " + b + " = " + sum); return sum; } }
Tiếp theo mình sẽ thực hiện gọi đồng thời 3 method add()
cùng một lúc
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo1 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor); System.out.println("Done"); executor.shutdown(); } }
Kết quả:
Done result: 1 + 2 = 3 result: 2 + 3 = 5 result: 1 + 3 = 4
Lệnh in ra "Done"
đươc viết cuối cùng nhưng lại hiện ra đầu tiên, các phép tính add()
cũng không theo thứ tự vì chúng được chạy cùng lúc với các thread khác nhau chứ không chạy lần lượt.
Mình giải thích lại một số dòng code bên trên:
Executors.newFixedThreadPool(10);
tức là tạo ra một thread pool chứa tối đa 10 thread chạy cùng lúc.executor.shutdown();
: Thực hiện tắt executor khi tất cả các Thread đã hoàn thành. Nếu bạn không có lệnh này thì chương trình của bạn sẽ chạy mãi vì nó luôn có một thread kiểm tra task trong executor để thực thi.CompletableFuture.supplyAsync
: thực hiện chạy bất đồng bộ (nó sẽ tạo một thread mới và chạy method trong thread đó). Đầu vào của nó sẽ là 1Supplier
và 1Executor
(thread mới sẽ được đưa vàoExecutor
để quản lý như theo dõi thread đã xong chưa, thread sẽ chạy luôn hay sẽ phải chờ…). Trường hợp kết quả trả về kiểuvoid
thì ta có thể dùngCompletableFuture.runAsync
Ví dụ 2: Lấy kết quả trả về từ CompletableFuture
Để lấy kết quả trả về từ CompletableFuture
ta dùng method get(), tuy nhiên khi dùng method này nó sẽ block lại thread main cho tới khi CompletableFuture
trả về kết quả thì thôi.
Ví dụ:
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo2 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor); future1.get(); System.out.println("Done"); executor.shutdown(); } }
Kết quả:
result: 1 + 2 = 3 result: 1 + 3 = 4 Done result: 2 + 3 = 5
Khi gọi: future1.get();
thì nó sẽ chờ Calculator.add(1,2)
trả về kết quả xong thì mới chạy tiếp các lệnh sau. Do đó dòng result: 1 + 2 = 3
sẽ luôn hiện ra trước dòng Done
(*Lưu ý, chính vì method get() block lại thread main nên nếu nó deplay quá lâu sẽ làm chậm chương trình dó đó ta thường dùng method với thời gian timeout nhất định, ví dụ:get(1, TimeUnit.SECONDS)
)
Ví dụ 3: Xử lý kết quả trả về với thenRun
, thenAccept
, handle
Với CompletableFuture
ta có thể bắt được sự kiện khi việc tính toán hoàn thành (điều mà Future
không làm được).
thenRun
là thực hiện làm gì khiCompletableFuture
hoàn thành (không cần quan tâm kết quả là gì).thenAccept
là xử lý kết quả khiCompletableFuture
hoàn thành.handle
dùng xử lý kết quả hoặc lỗi khiCompletableFuture
hoàn thành.
Ví dụ:
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo3 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor); System.out.println("Done"); executor.shutdown(); future1.thenRun(()->{ System.out.println("future1 completed!"); }); future2.thenAccept(result ->{ System.out.println("future2 completed, result = " + result); }); future3.handle((data, error) ->{ if (error != null){ System.out.println("future3 error, error: " + error); return null; } else { System.out.println("future3 completed, result = " + data); return data; } }); } }
Kết quả:
Done result: 2 + 3 = 5 result: 1 + 2 = 3 result: 1 + 3 = 4 future1 completed! future2 completed, result = 4 future3 completed, result = 5
Ngoài ra, thay vì sử dụng thenRun
, thenAccept
, handle
. Ta có thể dùng thenRunAsync
, thenAcceptAsync
, handleAsync
để xử lý kết quả trả về một cách bất đồng bộ (tức là khi có kết quả trả về ta sẽ tạo một thread mới để xử lý kết quả đó)
Ví dụ 4: Kết hợp nhiều CompletableFuture
Với CompletableFuture
ta có thể kết hợp xử lý nhiều CompletableFuture
như thực hiện lần lượt từng CompletableFuture
hoặc chờ cho tới khi tất cả các CompletableFuture
đều hoàn thành.
Trường hợp xử lý lần lượt từng CompletableFuture
. Các method thenRun
, thenAccept
đều trả về CompletableFuture
dó đó ta có thể thực hiện thenRun
, thenAccept
nhiều lần (mỗi lần tương ứng với 1 CompletableFuture
), trường hợp muốn dùng lại kết quả của CompletableFuture thì ta dùng method thenApply
Ví dụ:
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo4 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor); future.thenApplyAsync(data->{ System.out.println("CompletableFuture 1 done"); return Calculator.add(1,3); // CompletableFuture 2 }).thenApplyAsync(data->{ System.out.println("CompletableFuture 2 done"); return Calculator.add(2,3); // CompletableFuture 3 }).thenAcceptAsync(data ->{ System.out.println("CompletableFuture 3 done"); }).thenRun(()->{ System.out.println("Finished!"); }); executor.shutdown(); } }
Kết quả:
result: 1 + 2 = 3 CompletableFuture 1 done result: 1 + 3 = 4 CompletableFuture 2 done result: 2 + 3 = 5 CompletableFuture 3 done Finished!
Trường hợp muốn bắt sự kiện tất cả các CompletableFuture hoàn thành thì ta dùng method allOf()
Ví dụ:
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo5 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor); CompletableFuture<Void> futureAll = CompletableFuture.allOf(future1, future2, future3); futureAll.thenRunAsync(()->{ System.out.println("All future is Done!"); }); System.out.println("Done"); executor.shutdown(); } }
Kết quả:
result: 1 + 2 = 3 result: 2 + 3 = 5 result: 1 + 3 = 4 Done All future is Done!
ForkJoinPool.commonPool
Với các method thenRunAsync
, thenAcceptAsync
, runAsync
hay supplyAsync
nếu ta không truyền tham số Executor thì nó sẽ mặc định sử dụng pool mặc định là ForkJoinPool.commonPool()
với size là số CPU của máy tính).
Ví dụ:
package stackjava.com.tutorial; import java.util.concurrent.*; public class Demo6 { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2)); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3)); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3)); System.out.println("Done"); while(ForkJoinPool.commonPool().getActiveThreadCount() > 0) { } } }
Okay, Done!
Download code ví dụ trên tại đây.
References: