「Java 进阶」--Lambda & 函数式编程

前些年 Scala 大肆流行,打出来 Java 颠覆者的旗号,究其底气来源,无非是函数式和面向对象的“完美结合”,各式各样的“语法糖”,但其过高的学习门槛,又给了新来者当头一棒。

随着 Java8 的发布,Lambda 特性的引入,之前的焦灼局面是否有所转变,让我们一起揭开 Java 函数式编程的面纱:

  1. 面向对象 VS 函数式
  2. FunctionalInterface 和 Lambda
  3. 类库的升级改造(默认方法、静态方法、Stream、Optional)
  4. Lambda 下模式的进化
  5. Lambda 下并发程序

1. 面向对象 VS 函数式编程


在现实世界中,数据和行为并存,程序也应如此,可喜可贺的是在 Java 世界中,两者也开启了融合之旅。

首先思考一个问题, 在 Java 编程中,我们如何进行行为传递,例如我们需要打印线程名称和当前时间,并将该任务提交到线程池中运行,会有哪些方法?

方法 1:新建 class Task 实现 Runnable 接口

<code>    public class Task implements Runnable{        @Override        public void run() {            System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");        }    }    executorService.submit(new Task());/<code> 

方法 2:匿名内部类实现 Runnable 接口

<code>    executorService.submit(new Runnable() {            @Override            public void run() {              System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");            }        });/<code>

方法 3:使用 Lambda 表达式

<code>    executorService.submit(()-> System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms"));/<code>

方法 4:使用方法引用

<code>     private void print(){        System.out.println(Thread.currentThread().getName() + "-->" + System.currentTimeMillis() + "ms");    }    {        executorService.submit(this::print);    }/<code>


2. FunctionalInterface 和 Lambda

Java 函数式编程,只有两个核心概念:

FunctionalInterface(函数接口)是只有一个抽象方法的接口,用作 Lambda 表达式的类型。

Lambda 表达式,及要传递的行为代码,更像是一个匿名函数(当然 java 中并没有这个概念),将行为像数据那样进行传递。

换个好理解但是不正规的说法,FunctionalInterface 为类型,Lambda 表达式为值;我们可以将一个 Lambda 表达式赋予一个符合 FunctionalInterface 要求的接口变量(局部变量、方法参数)。

2.1. Lambda 表达式

先看几个 Lambda 表达式的例子:

<code>        // 不包含参数,用()表示没有参数        // 表达式主体只有一个语句,可以省略{}        Runnable helloWord = () -> System.out.println("Hello World");        // 表达式主体由多个语句组成,不能省略{}        Runnable helloWords = () -> {            System.out.println("Hello");            System.out.println("Word");            System.out.println("Word");        };        // 表达式中只有一个参数,可以省略()        Consumer<string> infoConsumer = msg -> System.out.println("Hello " + msg);        // 表达式由多个参数组成,不可省略()        BinaryOperator<integer> add1 = (Integer i ,Integer j) -> i + j;        // 编译器会进行类型推断,在没有歧义情况下可以省略类型声明,但是不可省略()        BinaryOperator<integer> add2 = (i, j) -> i + j;/<integer>/<integer>/<string>/<code>

综上可见,一个 Lambda 表达式主要由三部分组成:

  1. 参数列表
  2. 箭头分隔符(->)
  3. 主体,单个表达式或语句块

我们在使用匿名内部类时有一些限制:引用方法中的变量时,需要将变量声明为 final,不能为其进行重新赋值,如下:

<code>        final String msg = "World";        Runnable print = new Runnable() {            @Override            public void run() {                System.out.println("Hello"  + msg);            }        };/<code>

在 Java8 中放松了这个限制,可以引用非 final 变量,但是该变量在既成事实上必须是 final 的,虽然无需将变量声明为 final,在 Lambda 表达式中,也无法用作非最终态变量,及只能给该变量赋值一次(与用 final 声明变量效果相同)。

2.2 FunctionalInterface

FunctionalInterface,只有一个抽象方法的接口就是函数式接口,接口中单一方法命名并不重要,只要方法签名与 Lambda 表达式的类型匹配即可。

Java 内置了常用函数接口如下:

<code>1. Predicate参数类型:T返回值:boolean示例:Predicate<string> isAdmin = name -> "admin".equals(name);/<string>/<code>
<code>2. Consumer参数:T返回值:void示例:Consumer<string> print = msg -> System.out.println(msg);/<string>/<code>
<code>3. Function参数:T返回值:R示例:Function<long> toStr = value -> String.valueOf(value);/<long>/<code>
<code>4. Supplier参数:none返回值:T示例:Supplier<date> now = () -> new Date();/<date>/<code>
<code>5. UnaryOperator参数:T返回值:T示例:UnaryOperator<boolean> negation = value -> !value.booleanValue();/<boolean>/<code>
<code>6. BinaryOperator参数:(T, T)返回值:T示例:BinaryOperator<integer> intDouble = (i, j) -> i + j;/<integer>/<code>
<code>7. Runnable参数:none返回值:void示例:Runnable helloWord = () -> System.out.println("Hello World");/<code>
<code>8. Callable参数:nont返回值:T示例:Callable<date> now1 = () -> new Date();/<date>/<code> 

当然我们也可以根据需求自定义函数接口,为了保证接口的有效性,可以在上面添加 @FunctionalInterface 注解,该注解会强制 javac 检测一个接口是否符合函数式接口的规范,例如:

<code>    @FunctionalInterface    interface CustomFunctionalInterface{        void print(String msg);    }    CustomFunctionalInterface cfi= msg -> System.out.println(msg);/<code>

2.3 方法引用

Lambda 表达式一种常用方法便是直接调用其他方法,针对这种情况,Java8 提供了一个简写语法,及方法引用,用于重用已有方法。

凡是可以使用 Lambda 表达式的地方,都可以使用方法引用。

方法应用的标准语法为 ClassName::methodName,虽然这是一个方法,但不需要再后面加括号,因为这里并不直接调用该方法。

<code>    Function<user> f1 = user->user.getName();    Function<user> f2 = User::getName;    Supplier<user> s1 = ()->new User();    Supplier<user> s2 = User::new;    Function<integer> sa1 = count -> new User[count];    Function<integer> sa2 = User[]::new;/<integer>/<integer>/<user>/<user>/<user>/<user>/<code>


  • 静态方法引用:className::methodName
  • 实例方法引用:instanceName::methodName
  • 超类实体方法引用:supper::mehtodName
  • 构造函数方法引用:className::new
  • 数组构造方法引用:ClassName[]::new

2.4 类型推断

类型推断,是 Java7 就引入的目标类型推断的扩展,在 Java8 中对其进行了改善,程序员可以省略 Lambda 表达式中的所有参数类型,Javac 会根据 Lambda 表达式式上下文信息自动推断出参数的正确类型。

大多数情况下 javac 能够准确的完成类型推断,但由于 Lambda 表达式与函数名无关,只与方法签名相关,因此会出现类型对推断失效的情况,这时可以使用手工类型转换帮助 javac 进行正确的判断。

<code>    // Supplier<string>, Callable<string> 具有相同的方法签名    private void print(Supplier<string> stringSupplier){        System.out.println("Hello " + stringSupplier.get());    }    private void print(Callable<string> stringCallable){        try {            System.out.println("Hello " + stringCallable.call());        } catch (Exception e) {            e.printStackTrace();        }    }    {        // Error, 因为两个print同时满足需求        print(()->"World");// 使用类型转换,为编译器提供更多信息        print((Supplier<string>) ()->"World");        print((Callable<string>) ()-> "world");    }/<string>/<string>/<string>/<string>/<string>/<string>/<code>

3. 类库的升级改造

Java8 另一个变化是引入了 默认方法 和接口的 静态方法 ,自此以后 Java 接口中方法也可以包含代码体了。

3.1 默认方法


默认方法的另一个优势是该方法是可选的,子类可以根据不同的需求 Override 默认实现,为其提供扩展性保证。

其中 Collection 中的 forEach,stream 功能都是通过该技术统一添加到接口中的。

<code>    // Collection 中的forEache实现    default void forEach(Consumer super T> action) {        Objects.requireNonNull(action);        for (T t : this) {            action.accept(t);        }    }    // Collection中的stream实现    default Stream stream() {        return StreamSupport.stream(spliterator(), false);    }/<code>

从上可见,默认方法的写法也是比较简单的,只需在方法声明中添加 defalut 关键字,然后提供方法的默认实现即可。


3.1.1 默认方法与子类


a. 没有重写


<code>    interface Parent{        default void welcome(){            System.out.println("Parent");        }    }    // 调用Parent中的welcome, 输入"Parent"    class ParentNotImpl implements Parent{    }/<code>

b. 子接口重写


<code>     interface Parent{        default void welcome(){            System.out.println("Parent");        }    }    interface ChildInterface extends Parent{        @Override        default void welcome(){            System.out.println("ChildInterface");        }    }    // 执行ChildInterface中的welcome, 输入 "ChildInterface"    class ChildImpl implements ChildInterface{    }/<code>

c. 类重写


<code>     interface Parent{        default void welcome(){            System.out.println("Parent");        }    }    interface ChildInterface extends Parent{        @Override        default void welcome(){            System.out.println("ChildInterface");        }    }    //执行子类中的welcome方法,输出"ChildImpl"    class ChildImpl1 implements ChildInterface{        @Override        public void welcome(){            System.out.println("ChildImpl");        }    }/<code>
3.1.2 多重继承

接口允许多重继承,因此有可能会碰到两个接口包含签名相同的默认方法的情况,此时 javac 并不明确应该继承哪个接口中的方法,因此会导致编译出错,这时需要在类中实现该方法,如果想调用特定父接口中的默认方法,可以使用 ParentInterface.super.method() 的方式来指明具体的接口。

<code>    interface Parent1 {        default void print(){            System.out.println("parent1");        }    }    interface Parent2{        default void print(){            System.out.println("parent2");        }    }    class Child implements Parent1, Parent2{        @Override        public void print() {            System.out.println("self");            Parent1.super.print();            Parent2.super.print();        }    }/<code>

现在的接口提供了某种形式上的多继承功能,然而多重继承存在很多诟病。很多人认为多重继承的问题在于对象状态的继承,而不是代码块的继承,默认方法避免了状态的继承,也因此避免了 C++ 中多重继承最大的缺点。


从某种角度出发,Java 通过接口默认方法实现了代码多重继承,通过类实现了状态单一继承。

3.1.3 三定律


  1. 类胜于方法。 如果在继承链中有方法体或抽象的方法声明,那么就可以忽略接口中定义的方法。
  2. 子类胜于父类。 如果一个接口继承另一个接口,且两个接口都定义了一个默认方法,那么子接口中定义的方法胜出。
  3. 没有规则三。 如果上面两条规则不适用,子类要么实现该方法,要么将该方法声明为抽象方法。

3.2 接口静态方法

人们在编程过程中积累了这样一条经验,创建一个包含很多静态方法的一个类。很多时候类是一个放置工具方法的好地方,比如 Java7 引入的 Objects 类,就包含很多工具方法,这些方法不是属于具体的某个类。


在接口中定义静态方法,只需使用 static 关键字进行描述即可,例如 Stream 接口中的 of 方法。

<code>    /**     * Returns a sequential {@code Stream} containing a single element.     *     * @param t the single element     * @param  the type of stream elements     * @return a singleton sequential stream     */    public static Stream of(T t) {        return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);    }/<code>

3.3 Stream

Stream 是 Java8 中最耀眼的亮点,它使得程序员得以站在更高的抽象层次对集合进行操作。

Stream 是用函数式编程方式在集合类上进行复杂操作的工具。

3.3.1. 从外部迭代到内部迭代

Java 程序员使用集合时,一个通用模式就是在集合上进行迭代,然后处理返回的每一个元素,尽管这种操作可行但存在几个问题:

  • 大量的样板代码
  • 模糊了程序本意
  • 串行化执行


<code>     // 常见写法1,不推荐使用    public void printAll1(List<string> msg){        for (int i=0; i< msg.size(); i++){            String m = msg.get(i);            System.out.println(m);        }    }    // Java5之前,正确写法,过于繁琐    public void printAll2(List<string> msg){        Iterator<string> iterator = msg.iterator();        while (iterator.hasNext()){            String m = iterator.next();            System.out.println(m);        }    }    // Java5之后,加强for循环,采用语法糖,简化for循环,内部转化为Iterator方式    public void printAll3(List<string> msg){        for (String m : msg){            System.out.println(m);        }    }/<string>/<string>/<string>/<string>/<code>

整个迭代过程,通过显示的调用 Iterator 对象的 hasNext 和 next 方法完成整个迭代,这成为外部迭代。

「Java 进阶」--Lambda & 函数式编程

另一种方式成为内部迭代,及将操作行为作为参数传递给 Stream,在 Stream 内部完成迭代操作。

<code>     // Java8中,使用Stream进行内部迭代操作    public void printAll4(List<string> msg){        msg.stream().forEach(System.out::println);    }/<string>/<code>


「Java 进阶」--Lambda & 函数式编程

3.3.2. 惰性求值 VS 及早求值

Stream 中存在两类方法,不产生值的方法称为惰性方法;从 Stream 中产生值的方法叫做及早求值方法。

判断一个方法的类别很简单:如果返回值是 Stream,那么就是惰性方法;如果返回值是另一个值或为空,那么就是及早求值方法。

惰性方法返回的 Stream 对象不是一个新的集合,而是创建新集合的配方,Stream 本身不会做任何迭代操作,只有调用及早求值方法时,才会开始真正的迭代。

整个过程与 Builder 模式有共通之处,惰性方法负责对 Stream 进行装配(设置 builder 的属性),调用及早求值方法时(调用 builder 的 build 方法),按照之前的装配信息进行迭代操作。

常见 Stream 操作: collect(toList())


collect(toList()) 方法由 Stream 里面的值生成一个列表,是一个及早求值操作。

collect 的功能不仅限于此,它是一个非常强大的结构。

<code>     @Data    class User{        private String name;    }    public List<string> getNames(List<user> users){        List<string> names = new ArrayList<>();        for (User user : users){            names.add(user.getName());        }        return names;    }    public List<string> getNamesUseStream(List<user> users){      // 方法引用      //return users.stream().map(User::getName).collect(toList());        // lambda表达式        return users.stream().map(user -> user.getName()).collect(toList());    }/<user>/<string>/<string>/<user>/<string>/<code> count、max、min


Stream 上最常用的操作之一就是求总数、最大值和最小值,count、max 和 min 足以解决问题。

<code>    public Long getCount(List<user> users){        return users.stream().filter(user -> user != null).count();    }    // 求最小年龄    public Integer getMinAge(List<user> users){        return users.stream().map(user -> user.getAge()).min(Integer::compareTo).get();    }    // 求最大年龄    public Integer getMaxAge(List<user> users){        return users.stream().map(user -> user.getAge()).max(Integer::compareTo).get();    }/<user>/<user>/<user>/<code>

min 和 max 入参是一个 Comparator 对象,用于元素之间的比较,返回值是一个 Optional,它代表一个可能不存在的值,如果 Stream 为空,那么该值不存在,如果不为空,该值存在。通过 get 方法可以获取 Optional 中的值。 findAny、findFirst



<code>    public Optional<user> getAnyActiveUser(List<user> users){        return users.stream()                .filter(user -> user.isActive())                .findAny();    }    public Optional<user> getFirstActiveUser(List<user> users){        return users.stream()                .filter(user -> user.isActive())                .findFirst();    }/<user>/<user>/<user>/<user>/<code> allMatch、anyMatch、noneMatch


均以 Predicate 作为输入参数,对集合中的元素进行判断,并返回最终的结果。

<code>    // 所有用户是否都已激活    boolean allMatch = users.stream().allMatch(user -> user.isActive());    // 是否有激活用户    boolean anyMatch = users.stream().anyMatch(user -> user.isActive());    // 是否所有用户都没有激活    boolean noneMatch = users.stream().noneMatch(user -> user.isActive());/<code> forEach


以 Consumer 为参数,对 Stream 中复合条件的对象进行操作。

<code>    public void printActiveName(List<user> users){        users.stream()                .filter(user -> user.isActive())                .map(user -> user.getName())                .forEach(name -> System.out.println(name));    }/<user>/<code> reduce


reduce 操作可以实现从一组值中生成一个值,之前提到的 count、min、max 方法因为比较通用,单独提取成方法,事实上,这些方法都是通过 reduce 完成的。

下图展示的是对 stream 进行求和的过程,以 0 为起点,每一步都将 stream 中的元素累加到 accumulator 中,遍历至最后一个元素,accumulator 就是所有元素值的和。

「Java 进阶」--Lambda & 函数式编程 filter


以 Predicate 作为参数(相当于 if 语句),对 Stream 中的元素进行过滤,只有复合条件的元素才能进入下面的处理流程。


「Java 进阶」--Lambda & 函数式编程

<code>    public List<user> getActiveUser(List<user> users){        return users.stream()                .filter(user -> user.isActive())                .collect(toList());    }/<user>/<user>/<code> map

及早求值方法: 以 Function 作为参数,将 Stream 中的元素从一种类型转换成另外一种类型。


「Java 进阶」--Lambda & 函数式编程

<code>    public List<string> getNames(List<user> users){        return users.stream()                .map(user -> user.getName())                .collect(toList());    }/<user>/<string>/<code> peek

Stream 提供的是内迭代,有时候为了功能调试,需要查看每个值,同时能够继续操作流,这时就会用到 peek 方法。

<code>    public void printActiveName(List<user> users){        users.stream()                .filter(user -> user.isActive())                .peek(user -> System.out.println(user.isActive()))                .map(user -> user.getName())                .forEach(name -> System.out.println(name));    }/<user>/<code> 其他

针对集合 Stream 还提供了许多功能强大的操作,暂不一一列举,简单汇总一下。

  • distinct:进行去重操作
  • sorted:进行排序操作
  • limit:限定结果输出数量
  • skip:跳过 n 个结果,从 n+1 开始输出

3.4 Optional

Java 程序中出现最多的异常就是 NullPointerException,没有之一。Optional 的出现力求改变这一状态。

Optional 对象相当于值的容器,而该值可以通过 get 方法获取,同时 Optional 提供了很多函数用于对值进行操作,从而最大限度的避免 NullPointerException 的出现。

Optional 与 Stream 的用法基本类型,所提供的方法同样分为惰性和及早求值两类,惰性方法主要用于流程组装,及早求值用于最终计算。

3.4.1 of

使用工厂方法 of,可以从一个值中创建一个 Optional 对象,如果值为 null,会报 NullPointerException。

<code>    Optional<string> dataOptional = Optional.of("a");    String data = dataOptional.get(); // data is "a"    Optional<string> dataOptional = Optional.of(null);    String data = dataOptional.get(); // throw NullPointerException/<string>/<string>/<code>
3.4.2 empty

工厂方法 empty,可以创建一个不包含任何值的 Optional 对象。

<code>    Optional<string> dataOptional = Optional.empty();    String data = dataOptional.get(); //throw NoSuchElementException/<string>/<code>
3.4.3 ofNullable

工厂方法 ofNullable,可将一个空值转化成 Optional。

<code>     public static  Optional ofNullable(T value) {        return value == null ? empty() : of(value);    }/<code>
3.4.4 get、orElse、orElseGet、orElseThrow

直接求值方法,用于获取 Optional 中值,避免空指针异常的出现。

<code>    Optional<string> dataOptional = Optional.of("a");    dataOptional.get(); // 获取Optional中的值, 不存在会抛出NoSuchElementException    dataOptional.orElse("b"); //获取Optional中的值,不存在,直接返回"B"    dataOptional.orElseGet(()-> String.valueOf(System.currentTimeMillis())); //获取Optional中的值,不存在,对Supplier进行计算,并返回计算结果    dataOptional.orElseThrow(()-> new XXXException()); //获取Optional中的值,不存在,抛出自定义异常/<string>/<code>
3.4.5 isPresent、ifPresent

直接求值方法,isPresent 用于判断 Optional 中是否有值,ifPresent 接收 Consumer 对象,当 Optional 有值的情况下执行。

<code>    Optional<string> dataOptional = Optional.of("a");    String value = null;    if (dataOptional.isPresent()){        value = dataOptional.get();    }else {        value = "";    }    //等价于    String value2 = dataOptional.orElse("");    // 当Optional中有值的时候执行    dataOptional.ifPresent(v->System.out.println(v));/<string>/<code>
3.4.6 map

惰性求值方法。map 与 Stream 中的用法基本相同,用于对 Optional 中的值进行映射处理,从而避免了大量 if 语句嵌套,多个 map 组合成链,只需对最终的结果进行操作,中间过程中如果存在 null 值,之后的 map 不会执行。

<code>    @Data    static class Order{        private Name owner;    }    @Data    static class User{        private Name name;    }    @Data    static class Name{        String firstName;        String midName;        String lastName;    }    private String getFirstName(Order order){        if (order == null){            return "";        }        if (order.getOwner() == null){            return "";        }        if (order.getOwner().getFirstName() == null){            return "";        }        return order.getOwner().getFirstName();    }    private String getFirstName(Optional<order> orderOptional){        return orderOptional.map(order -> order.getOwner())                .map(user->user.getFirstName())                .orElse("");    }/<order>/<code>
3.4.7 filter

惰性求值,对 Optional 中的值进行过滤,如果 Optional 为 empty,直接返回 empty;如果 Optional 中存在值,则对值进行验证,验证通过返回原 Optional,验证不通过返回 empty。

<code>     public Optional filter(Predicate super T> predicate) {        Objects.requireNonNull(predicate);        if (!isPresent())            return this;        else            return predicate.test(value) ? this : empty();    }/<code>

4. Lambda 下模式的进化


Lambda 表达式大大简化了 Java 中行为传递的问题,对于很多行为式设计模式而言,减少了不少构建成本。

4.1 命令模式


大多数命令模式中的命令对象,其实是一种行为的封装,甚至是对其他对象内部行为的一种适配,这种情况下,Lambda 表达式并有了用武之地。

<code>    interface Command{        void act();    }    interface Editor{        void open();        void write(String data);        void save();    }    class CommandRunner{        private List<command> commands = new ArrayList<>();        public void run(Command command){            command.act();            this.commands.add(command);        }        public void redo(){            this.commands.forEach(Command::act);        }    }    class OpenCommand implements Command{        private final Editor editor;        OpenCommand(Editor editor) {            this.editor = editor;        }        @Override        public void act() {            this.editor.open();        }    }    class WriteCommand implements Command{        private final Editor editor;        private final String data;        WriteCommand(Editor editor, String data) {            this.editor = editor;            this.data = data;        }        @Override        public void act() {            editor.write(this.data);        }    }    class SaveCommand implements Command{        private final Editor editor;        SaveCommand(Editor editor) {            this.editor = editor;        }        @Override        public void act() {            this.editor.save();        }    }    public void useCommand(){        CommandRunner commandRunner = new CommandRunner();        Editor editor = new EditorImpl();        String data1 = "data1";        String data2 = "data2";        commandRunner.run(new OpenCommand(editor));        commandRunner.run(new WriteCommand(editor, data1));        commandRunner.run(new WriteCommand(editor, data2));        commandRunner.run(new SaveCommand(editor));    }    public void useLambda(){        CommandRunner commandRunner = new CommandRunner();        Editor editor = new EditorImpl();        String data1 = "data1";        String data2 = "data2";        commandRunner.run(()->editor.open());        commandRunner.run(()->editor.write(data1));        commandRunner.run(()->editor.write(data2));        commandRunner.run(()->editor.save());    }    class EditorImpl implements Editor{        @Override        public void open() {        }        @Override        public void write(String data) {        }        @Override        public void save() {        }    }/<command>/<code>

从代码中可见,Lambda 表达式的应用,减少了创建子类的负担,增加了代码的灵活性。

4.2 策略模式



「Java 进阶」--Lambda & 函数式编程

<code>    interface CompressionStrategy{        OutputStream compress(OutputStream outputStream) throws IOException;    }    class GzipBasedCompressionStrategy implements CompressionStrategy{        @Override        public OutputStream compress(OutputStream outputStream) throws IOException {            return new GZIPOutputStream(outputStream);        }    }    class ZipBasedCompressionStrategy implements CompressionStrategy{        @Override        public OutputStream compress(OutputStream outputStream) throws IOException {            return new ZipOutputStream(outputStream);        }    }    class Compressor{        private final CompressionStrategy compressionStrategy;        Compressor(CompressionStrategy compressionStrategy) {            this.compressionStrategy = compressionStrategy;        }        public void compress(Path inFile, File outFile) throws IOException {            try (OutputStream outputStream = new FileOutputStream(outFile)){                Files.copy(inFile, this.compressionStrategy.compress(outputStream));            }        }    }    {        Compressor gzipCompressor = new Compressor(new GzipBasedCompressionStrategy());        gzipCompressor.compress(in,out);        Compressor ziCompressor = new Compressor(new ZipBasedCompressionStrategy());        ziCompressor.compress(in,out);    }    {        Compressor gzipCompressor = new Compressor(GZIPOutputStream::new);        gzipCompressor.compress(in,out);        Compressor ziCompressor = new Compressor(ZipOutputStream::new);        ziCompressor.compress(in,out);    }/<code>

4.3 观察者模式



<code>    interface NameObserver{        void onNameChange(String oName, String nName);    }    @Data    class User {        private final List<nameobserver> nameObservers = new ArrayList<>();        @Setter(AccessLevel.PRIVATE)        private String name;        public void updateName(String nName){            String oName = getName();            setName(nName);            nameObservers.forEach(nameObserver -> nameObserver.onNameChange(oName, nName));        }        public void addObserver(NameObserver nameObserver){            this.nameObservers.add(nameObserver);        }    }    class LoggerNameObserver implements NameObserver{        @Override        public void onNameChange(String oName, String nName) {            System.out.println(String.format("old Name is %s, new Name is %s", oName, nName));        }    }    class NameChangeNoticeObserver implements NameObserver{        @Override        public void onNameChange(String oName, String nName) {            notic.send(String.format("old Name is %s, new Name is %s", oName, nName));        }    }    {        User user = new User();        user.addObserver(new LoggerNameObserver());        user.addObserver(new NameChangeNoticeObserver());        user.updateName("张三");    }    {        User user = new User();        user.addObserver((oName, nName) ->                System.out.println(String.format("old Name is %s, new Name is %s", oName, nName)));        user.addObserver((oName, nName) ->                notic.send(String.format("old Name is %s, new Name is %s", oName, nName)));        user.updateName("张三");    }/<nameobserver>/<code>

4.4 模板方法模式


模板方法,实际是行为的一种整合,内部大量用到行为的传递。 先看一个标准的模板方法:

<code>    interface UserChecker{        void check(User user);    }    abstract class AbstractUserChecker implements UserChecker{        @Override        public final void check(User user){            checkName(user);            checkAge(user);        }        abstract void checkName(User user);        abstract void checkAge(User user);    }    class SimpleUserChecker extends AbstractUserChecker {        @Override        void checkName(User user) {            Preconditions.checkArgument(StringUtils.isNotEmpty(user.getName()));        }        @Override        void checkAge(User user) {            Preconditions.checkArgument(user.getAge() != null);            Preconditions.checkArgument(user.getAge().intValue() > 0);            Preconditions.checkArgument(user.getAge().intValue() < 150);        }    }    {        UserChecker userChecker = new SimpleUserChecker();        userChecker.check(new User());    }    class LambdaBaseUserChecker implements UserChecker{        private final List<consumer>> userCheckers = Lists.newArrayList();        public LambdaBaseUserChecker(List<consumer>>userCheckers){            this.userCheckers.addAll(userCheckers);        }        @Override        public void check(User user){            this.userCheckers.forEach(userConsumer -> userConsumer.accept(user));        }    }    {        UserChecker userChecker = new LambdaBaseUserChecker(Arrays.asList(                user -> Preconditions.checkArgument(StringUtils.isNotEmpty(user.getName())),                user -> Preconditions.checkArgument(user.getAge() != null),                user -> Preconditions.checkArgument(user.getAge().intValue() > 0),                user -> Preconditions.checkArgument(user.getAge().intValue() < 150)        ));        userChecker.check(new User());    }    @Data    class User{        private String name;        private Integer age;    }/<consumer>/<consumer>/<code>

在看一个 Spring JdbcTemplate,如果使用 Lambda 进行简化:

<code>    public JdbcTemplate jdbcTemplate;    public User getUserById(Integer id){        return jdbcTemplate.query("select id, name, age from tb_user where id = ?", new PreparedStatementSetter() {            @Override            public void setValues(PreparedStatement preparedStatement) throws SQLException {                preparedStatement.setInt(1, id);            }        }, new ResultSetExtractor<user>() {            @Override            public User extractData(ResultSet resultSet) throws SQLException, DataAccessException {                User user = new User();                user.setId(resultSet.getInt("id"));                user.setName(resultSet.getString("name"));                user.setAge(resultSet.getInt("age"));                return user;            }        });    }    public User getUserByIdLambda(Integer id){        return jdbcTemplate.query("select id, name, age from tb_user where id = ?",                preparedStatement -> preparedStatement.setInt(1, id),                resultSet -> {                    User user = new User();                    user.setId(resultSet.getInt("id"));                    user.setName(resultSet.getString("name"));                    user.setAge(resultSet.getInt("age"));                    return user;                });    }    @Data    class User {        private Integer id;        private String name;        private Integer age;    }/<user>/<code>

5. Lambda 下并发程序


  • 并发是两个任务共享时间段,并行是两个任务同一时间发生。
  • 并行化是指为了缩短任务执行的时间,将任务分解为几个部分,然后并行执行,这和顺序执行的工作量是一样的,区别是多个 CPU 一起来干活,花费的时间自然减少了。
  • 数据并行化。数据并行化是指将数据分为块,为每块数据分配独立的处理单元。
「Java 进阶」--Lambda & 函数式编程

5.1 并行化流操作

并行化流操作是 Stream 提供的一个特性,只需改变一个方法调用,就可以让其拥有并行操作的能力。

如果已经存在一个 Stream 对象,调用他的 parallel 方法就能让其并行执行。

如果已经存在一个集合,调用 parallelStream 方法就能获取一个拥有并行执行能力的 Stream。

并行流主要解决如何高效使用多核 CPU 的事情。

<code>    @Data    class Account{        private String name;        private boolean active;        private Integer amount;    }    public int getActiveAmount(List<account> accounts){        return accounts.parallelStream()                .filter(account -> account.isActive())                .mapToInt(account -> account.getAmount())                .sum();    }    public int getActiveAmount2(List<account> accounts){        return accounts.stream()                .parallel()                .filter(account -> account.isActive())                .mapToInt(Account::getAmount)                .sum();    }/<account>/<account>/<code>

并行流底层使用 fork/join 框架,fork 递归式的分解问题,然后每个段并行执行,最终有 join 合并结果,返回最后的值。

「Java 进阶」--Lambda & 函数式编程

5.2 阻塞 IO VS 非阻塞 IO


BIO 阻塞式 IO,是一种通用且容易理解的方式,与程序交互时通常都符合这种顺序执行的方式,但其主要的缺陷在于每个 socket 会绑定一个 Thread 进行操作,当长链过多时会消耗大量的 Server 资源,从而导致其扩展性性下降。

NIO 非阻塞 IO,一般指的是 IO 多路复用,可以使用一个线程同时对多个 socket 的读写进行监控,从而使用少量线程服务于大量 Socket。

由于客户端开发的简便性,大多数的驱动都是基于 BIO 实现,包括 MySQL、Redis、Mongo 等;在服务器端,由于其高性能的要求,基本上是 NIO 的天下,以最大限度的提升系统的可扩展性。

由于客户端存在大量的 BIO 操作,我们的客户端线程会不停的被 BIO 阻塞,以等待操作返回值,因此线程的效率会大打折扣。

「Java 进阶」--Lambda & 函数式编程

如上图,线程在 IO 与 CPU 之间不停切换,走走停停,同时线程也没有办法释放,一直等到任务完成。

5.3 Future

构建并发操作的另一种方案便是 Future,Future 是一种凭证,调用方法不是直接返回值,而是返回一个 Future 对象,刚创建的 Future 为一个空对象,由后台线程执行耗时操作,并在结束时将结果写回到 Future 中。

当调用 Future 对象的 get 方法获取值时,会有两个可能,如果后台线程已经运行完成,则直接返回;如果后台线程没有运行完成,则阻塞调用线程,知道后台线程运行完成或超时。

使用 Future 方式,可以以并行的方式运行多个子任务。

当主线程需要调用比较耗时的操作时,可以将其放在辅助线程中执行,并在需要数据的时候从 future 中获取,如果辅助线程已经运行完成,则立即拿到返回的结果,如果辅助线程还没有运行完成,则主线程等待,并在完成时获取结果。

「Java 进阶」--Lambda & 函数式编程

一种常见的场景是在Controller中从多个Service中获取结果,并将其封装成一个View对象返回给前端用于显示,假设需要从三个接口中获取结果,每个接口的平均响应时间是20ms,那按照串行模式,总耗时为sum(i1, i2, i3) = 60ms;如果按照Future并发模式将加载任务交由辅助线程处理,总耗时为max(i1, i2, i3 ) = 20ms, 大大减少了系统的响应时间。

<code>     private ExecutorService executorService = Executors.newFixedThreadPool(20);    private User loadUserByUid(Long uid){       sleep(20);       return new User();    }    private Address loadAddressByUid(Long uid){        sleep(20);        return new Address();    }    private Account loadAccountByUid(Long uid){        sleep(20);        return new Account();    }    /**     * 总耗时 sum(LoadUser, LoadAddress, LoadAccount) = 60ms     * @param uid     * @return     */    public View getViewByUid1(Long uid){        User user = loadUserByUid(uid);        Address address = loadAddressByUid(uid);        Account account = loadAccountByUid(uid);        View view = new View();        view.setUser(user);        view.setAddress(address);        view.setAccount(account);        return view;    }    /**     * 总耗时 max(LoadUser, LoadAddress, LoadAccount) = 20ms     * @param uid     * @return     * @throws ExecutionException     * @throws InterruptedException     */    public View getViewByUid(Long uid) throws ExecutionException, InterruptedException {        Future<user> userFuture = executorService.submit(()->loadUserByUid(uid));        Future<address> addressFuture = executorService.submit(()->loadAddressByUid(uid));        Future<account> accountFuture = executorService.submit(()->loadAccountByUid(uid));        View view = new View();        view.setUser(userFuture.get());        view.setAddress(addressFuture.get());        view.setAccount(accountFuture.get());        return view;    }    private void sleep(long time){        try {            TimeUnit.MILLISECONDS.sleep(time);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    @Data    class View{        private User user;        private Address address;        private Account account;    }    class User{    }    class Address{    }    class Account{    }/<account>/<address>/<user>/<code>

Future 方式存在一个问题,及在调用 get 方法时会阻塞主线程,这是资源的极大浪费,我们真正需要的是一种不必调用 get 方法阻塞当前线程,就可以操作 future 对象返回的结果。


  • 将两个 Future 结果合并成一个,同时第二个又依赖于第一个的结果
  • 等待 Future 集合中所有记录的完成
  • 等待 Future 集合中的最快的任务完成
  • 定义任务完成后的操作

对此,我们引入了 CompletableFuture 对象。

5.4 CompletableFuture

  • CompletableFuture 结合了 Future 和回调两种策略,以更好的处理事件驱动任务。
  • CompletableFuture 与Stream 的设计思路一致,通过注册 Lambda 表达式,把高阶函数链接起来,从而定制更复杂的处理流程。

CompletableFuture 提供了一组函数用于定义流程,其中包括:

5.4.1 创建函数

CompletableFuture 提供了一组静态方法用于创建 CompletableFuture 实例:

<code>public static  CompletableFuture completedFuture(U value)// :使用已经创建好的值,创建 CompletableFuture 对象。public static CompletableFuture<void> runAsync(Runnable runnable)// 基于 Runnable 创建 CompletableFuture 对象,返回值为 Void,及没有返回值public static CompletableFuture<void> runAsync(Runnable runnable, Executor executor)// 基于 Runnable 和自定义线程池创建 CompletableFuture 对象,返回值为 Void,及没有返回值public static  CompletableFuture supplyAsync(Supplier supplier)// 基于 Supplier 创建 CompletableFuture 对象,返回值为 Upublic static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)   // 基于 Supplier 和自定义线程池创建 CompletableFuture 对象,返回值为 U 

以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

方法的参数类型都是函数式接口,所以可以使用 Lambda 表达式实现异步任务。

5.4.2 计算结果完成后

当 CompletableFuture 计算完成或者计算过程中抛出异常时进行回调。

<code>public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)public CompletableFuture     exceptionally(Function<throwable> fn)/<throwable>/<code>

Action 的类型是 BiConsumer super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

exceptionally 针对异常情况进行处理,当原始的 CompletableFuture 抛出异常的时候,就会触发这个 CompletableFuture 的计算。

下面一组方法虽然也返回 CompletableFuture 对象,但是对象的值和原来的 CompletableFuture 计算的值不同。当原先的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发这个 CompletableFuture 对象的计算,结果由 BiFunction 参数计算而得。因此这组方法兼有 whenComplete 和转换的两个功能。

<code>public  CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)public  CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn)public  CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn, Executor executor)/<code>
5.4.3 转化函数

转化函数类似于 Stream 中的惰性求助函数,主要对 CompletableFuture 的中间结果进行流程定制。

<code>public  CompletableFuture thenApply(Function super T,? extends U> fn)public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn)public  CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)/<code>

通过函数完成对 CompletableFuture 中的值得转化,Async 在线的线程池中处理,Executor 可以自定义线程池。

5.4.4 纯消费函数

上面的方法当计算完成的时候,会生成新的计算结果 (thenApply, handle),或者返回同样的计算结果 whenComplete,CompletableFuture 还提供了一种处理结果的方法,只对结果执行 Action,而不返回新的计算值,因此计算值为 Void。

<code>public CompletableFuture<void> thenAccept(Consumer super T> action)public CompletableFuture<void> thenAcceptAsync(Consumer super T> action)public CompletableFuture<void> thenAcceptAsync(Consumer super T> action, Executor executor)/<void>/<void>/<void>/<code>

其他的参数类型与之前的含义一致,不同的是函数接口 Consumer,这个接口只有输入,没有返回值。

thenAcceptBoth 以及相关方法提供了类似的功能,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的 action,它用来组合另外一个异步的结果。

<code>public  CompletableFuture<void> thenAcceptBoth(CompletionStage   extends U> other, BiConsumer super T,? super U> action)    public    CompletableFuture<void> thenAcceptBothAsync(CompletionStage extends   U> other, BiConsumer super T,? super U> action)    public    CompletableFuture<void> thenAcceptBothAsync(CompletionStage extends   U> other, BiConsumer super T,? super U> action, Executor executor)/<void>/<void>/<void>/<code>
5.4.5. 组合函数

组合函数主要应用于后续计算需要 CompletableFuture 计算结果的场景。

<code>public  CompletableFuture thenCompose(Function super T,? extends CompletionStage> fn)public  CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn)public  
CompletableFuture thenComposeAsync(Function super T,? extends CompletionStage> fn, Executor executor)

这一组方法接受一个 Function 作为参数,这个 Function 的输入是当前的 CompletableFuture 的计算值,返回结果将是一个新的 CompletableFuture,这个新的 CompletableFuture 会组合原来的 CompletableFuture 和函数返回的 CompletableFuture。因此它的功能类似:

<code>A +–> B +—> C /<code>

下面的一组方法 thenCombine 用来复合另外一个 CompletionStage 的结果。两个 CompletionStage 是并行执行的,它们之间并没有先后依赖顺序,other 并不会等待先前的 CompletableFuture 执行完毕后再执行,当两个 CompletionStage 全部执行完成后,统一调用 BiFunction 函数,计算最终的结果。

<code>public  CompletableFuture thenCombine(CompletionStage   extends U> other, BiFunction super T,? super U,? extends V> fn)public  CompletableFuture thenCombineAsync(CompletionStage   extends U> other, BiFunction super T,? super U,? extends V> fn)public  CompletableFuture thenCombineAsync(CompletionStage   extends U> other, BiFunction super T,? super U,? extends V> fn,   Executor executor)/<code>
5.4.6. Either

Either 系列方法不会等两个 CompletableFuture 都计算完成后执行计算,而是当任意一个 CompletableFuture 计算完成的时候就会执行。

<code>public CompletableFuture<void> acceptEither(CompletionStage extends T> other, Consumer super T> action)public CompletableFuture<void> acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action)public CompletableFuture<void> acceptEitherAsync(CompletionStage extends T> other, Consumer super T> action, Executor executor)public  CompletableFuture applyToEither(CompletionStage extends T> other, Function super T,U> fn)public  CompletableFuture applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn)public  CompletableFuture applyToEitherAsync(CompletionStage extends T> other, Function super T,U> fn, Executor executor)/<void>/<void>/<void>/<code>
5.4.7 辅助方法

辅助方法主要指 allOf 和 anyOf,这两个静态方法用于组合多个 CompletableFuture。

<code>public static CompletableFuture<void> allOf(CompletableFuture>... cfs)// allOf方法是当所有的CompletableFuture都执行完后执行计算。public static CompletableFuture<object> anyOf(CompletableFuture>... cfs)// anyOf方法是当任意一个CompletableFuture执行完后就会执行计算。/<object>/<void>/<code>

