STACKJAVA

CompletableFuture là gì? Code ví dụ Java CompletableFuture (Java 8)

CompletableFuture là gì? Code ví dụ Java CompletableFuture (Java 8)

(Xem lại: Code ví dụ Callable, Future, Executors trong Java)

(Xem lại: So sánh Future và CompletableFuture trong Java)

Trong bài trước chúng ta đã tìm hiểu cách xử lý bất đồng bộ trong Java với Future tuy nhiên nó vẫn có rất nhiều hạn chế. Từ Java 8, ta có thêm class CompletableFuture để khác phúc các vấn đề đó.

(CompletableFuture là tính năng mới của Java 8 nên nó sẽ sử dụng cú pháp, functional interface của Java 8. Nếu chưa nắm rõ thì các bạn có thể đọc lại: Functional Interface là gì? )

CompletableFuture là gì

CompletableFuture là kết quả trả về của phép tính / method không đồng bộ, cho phép kiểm tra trạng thái của phép tính (đã thực hiện xong chưa, kết quả trả về là gì…), bắt sự kiện khi method hoàn thành…

Code ví dụ Java CompletableFuture

Ví dụ 1: chạy bất đồng bộ với CompletableFuture.supplyAsync

Mình muốn thực hiện nhiều phép tính tổng 2 số nguyên cùng lúc  thì mình sẽ tạo class Calculator.java và cung cấp method add()

package stackjava.com.tutorial;

public class Calculator {

  public static int add(int a, int b) {
    int sum = a + b;
    System.out.println("result: " + a + " + " + b + " = " + sum);
    return sum;
  }

}

Tiếp theo mình sẽ thực hiện gọi đồng thời 3 method add() cùng một lúc

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo1 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor);
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor);
    
    System.out.println("Done");
    executor.shutdown();
  }
}

Kết quả:

Done
result: 1 + 2 = 3
result: 2 + 3 = 5
result: 1 + 3 = 4

Lệnh in ra "Done" đươc viết cuối cùng nhưng lại hiện ra đầu tiên, các phép tính add() cũng không theo thứ tự vì chúng được chạy cùng lúc với các thread khác nhau chứ không chạy lần lượt.

Mình giải thích lại một số dòng code bên trên:

Ví dụ 2: Lấy kết quả trả về từ CompletableFuture

Để lấy kết quả trả về từ CompletableFuture ta dùng method get(), tuy nhiên khi dùng method này nó sẽ block lại thread main cho tới khi CompletableFuture trả về kết quả thì thôi.

Ví dụ:

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo2 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor);
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor);
    future1.get();

    System.out.println("Done");
    executor.shutdown();
  }
}

Kết quả:

result: 1 + 2 = 3
result: 1 + 3 = 4
Done
result: 2 + 3 = 5

Khi gọi: future1.get(); thì nó sẽ chờ Calculator.add(1,2) trả về kết quả xong thì mới chạy tiếp các lệnh sau. Do đó dòng result: 1 + 2 = 3 sẽ luôn hiện ra trước dòng Done

(*Lưu ý, chính vì method get() block lại thread main nên nếu nó deplay quá lâu sẽ làm chậm chương trình dó đó ta thường dùng method với thời gian timeout nhất định, ví dụ:get(1, TimeUnit.SECONDS))

Ví dụ 3: Xử lý kết quả trả về với thenRun, thenAccepthandle

Với CompletableFuture ta có thể bắt được sự kiện khi việc tính toán hoàn thành (điều mà Future không làm được).

Ví dụ:

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo3 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor);
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor);
    
    System.out.println("Done");
    executor.shutdown();
    
    future1.thenRun(()->{
      System.out.println("future1 completed!");
    });
    future2.thenAccept(result ->{
      System.out.println("future2 completed, result = " + result);
    });
    future3.handle((data, error) ->{
      if (error != null){
        System.out.println("future3 error, error: " + error);
        return null;
      } else {
        System.out.println("future3 completed, result = " + data);
        return data;
      }
      
    });
    
    
  }
}

Kết quả:

Done
result: 2 + 3 = 5
result: 1 + 2 = 3
result: 1 + 3 = 4
future1 completed!
future2 completed, result = 4
future3 completed, result = 5

Ngoài ra, thay vì sử dụng thenRun, thenAccepthandle. Ta có thể dùng thenRunAsync, thenAcceptAsynchandleAsync để xử lý kết quả trả về một cách bất đồng bộ (tức là khi có kết quả trả về ta sẽ tạo một thread mới để xử lý kết quả đó)

Ví dụ 4: Kết hợp nhiều CompletableFuture

Với CompletableFuture ta có thể kết hợp xử lý nhiều CompletableFuture như thực hiện lần lượt từng CompletableFuture hoặc chờ cho tới khi tất cả các CompletableFuture đều hoàn thành.

Trường hợp xử lý lần lượt từng CompletableFuture. Các method thenRun, thenAccept đều trả về CompletableFuture dó đó ta có thể thực hiện thenRun, thenAccept nhiều lần (mỗi lần tương ứng với 1 CompletableFuture), trường hợp muốn dùng lại kết quả của CompletableFuture thì ta dùng method thenApply

Ví dụ:

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo4 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor);
    
    future.thenApplyAsync(data->{
      System.out.println("CompletableFuture 1 done");
      return Calculator.add(1,3); // CompletableFuture 2
    }).thenApplyAsync(data->{
      System.out.println("CompletableFuture 2 done");
      return Calculator.add(2,3); // CompletableFuture 3
    }).thenAcceptAsync(data ->{
      System.out.println("CompletableFuture 3 done");
    }).thenRun(()->{
      System.out.println("Finished!");
    });
    
    executor.shutdown();
  }
}

Kết quả:

result: 1 + 2 = 3
CompletableFuture 1 done
result: 1 + 3 = 4
CompletableFuture 2 done
result: 2 + 3 = 5
CompletableFuture 3 done
Finished!

Trường hợp muốn bắt sự kiện tất cả các CompletableFuture hoàn thành thì ta dùng method allOf()

Ví dụ:

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo5 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2), executor);
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3), executor);
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3), executor);

    CompletableFuture<Void> futureAll = CompletableFuture.allOf(future1, future2, future3);
    futureAll.thenRunAsync(()->{
      System.out.println("All future is Done!");
    });

    System.out.println("Done");
    executor.shutdown();
  }
}

Kết quả:

result: 1 + 2 = 3
result: 2 + 3 = 5
result: 1 + 3 = 4
Done
All future is Done!

ForkJoinPool.commonPool

Với các method thenRunAsync, thenAcceptAsync, runAsync hay supplyAsync nếu ta không truyền tham số Executor thì nó sẽ mặc định sử dụng pool mặc định là ForkJoinPool.commonPool() với size là số CPU của máy tính).

Ví dụ:

package stackjava.com.tutorial;

import java.util.concurrent.*;

public class Demo6 {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> Calculator.add(1,2));
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> Calculator.add(1,3));
    CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> Calculator.add(2,3));
    
    System.out.println("Done");
    while(ForkJoinPool.commonPool().getActiveThreadCount() > 0) {
    }
  }
}

 

Okay, Done!

Download code ví dụ trên tại đây.

 

References:

https://docs.oracle.com/javase/8/…/CompletableFuture.html