Java8实战笔记0x07

CompletableFuture:组合异步编程(一)

Future接口

  • Future接口是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回公文袋调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作。
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double> () {
    public Double call() {
        return doSomeLongComputation();
    }
});
doSomethingElse();

try {
    Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
    ee.printStackTrace();
} catch (InterruptedException ie) {
    ie.printStackTrace();
} catch (TimeoutException te) {
    te.printStackTrace();
}

Future接口的局限性

  • Future表达能力有限,某些异步计算用Future难以表达。

使用CompletableFuture构建异步应用

将同步方法转换为异步方法

class Shop {
    private Random random = new Random(47);

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    // 同步方法
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    // 异步方法
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> futurePrice.complete(calculatePrice(product))).start();
        return futurePrice;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        Shop shop = new Shop();
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        long invocationTime = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Invocation returned after " + invocationTime + " msecs");
        // 执行其它操作
        Shop.delay();
        try {
           double price = futurePrice.get();
            System.out.printf("Price is %.2f%n", price);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Price returned after " + retrievalTime + "msecs");
    }
}

错误处理

  • 如果异步方法发生错误,其异常会被限制在当前线程中,并且该线程最终会被杀死,而这会导致get方法永远阻塞。可以使用重载后的get方法,支持超时参数来防止永久阻塞,程序会得到TimeoutException。但是还是无法得知异步方法线程的错误原因,此时应该使用CompletableFuturecompleteExceptionally方法将导致CompletableFuture内发生的问题抛出。
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try {
            futurePrice.complete(calculatePrice(product));
        } catch (Exception e) {
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

使用工厂方法创建CompletableFuture

  • CompletableFuture.supplyAsync()是一个工厂方法,接受一个Supplier<T>参数,返回一个CompletableFuture对象。使用该方法创建的CompletableFuture与上面带有异常处理的代码是等价的。
public Future<Double> geetPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

免受阻塞

class Shop {
    private Random random = new Random();
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    // 同步方法
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    // 异步方法
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                futurePrice.complete(calculatePrice(product));
            } catch (Exception e) {
                futurePrice.completeExceptionally(e);
            }
        }).start();
        return futurePrice;
    }
}
public class Main {
    public static List<String> findPrices(String product, List<Shop> shops) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(Collectors.toList());
    }
    public static void main(String[] args) throws Exception {
        List<Shop> shops = Arrays.asList(
                new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll")
        );
        long start = System.nanoTime();
        System.out.println(findPrices("Fallout New Vegas", shops));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");
    }
}
/* Output:
[BestPrice price is 97.75, LetsSaveBig price is 118.44, MyFavoriteShop price is 98.66, BuyItAll price is 149.10]
Done in 4013 msecs
*/

使用并行流对请求进行并行操作

public class Main {
    public static List<String> findPrices(String product, List<Shop> shops) {
        return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(Collectors.toList());
    }
    public static void main(String[] args) throws Exception {
        List<Shop> shops = Arrays.asList(
                new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll")
        );
        long start = System.nanoTime();
        System.out.println(findPrices("Fallout New Vegas", shops));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");
    }
}
/* Output:
[BestPrice price is 163.63, LetsSaveBig price is 125.60, MyFavoriteShop price is 148.02, BuyItAll price is 118.71]
Done in 1022 msecs
*/

使用CompletableFuture发起异步请求

public class Main {
    public static List<String> findPrices(String product, List<Shop> shops) {
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getName() + " price is " + shop.getPrice(product)
                ))
                .collect(Collectors.toList());
        return priceFuture.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
    public static void main(String[] args) throws Exception {
        List<Shop> shops = Arrays.asList(
                new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll")
        );
        long start = System.nanoTime();
        System.out.println(findPrices("Fallout New Vegas", shops));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");
    }
}
/* Output:
[BestPrice price is 116.10675116625683, LetsSaveBig price is 159.1609417232611, MyFavoriteShop price is 166.64834055365026, BuyItAll price is 121.8279536327426]
Done in 1044 msecs
*/
  • 此处执行时间远低于书中所述(2005 ms),比较接近并行流的性能。有待查证。和CPU逻辑处理器数量有关,运行环境是12核,因此当把商店数量改为13后,执行时间为2063 ms。

使用定制的执行器

  • 可以使用线程池。线程池大小估算:T = N * U * (1 + W / C),其中,T为线程池大小,N为CPU核数,U为期望的CPU利用率,W / C为等待时间与计算时间比率。
public class Main {
    private static final List<Shop> shops = Arrays.asList(
            new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("Steam"),
            new Shop("Epic"),
            new Shop("GOG"),
            new Shop("Taptap"),
            new Shop("W"),
            new Shop("V"),
            new Shop("Z"),
            new Shop("Y"),
            new Shop("X")
    );
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 100),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public static List<String> findPricesByFuture(String product) {
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getName() + " price is " + shop.getPrice(product)
                ))
                .collect(Collectors.toList());
        return priceFuture.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static List<String> findPricesByFutureWithExec(String product) {
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getName() + " price is " + shop.getPrice(product),
                        executor
                ))
                .collect(Collectors.toList());
        return priceFuture.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void test(Function<String, List<String>> findPrices) {
        long start = System.nanoTime();
        System.out.println(findPrices.apply("Fallout New Vegas"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");
    }

    public static void main(String[] args) throws Exception {
        test(Main::findPricesByFuture);
        test(Main::findPricesByFutureWithExec);
    }
}
/* Output:
[BestPrice price is 123.81188673960165, LetsSaveBig price is 134.2037700185142, MyFavoriteShop price is 108.25237001214239, BuyItAll price is 115.00922099155312, Steam price is 111.23106850193753, Epic price is 99.22328167208421, GOG price is 164.50758310616402, Taptap price is 121.17345205244206, W price is 123.06486848925549, V price is 138.86613263159464, Z price is 160.93733921008214, Y price is 150.61296275286742, X price is 151.1427293015105]
Done in 2063 msecs
[BestPrice price is 147.8957557234686, LetsSaveBig price is 115.91180890944013, MyFavoriteShop price is 145.9127196738407, BuyItAll price is 153.58049341481873, Steam price is 127.4807398778012, Epic price is 143.5858168683404, GOG price is 151.72178968386282, Taptap price is 140.27525423457325, W price is 142.2637869480202, V price is 115.3615656625179, Z price is 151.1027261194255, Y price is 121.78157514886323, X price is 150.22069837374056]
Done in 1003 msecs
*/

并行的方案选择

  • 如果进行计算密集的操作,并且没有I/O,那么推荐使用Stream接口。反之,如果并行单元需要等待I/O操作,那么使用CompletableFuture灵活性更好,还可以根据需要设置线程数。而且,如果此时使用流,那么流的延迟特性会导致难以判断什么时候触发了等待。
  • 流的延迟特性:Stream的操作由零个或多个中间操作和一个结束操作两部分组成。只有执行了结束操作,Stream定义的中间操作才会依次执行。

剑指Offer面试题-实现单例模式

实现单例模式

懒汉式单例

  • 懒汉式单例指第一次调用时才进行实例化。

双重检验锁

class Singleton {
    private volatile static Singleton singleton = null;

    private Singleton() {

    }

    public static Singleton getSingleton() {
        if (singleton == null) {
            synchronized (Singleton.class) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }
}
  • 构造函数应当设为private,获取单例函数应设为public static

  • 单例变量应当声明为volatile,禁止指令重排序。由于初始化实例singleton = new Singleton();不是原子操作,而是分为4个步骤:

    1. 申请内存空间
    2. 初始化默认值
    3. 执行构造器方法
    4. 连接引用和实例

    其中3和4是可以重排序的。如果线程a执行顺序为1243,执行4后切换到线程b,此时单例变量不为空,线程b将获得一个没有初始化完成的对象。

  • synchronized上锁对象是Singleton.class

  • synchronized代码块中还应该进行一次对实例的判空,因为如果线程a通过了第一个判空后,切换到线程b一直执行,直到创建单例再切回线程a,此时线程a已经不需要再创建单例了。

  • 可见性由synchronized保证。

静态内部类

class Singleton {
    private static Singleton singleton = null;

    private Singleton() {

    }

    private static class StaticSingleton {
        private static final Singleton SINGLETON = new Singleton();
    }

    public static Singleton getSingleton() {
        return StaticSingleton.SINGLETON;
    }
}
  • 该方式是线程安全的,因为JVM在执行类的初始化阶段,会获得一个可以同步多个线程对同一个类的初始化的锁。假设线程a先进行了初始化,那么线程b一直等待初始化锁。线程a执行类初始化,即使发生了重排序,也不会影响线程a的初始化。线程a初始化完后,释放锁。线程b获得初始化锁,发现Singleton对象已经初始化完毕,释放锁,不进行初始化,获得Singleton对象。
  • 上面的静态内部类式单例是不能防止反射攻击的,网上有些博客说是可以,我认为是错误的。攻击代码也是一般思路,也可能有优化空间。详见下面代码。
class Singleton {
    private static Singleton singleton = null;

    private Singleton() {

    }

    private static class StaticSingleton {
        private static final Singleton SINGLETON = new Singleton();
    }

    public static Singleton getSingleton() {
        return StaticSingleton.SINGLETON;
    }
}

public class Main {
    public static void main(String[] args) throws IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
        // 保存初始单例
        Singleton s = Singleton.getSingleton();
        // 获取Singleton构造器,并创建一个新的实例
        Constructor<?> c = Singleton.class.getDeclaredConstructors()[0];
        c.setAccessible(true);
        Singleton s2 = (Singleton)c.newInstance();
        // 获取内部类的构造器,并创建一个实例(可能不需要此步骤,没有测试)
        Constructor<?> in = Singleton.class.getDeclaredClasses()[0].getDeclaredConstructors()[0];
        in.setAccessible(true);
        Object ins = in.newInstance();
        // 获取单例成员变量
        Field f = ins.getClass().getDeclaredFields()[0];
        // 去除final修饰符(修改final成员通用方法)
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
        // 修改单例成员的值为新实例
        f.setAccessible(true);
        f.set(ins, s2);
        // 比较
        System.out.println(s.equals(Singleton.getSingleton()));;
    }
}
/* Output:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by Main (file:./testJava/out/production/testJava/) to field java.lang.reflect.Field.modifiers
WARNING: Please consider reporting this to the maintainers of Main
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
false
*/

线程不安全

class Singleton {
    private static Singleton singleton = null;

    private Singleton() {

    }

    public static Singleton getInstance() {
        if (singleton == null) {
            singleton = new Singleton();
        }
        return singleton;
    }
}
  • 仅适用于单线程。

对象锁

class Singleton {
    private static Singleton singleton = null;

    private Singleton() {

    }

    public static synchronized Singleton getInstance() {
        if (singleton == null) {
            singleton = new Singleton();
        }
        return singleton;
    }
}
  • 线程安全,但是每次调用getInstance()时都会锁住对象,引起线程阻塞,效率较低。

饿汉式单例

  • 指在类初始化时就已经实例化。

使用类实现

public class Singleton {
    private static Singleton singleton = new Singleton();

    private Singleton() {

    }

    public static Singleton getInstance() {
        return singleton;
    }
}
  • 线程安全,JVM会保证类只加载一次。

使用枚举实现

public enum Singleton {
    INSTANCE;
}
  • 枚举实例只会初始化一次,因此可以保证是单例。
  • 线程安全,同样JVM会保证枚举只加载一次。
  • 枚举实现单例可以防止反射调用构造函数,枚举在反编译后是abstact class,无法实例化。并且在反射调用构造函数时会检查所调用的构造函数是否属于枚举类型的,若是则抛出异常。
  • 枚举实现单例可以防止反序列化生成新的实例,因为枚举反序列化时则是通过java.lang.EnumvalueOf方法来根据名字查找枚举对象。

Java8实战笔记0x06

用Optional取代null

如何为缺失值建模

class Person {
    private Car car;
    public Car getCar() {
        return car;
    }
}

class Car {
    private Insurance insurance;

    public Insurance getInsurance() {
        return insurance;
    }
}

class Insurance {
    private String name;

    public String getName() {
        return name;
    }
}

public class Main {
    public static String getCarInsuranceName(Person person) {
        return person.getCar().getInsurance().getName();
    }
}
  • 一般情况下,如果Person没有Car,那么getCar()会设置为返回null,表示该值缺失。因此需要判断返回值是否为null来防止出现NullPointerException

采用防御式检查减少NullPointerException

  • 深层质疑:会增加代码缩进层数,不具备扩展性,代码维护困难。
public static String getCarInsuranceName(Person person) {
        if (person != null) {
            Car car = person.getCar();
            if (car != null) {
                Insurance insurance = car.getInsurance();
                if (insurance != null) {
                    return insurance.getName();
                }
            }
        }
        return "Unknown";
    }
  • 过多的退出语句:退出点数量多,难以维护。
public static String getCarInsuranceName(Person person) {
        if (person == null) {
            return "Unknown";
        }
        Car car = person.getCar();
        if (car == null) {
            return "Unknown";
        }
        Insurance insurance = car.getInsurance();
        if (insurance == null) {
            return "Unknown";
        }
        return insurance.getName();
    }

null带来的问题

  • 错误之源:NullPointerException是目前Java程序开发中最典型的异常。
  • 代码膨胀:需要深度嵌套null检查,可读性极差。
  • 毫无意义:null自身没有任何语义,是以一种错误的方式对缺失值建模。
  • 违反哲学:Java一直试图避免让程序员意识到指针的存在,但null指针例外。
  • 类型缺失:null不属于任何类型,这意味着它可以被赋值给任何变量,而当这个变量传递给系统的其它部分时,将无法确定null变量最初是什么类型。

其他语言中null的替代品

  • Groovy:引入安全导航操作符,可以安全的访问可能为null的变量,避免抛出NullPointerException
def carInsuranceName = person?.car?.insurance?.name
  • Haskell:Maybe类型,本质上是optional值的封装。
  • Scala:Option[T],要显式调用avaliable操作检查该变量是否有值,其实是变相的null检查。

Optional类简介

  • 变量存在时,Optional类型只是对对象的简单封装。变量不存在时,缺失的值会被建模成一个“空”的Optional对象,由方法Optional.empty()返回。Optional.empty()方法是一个静态工厂方法,它返回Optional类的特定单一实例。
class Person {
    private Car car;

    public Optional<Car> getCar() {
        return Optional.ofNullable(car);
    }
}

class Car {
    private Insurance insurance;

    public Optional<Insurance> getInsurance() {
        return Optional.ofNullable(insurance);
    }
}

class Insurance {
    private String name;

    public String getName() {
        return name;
    }
}
  • Optionalnull的重要语义区别是,Optional类清楚地表明允许发生变量缺失,null则不允许。在代码中,如果变量不能为null,那么也不需要为其添加null检查,因为null的检查只会掩盖问题,并未真正修复问题。
  • 书中对应上面代码部分在声明成员变量时形如private Optional<Car> car,但此时IDEA报出警告'Optional' used as field or parameter type,参考StackOverFlowOptional作为成员变量或者函数参数类型会导致不必要的包装和额外的解包逻辑,且无法序列化,因此只需在返回值处使用Optional

应用Optional的几种模式

创建Optional对象

  • 声明一个空的Optional
Optional<Car> optCar = Optional.empty();
  • 依据一个非空值创建Optional
Optional<Car> optCar = Optional.of(car);
  • 可接受nullOptional,如果传入null则获得空对象
Optional<Car> optCar = Optional.ofNullable(car);

使用map从Optional对象中提取和转换值

Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);
  • 如果Optional包含一个值,那函数就将该值作为参数传递给map,对该值进行转换。如果Optional为空,就什么也不做。

使用flatMap链接Optional对象

Optional<Person> optPerson = Optional.of(person);
//!Optional<String> name = optPerson.map(Person::getCar)
//!    .map(Car::getInsurance)
//!    .map(Insurance::getName);
  • 上面代码注释处无法通过编译。第一个map返回值类型是Optional<Optional<Car>>,而不是需要的Optional<Car>
  • 解决上述问题应该使用flatMapflatMap方法接受一个函数作为参数,这个函数的返回值是另一个流。这个方法会应用到流的每一个元素,最终形成一个新的流。
// 与书中略有不同,详情见上
public static String getCarInsuranceName(Person person) {
        return Optional.ofNullable(person)
                .flatMap(Person::getCar)
                .flatMap(Car::getInsurance)
                .map(Insurance::getName)
                .orElse("Unknown");
    }

默认行为及解引用Optional对象

  • get():如果变量存在则返回封装的变量值,否则抛出NoSuchElementException异常。不推荐使用。
  • orElse(T other):如果变量存在则返回,否则返回传入的默认值。
  • orElseGet(Supplier<? extends X> other):是orElse()的延迟调用版。传入的Supplier只有在变量不存在时调用。如果创建对象是耗时操作应该使用该方法。
  • orElseThrow(Supplier<? extends x> exceptionSupplier):类似于get(),不过抛出的错误由传入的Supplier创建。
  • ifPresent(Consumer<? extends T> consumer):在变量存在时执行传入的Consumer,否则就不进行任何操作。

两个Optional对象组合

public Optional<Insurance> safeFind(Person person, Car car) {
    return Optional.ofNullable(person)
            .flatMap(
                   p -> Optional.ofNullable(car).map(c -> find(p, c))
            );
}
  • 首先,c -> find(p, c)返回Insurance,而流的类型为Optional,因此不需要flatMap。之后p -> Optional.ofNullable(car).map(c -> find(p, c))得到的就是Optional<Insurance>Optional.ofNullable(person)的流类型是Optional<Person>,因此需要flatMap

使用filter剔除特定的值

  • filter方法接受一个谓词作为参数。如果Optional对象的值存在,并且它符合谓词条件,filter方法就返回其值;否则它就返回一个空的Optional对象。
public String getCarInsuranceName(Optional<Person> person, int minAge) {
    return person.filter(p -> p.getAge() >= minAge)
        .flatMap(Person::getCar)
        .flatMap(Car::getInsurance)
        .map(Insurance::getName)
        .orElse("Unknown");
}

使用Optional的实战示例

用Optional封装可能为null的值

// 原始代码
Object value = map.get("key");
// if value != null ...

// 使用Optional
Optional<Object> value = Optional.ofNullable(map.get("key"));

异常与Optional对比

// 将String转为Integer
public static Optional<Integer> stringToInt(String s) {
    try {
        return Optional.of(Integer.parseInt(s));
    } catch (NumberFormatExcption e) {
        return Optional.empty();
    }
}

// Java 9中添加了Stream.ofNullable,如果对象为null,则返回OptionalInt
public OptionalInt stringToInt(String s) {
    return Stream.ofNullable(s)
        .filter(str -> str.matches("\\d+"))
        .mapToInt(Integer::parseInt)
        .findAny();
}

Java8实战笔记0x05

默认方法

  • 一旦类库设计者需要更新接口,向其中加入新的方法,继承该接口的代码就需要做出相应的修改。Java 8引入了新的解决方法,一是允许在接口中声明静态方法,二是引入默认方法,通过默认方法可以指定接口方法的默认实现,默认方法使用default修饰。

不断演进的API

不同类型的兼容性

  • 变更对Java程序的影响大体可以分成三种类型的兼容性:
    • 二进制级的兼容性,表示现有的二进制执行文件能无缝持续链接(包括验证、准备和解析)和运行。例如向接口添加一个方法就是二进制级的兼容,如果添加方法不被调用,那么现有程序就不会出现错误。
    • 源代码级的兼容性,表示引入变化之后,现有的程序依然能成功编译通过。例如向接口添加新的方法就不是源代码级的兼容,因为遗留代码没有实现新引入的方法,所以无法顺利通过编译。
    • 函数行为的兼容性,表示变更发生之后,程序接受同样的输入能得到同样的结果。还是例如向接口添加新的方法,是函数行为兼容的,因为新的方法没有调用,或者被实现覆盖而没有影响其表现。

默认方法的使用模式

可选方法

  • 使用默认方法,可以为那些用户不会经常使用的,但是接口中包含的方法提供一个默认实现,这样实体类就无需在自己的实现中显示地提供一个空方法。
interface Iterator<T> {
    boolean hasNext();
    T next();
    default void remove() {
        throw new UnsupportedOperationException();
    }
}

行为的多继承

  • 类型的多继承:允许实现多个接口,而接口可以有默认实现,实质上实现了多继承。
  • 用正交方法精简接口:分解实体类的不同功能点并设计接口,降低接口的重合度。
  • 注意,继承不应该成为代码复用的万金油,例如继承一个100个方法的类就不是一个好选择,因为这会引入不必要的复杂性。可以使用代理模式有效的规避这类问题。

解决冲突的规则

解决为题的三条规则

  1. 类中方法的优先级最高。类或父类中声明的方法的优先级高于任何声明为默认方法的优先级。
  2. 如果1.无法判断,那么子接口的优先级最高。函数签名相同时,优先选择拥有最具体实现的默认方法的接口,如果B继承了A,那么BA更具体。
  3. 如果2.也无法判断,继承了多个接口的类必须通过显式覆盖和调用期望的方法

继承一个类,实现两个接口的情况

  • 注意,上图中D未覆盖hello()方法,但它实现了接口A,那么它拥有A的默认方法;此外,C实现了接口B,因此编译器会在接口A和接口Bhello()之间选择(而不是类D和接口B之间)。由于B 更加具体,所以会选择接口Bhello()方法。

冲突及如何显式地消除歧义

同时实现具有相同函数声明的两个接口

  • 上图中,编译器将无法判断哪一个接口的实现更加具体,此时应该使用显式地声明来确定使用哪一个方法,或者覆盖它。
public class C implements B, A {
    void hello() {
        B.super.hello();
    }
}

菱形继承问题

菱形问题 - 1

  • 上图中实际上只有一个方法可选,即接口A的默认方法。

菱形问题 - 2

  • 此时接口B比接口A更加具体,因此使用接口Bhello()方法。

菱形问题 - 3

  • 上面这种情况会出现冲突,需要显式指定方法。

jOOQ逆向生成Java代码

jOOQ逆向生成Java代码

引入

要操作一下GeoIP库,打算接触一下ORM。又因为最近在看Stream,想找一种类似流水线的Java处理数据库相关的框架。jOOQ符合了我的需求。

官网文档连接

关于xml

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<configuration xmlns="http://www.jooq.org/xsd/jooq-codegen-3.11.0.xsd">
  <!-- Configure the database connection here -->
  <jdbc>
    <driver>com.mysql.jdbc.Driver</driver>
    <url>jdbc:mysql://localhost:3306/library</url>
    <user>root</user>
    <password></password>
  </jdbc>

  <generator>
    <!-- The default code generator. You can override this one, to generate your own code style.
         Supported generators:
         - org.jooq.codegen.JavaGenerator
         - org.jooq.codegen.ScalaGenerator
         Defaults to org.jooq.codegen.JavaGenerator -->
    <name>org.jooq.codegen.JavaGenerator</name>

    <database>
      <!-- The database type. The format here is:
           org.jooq.meta.[database].[database]Database -->
      <name>org.jooq.meta.mysql.MySQLDatabase</name>

      <!-- The database schema (or in the absence of schema support, in your RDBMS this
           can be the owner, user, database name) to be generated -->
      <inputSchema>library</inputSchema>

      <!-- All elements that are generated from your schema
           (A Java regular expression. Use the pipe to separate several expressions)
           Watch out for case-sensitivity. Depending on your database, this might be important! -->
      <includes>.*</includes>

      <!-- All elements that are excluded from your schema
           (A Java regular expression. Use the pipe to separate several expressions).
           Excludes match before includes, i.e. excludes have a higher priority -->
      <excludes></excludes>
    </database>

    <target>
      <!-- The destination package of your generated classes (within the destination directory) -->
      <packageName>test.generated</packageName>

      <!-- The destination directory of your generated classes. Using Maven directory layout here -->
      <directory>C:/workspace/MySQLTest/src/main/java</directory>
    </target>
  </generator>
</configuration>

上面直接贴了官网样例。可能需要修改的有:

  • 数据库连接jdbc
  • 要生成Java代码对应的数据库名inputSchema
  • 目标路径directory和目标包名packageName

生成步骤

首先按官网文档所说,下载jooq-3.11.11.jarjooq-codegen-3.11.11.jarjooq-meta-3.11.11.jar以及mysql-connector-java-5.1.36.jar四个jar包到某个临时目录(例如D:\temp),将上面的xml文件也放进该目录(命名为x.xml)。

java -classpath jooq-3.11.11.jar;jooq-meta-3.11.11.jar;jooq-codegen-3.11.11.jar;mysql-connector-java-5.1.36.jar;. org.jooq.codegen.GenerationTool x.xml

如果参考了官网的命令,注意去掉谜之换行以及MySQL驱动jar包名称后面的-bin


Java8实战笔记0x04

重构、测试和调试

为改善可读性和灵活性重构代码

从匿名类到Lambda表达式的转换

  • 将实现单一抽象方法的匿名类转换为Lambda表达式。
Runnable r1 = new Runnable() {
    public void run() {
        System.out.println("Hello");
    }
};

Runnable r2 = () -> System.out.println("Hello");

转换时应当注意:

  • 匿名类和Lambda表达式中的thissuper的含义是不同的。在匿名类中,this代表的类是自身,但在Lambda中代表的是包含类(外部类)。
  • 匿名类可以屏蔽包含类的变量,而Lambda表达式不能。
int a = 10;
Runnable r1 = () -> {
    //! int a = 2;
    System.out.println(a);
};

Runnable r2 = new Runnable() {
    int a = 2; //正确
    System.out.println(a);
};
  • 在涉及重载的上下文里,Lambda表达式将无法确定其类型,而匿名类则在初始化时确定了其类型。
interface Task {
    public void execute();
}
public class Main {

    public static void test(Runnable runnable) {
        System.out.println("in runnable");
        runnable.run();
    }
    public static void test(Task task) {
        System.out.println("in task");
        task.execute();
    }
    public static void main(String[] args) {
        //! Main.test(() -> System.out.println("error"));
        // 无法通过编译
        // Error:(22, 13) java: 对test的引用不明确
        //     Main 中的方法 test(java.lang.Runnable) 和 Main 中的方法 test(Task) 都匹配
        Main.test((Task) () -> System.out.println("right"));
        // 使用强制转换,正确
    }
}

从Lambda表达式到方法引用的转换

  • 为了改善代码的可读性,尽量使用方法引用,因为方法名往往能更直观地表达代码的意图。另外,还应该考虑使用静态辅助方法,比如comparingmaxBy

从命令式的数据处理切换到Stream

  • Stream API能更清晰地表达数据处理管道的意图。另外,通过短路和延迟载入以及利用多核,可以对Stream进行优化。但是这是一个困难的任务,因为要选择适当的流操作来还原控制流语句,例如breakcontinue以及return
List<String> dishName = new ArrayList<>();
for (Dish dish: menu) {
    if (dish.getCalories() > 300) {
        dishNames.add(dish.getName());
    }
}

menu.parallelStream()
    .filter(d -> d.getCalories() > 300)
    .map(Dish::getName)
    .collect(toList());

使用Lambda重构面向对象的设计模式

策略模式

  • 策略模式代表了解决一类算法的通用解决方案,可以在运行时选择使用哪种方案。策略模式包含三部分内容:
    • 一个代表某个算法的接口。
    • 一个或多个该接口的具体实现,它们代表了算法的多种实现。
    • 一个或多个使用策略对象的客户。
interface ValidationStrategy {
    boolean execute(String s);
}

class IsAllLowerCase implements ValidationStrategy {
    @Override
    public boolean execute(String s) {
        return s.matches("[a-z]+");
    }
}

class IsNumberic implements ValidationStrategy {
    @Override
    public boolean execute(String s) {
        return s.matches("\\d+");
    }
}

class Validator {
    private final ValidationStrategy strategy;

    public Validator(ValidationStrategy strategy) {
        this.strategy = strategy;
    }

    public boolean validate(String s) {
        return strategy.execute(s);
    }
}

public class Main {
    public static void main(String[] args) {
        Validator nv1 = new Validator(new IsNumberic());
        boolean b1 = nv1.validate("aaa");
        Validator lv1 = new Validator(new IsAllLowerCase());
        boolean b2 = lv1.validate("bbbb");

        Validator nv2 = new Validator((String s) -> s.matches("[a-z]+"));
        b1 = nv2.validate("aaa");
        Validator lv2 = new Validator((String s) -> s.matches("\\d+"));
        b2 = lv2.validate("bbb");
    }
}

模版方法

  • 如果需要采用某个算法的框架,同时又希望有一定的灵活度,能对它的某些部分进行改进,那么采用模版方法设计模式是比较通用的方案。
abstract class OnlineBanking {
    public void processCustomer(int id) {
        Custormer c = Database.getCustormerWithId(id);
        makeCustomerHappy(c);
    }
    abstract void makeCustomerHappy(Customer c);
}
  • 上面代码搭建的在线银行算法框架,不同的支行可以通过继承OnlineBanking来提供不同的服务。
class OnlineBanking {
    public void processCustomer(int id, Consumer<Customer> makeCustomerHappy) {
        makeCustomerHappy.accept(c);
    }
}
//...
new OnlineBanking().processCustomer(1337, (Customer c) -> System.out.println("Hello!"));

观察者模式

  • 观察者模式是,某些事件发生时(比如状态转变),如果一个对象(主题)需要自动地通知其他多个对象(观察者)。
// 观察者接口
interface Observer {
    void notify(String tweet);
}
// 不同的观察者
class NYTimes implements Observer {
    public void notify(String tweet) {
        if (tweet != null && tweet.contains("money")) {
            System.out.println("Breaking news in NY! " + tweet);
        }
    }
}

class Guardian implements Observer {
    public void notify(String tweet) {
        if (tweet != null && tweet.contains("queen")) {
            System.out.println("Yet another news in London... " + tweet);
        }
    }
}

class LeMonde implements Observer {
    public void notify(String tweet) {
        if (tweet != null && tweet.contains("wine")) {
            System.out.println("Today cheese, wine and news! " + tweet);
        }
    }
}
// 主题接口
interface Subject {
    void registerObserver(Observer o);
    void notifyObservers(String tweet);
}
class Feed implements Subject {
    private final List<Observer> observers = new ArrayList<>();

    public void registerObserver(Observer o) {
        this.observers.add(o);
    }

   public void notifyObservers(String tweet) {
       observers.forEach(o -> o.notify(tweet));
   }
}
//...
// 使用
Feed f = new Feed();
f.registerObserver(new NYTimes());
f.registerObserver(new Guardian());
f.registerObserver(new LeMonde());
f.notifyObservers("The queen said her favourite book is Java 8 in Action!");

// 优化观察者声明和注册
f.registerObserver((String tweet) -> {
    if (tweet != null && tweet.contains("money")) {
            System.out.println("Breaking news in NY! " + tweet);
        }
});
//...
  • 上面示例仅限简单的观察者模式。如果观察者的逻辑十分复杂,或者持有状态、定义了多个方法等等,此时应该继续使用类的方式。

责任链模式

  • 责任链模式是一种创建处理对象序列的通用方案。一个处理对象可能需要在完成一些工作之后,将结果传递给另一个对象,这个对象接着做一些工作,再转交给下一个处理对象,以此类推。
// 处理抽象类
abstract class ProcessingObject<T> {
    protected ProcessingObject<T> successor;
    public void setSuccessor(ProcessingObject<T> successor) {
        this.successor = successor;
    }
    public T handle(T input) {
        T r = handleWork(input);
        if (successor != null) {
            return successor.handle(r);
        } 
        return r;
    }
    abstract protected T handleWork(T input);
}
// 处理类
class HeaderTextProcessing extends ProcessingObject<String> {
    public String handleWord(String text) {
        return "From Raoul, Mario and Alan: " + text;
    }
}

class SpellCheckerProcessing extends ProcessingObject<String> {
    public String handleWord(String text) {
        return text.replaceAll("labda", "lambda");
    }
}
// 使用
ProcessingObject<String> p1 = new HeaderTextProcessing();
ProcessingObjct<String> p2 = new SpellCheckerProcessing();
p1.setSuccessor(p2);
String result = p1.handle("Aren't labdas really sexy?!!");
System.out.println(result);

// 使用Lambda优化处理对象实现和连接
UnaryOperator<String> headerProcessing = (String text) -> "From Raoul, Mario and Alan: " + text;
UnaryOperator<String> spellCheckerProcessing = (String text) -> text.replaceAll("labda", "lambda");
Function<String, String> pipeline = headerProcessing.andThen(spellCheckerProcessing);
String result = pipeline.apply("Aren't labdas really sexy?!!");

工厂模式

  • 使用工厂模式,无需向客户端暴露实例化的逻辑就能完成对象的创建。
public class ProdectFactory {
    public static Product createProduct(String name) {
        switch(name) {
            case "loan": return new Loan();
            case "stock": return new  Stock();
            case "bond": return new Bond();
            default: throw new RuntimeException("No such product " + name);
        }
    }
}
// ...
// 使传统的工厂模式
Product p = ProductFactory.createProduct("loan");

// 使用Lambda表达式优化工厂模式
final static Map<String, Supplier<Product>> map = new HashMap<>();
static {
    map.put("load", Loan::new);
    map.put("stock", Stock::new);
    map.put("bond", Bond::new);
}
public static Product createProduct(String name) {
    Supplier<Product> p = map.get(name);
    if (p != null) return p.get();
    throw new IllegalArgumentExcetption("No such product " + name);
}

测试

  • 没有必要对Lambda表达式进行测试,更应该关注外围方法的的行为。

调试

  • 方法引用的错误可以体现在栈跟踪中,而Lambda表达式由于没有名字,栈跟踪显示的错误会比较难以理解。此时应该使用日志调试,即在流水线中插入peek方法查看中间值。

Java8实战笔记0x03

并行数据处理与性能

并行流

将顺序流转换为并行流

  • 可以把流转换成并行流,从而让前面的函数归约过程并行运行。对顺序流调用parallel()方法。
public static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
        .limit(n)
        .parallel()
        reduce(0L, Long::sum);
}
  • 对顺序流调用parallel()方法不意味着流本身有任何实际的变化。它内部实际上就是设了一个boolean标志,表示让调用parallel()方法之后进行的所有操作都并行执行。类似的,对并行流调用sequential()方法就可以把它变成顺序流。不过在一个流水线中,只有最后一个parallel()或者squential()方法生效,影响整个流水线。

高效使用并行流

  • 首先应该进行测量。并行流并不总是比顺序流快。
  • 留意装箱。自动装箱合拆箱操作会大大降低性能。可以使用原始类型流(IntStreamLongStreamDoubleStream)来避免这种操作。
  • 有些操作本身在并行流上的性能就比顺序流差,特别是limitfindFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。
  • 要考虑流的操作流水线的总计算成本。设N是要处理的元素总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
  • 对于较小的数据量,选择并行流几乎不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销
  • 要考虑流背后的数据结构是否容易分解。例如ArrayList的拆分效率比LinkedList高得多,因为前者不用遍历就可以平均拆分,后者则必须遍历。
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如一个流可以分成大小相同的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能会丢弃的元素个数却无法预测,导致流的大小未知。
  • 要考虑终端操作中合并步骤的代价。如果代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超过通过并行流得到的性能提升。

分支/合并框架

  • 分支/合并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

使用RecursiveTask

  • 要把任务提交到线程池,必须创建RecursiveTask<R>的一个子类,其中R时并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型。要定义RecursiveTask只需要实现它唯一的抽象方法compute。这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再久拆分或不方便再拆分时,生成单个子任务结果的逻辑。
class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;

    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + (length >> 1));
        leftTask.fork();
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + (length >> 1), end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

使用分支/合并框架的最佳做法

  • 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
  • 不应该在RecursiveTask内部使用ForkJoinPoolinvoke方法。相反,应该直接调用computefork方法,只有顺序代码才应该用invoke来启动并行计算。
  • 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左右两边的子任务调用fork的效率要比直接对其中一个调用compute低。直接调用compute可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
  • 调试使用分支/合并框架的并行计算可能有点棘手,特别是查看栈跟踪无法起作用,因为调用compute的线程并不是概念上的调用方。
  • 和并行流一样,在多核处理器上使用分支/合并框架不一定比顺序执行快。分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。

工作窃取

  • 在理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。但是实际上,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,例如磁盘访问慢,或是需要和外部服务进行协调执行。
  • 分支/合并框架使用工作窃取来解决工作量不平衡的问题。在实际使用中,任务会被差不多地分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。如果某个线程早早完成了分配给它的所有任务,也就是它的任务队列已经空了,而其它的线程还很忙。这时,该线程会随机选择一个别的线程,从其队尾“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

Spliterator

  • Spliterator,和Iterator一样用于遍历数据源中的元素,但它是为了并行执行而设计的。
// Spliterator接口定义
public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();
}

拆分过程

拆分过程状态转换

  • Stream拆分成多个部分的算法是一个递归过程,不断地对Spliterator调用trySplit直到它返回null。当所有的Spliterator都返回null则拆分终止。
  • Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代表Spliterator本身特性集的编码。
特性 含义
ORDERED 元素有既定顺序(例如List),因此Spliterator在遍历和划分时也会遵守这一顺序
DISTINCT 对于任意一对遍历过的元素xyx.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED Spliterator由一个已知大小的源建立(例如Set),因此estimatedSize()返回的是准确值
NONNULL 保证遍历的元素不会为null
IMMUTABLE Spliterator的数据源不能被修改。这意味着在遍历时不能添加、删除或修改任何元素
CONCURRENT Spliterator的数据源可以被其他线程同时修改而无需同步
SUBSIZED Spliterator和所有从它拆分出来的Spliterator都是SIZED

实现自定义Spliterator

class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        action.accept(string.charAt(currentChar++));
        return currentChar < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        if (currentSize < 10) {
            return null;
        }
        // 从中间开始,找下一个空白字符,然后拆分,以避免从单词中间拆开
        for (int splitPos = (currentSize >> 1) + currentChar; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
                currentChar = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    // 遍历,见状态转换图
    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(counter, true);
        } else {
            return lastSpace ? new WordCounter(counter + 1, false) : this;
        }
    }

    // 归约方法,将两个WordCounter合并为一个
    public WordCounter combine(WordCounter wordCounter) {
        return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
    }

    public int getCounter() {
        return counter;
    }
}

public class Main {

    private static int countWords(Stream<Character> stream) {
        // 使用流归约
        WordCounter wordCounter = stream.reduce(
                new WordCounter(0, true),
                WordCounter::accumulate,
                WordCounter::combine
        );
        return wordCounter.getCounter();
    }

    public static void main(String[] args) {
        final String SENTENCE = " Nel mezzo del cammin di nostra vita mi ritrovai in una selva oscura" +
                " che la dritta via era smarrita ";
        Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
        Stream<Character> stream = StreamSupport.stream(spliterator, true);

        System.out.println("Found " + Main.countWords(stream) + " words");
    }
}
  • tryAdvance方法的行为类似于普通的Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且还有其他元素要遍历就返回true。在上面代码中,tryAdvance将当前位置的Character传给了Consumer,并让位置加一。作为参数传递的Consumer是一个Java内部类,在遍历流时将要处理的Character传给了一系列要对其执行的函数。这里传递给了accumulate()
  • trySplit方法定义了拆分要遍历的数据结构的逻辑。首先要设定不再进一步拆分的下限,以避免生成太多的任务。拆分时,一旦找到合适的位置,就可以创建一个新的Spliterator来遍历从当前位置到拆分位置的子串。
  • estimatedSize是这个Spliterator解析的String的总长度和当前遍历位置的差。

Java8实战笔记0x02

用流收集数据

归约和汇总

查找流中的最大值和最小值

  • Collectors.maxByCollectors.minBy可以计算流中的最大或最小值。这两个收集器接收一个Comparator参数来比较流中的元素。
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish = menu.stream()
    .collect(Collectors.maxBy(DishCaloriesComparator));

汇总

  • Collectors类专门为汇总提供了一个工厂方法Collectors.summingInt。它可接受一个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递普通的collect方法后即执行所需要的汇总操作。类似的还有Collectors.summingLongCollectors.summingDouble
int totalCalories = menu.stream().collect(summingInt(Dish:getCalories));
  • Collectors.averagingInt,以及对应的Collectors.averagingLongCollectors.averagingDouble可以计算数值的平均数。
double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
  • summarizingInt工厂方法返回的收集器可以一次操作就得到流中元素的个数、所选值的总和、平均值、最大值、最小值。
IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));

连接字符串

  • joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。
String shortMenu = menu.stream().map(Dish::getName).collect(joining())
  • joining在内部使用了StringBuilder来把生成的字符串逐个追加起来。此外,如果对象有toString()方法,那么可以省略map映射。
String shortMenu = menu.stream().collect(joining());
  • joining()还接受一个字符串作为分界符。
String shortMenu = menu.stream().map(Dish::getName).collect(joining(", "));

广义的归约汇总

  • reduce工厂方法则是所有归约情况的一般化,它需要三个参数:初始值、转换函数、累积函数(将两个项目累积成一个同类型的值)。
int totalCalories = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum));

分组

  • 可以使用Collectors.groupingBy工厂方法返回的收集器来实现元素分组。
Map<Dish.Type, List<Dish>> dishesByType = menu.stream(groupingBy(Dish::getType));
  • 在上面代码中,groupingBy方法接受一个分类函数,用于提取元素的类别从而对元素进行分组。分组的操作结果是一个Map,分组函数返回的值作为映射的键,流中所有具有这个分类值的项目的列表作为对应的映射值。如果没有现成的获取元素类别的函数,可以传入Lambda。
public enum ColaricLevel { DIET, NORMAL, FAT }

Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
    groupingBy(dish -> {
        if (dish.getCalories() <= 400) return CaloricLevel.DIET;
        else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
        else return CaloricLevel.FAT;
    })
);

多级分组

  • Collectors.groupingBy有双参数版本,第二个参数为collector类型。进行二级分组时,可以把一个内层groupingBy传递给外层groupingBy,并定义一个为流中项目分类的二级标准。
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream().collect(
    groupingBy(Dish::getType, groupingBy(dish -> {
        if (dish.getCalories() <= 400) return CaloricLevel.DIET;
        else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
        else return CaloricLevel.FAT;
    }))
);

按子组收集数据

  • groupingBy的第二个参数也可以是其它collect
// 分组求和
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

// 找出分组最大值
Map<Dish.Type, Optional<Dish>> mostCaloricByType = menu.stream().collect(groupingBy(
    Dish::getType,
    maxBy(comparingInt(Dish::getCalories))
));
  • Collectors.collectingAndThen可以把收集器返回的结果转换为另一种类型。
Map<Dish.Type, Dish> mostCaloricByType = menu.stream().collect(groupingBy(
    Dish::getType,
    collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)
));

分区

  • 分区是分组的特殊情况:由一个谓词作为分类函数,它成为分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组,true一组false一组。类似groupingBy可以进行多级分区。
Map<Boolean, List<Dish>> partitionedMenu = menu.stream().collect(partitioningBy(Dish::isVegetarian));

收集器接口

// Collector接口定义
public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    Function<A, R> finisher();
    BinaryOperator<A> combiner();
    Set<Characteristics> characteristics();
}

接口解析

泛型

  • T是流中要收集的项目的泛型;A是累加器的类型,累加器是在收集过程中用于累积部分结果的对象;R是收集操作得到的对象(一般情况下是集合,但也有可能不是)。

方法

  • supplier方法:用于创建一个空的累加器实例,供数据收集过程使用
public Supplier<List<T>> suplier() {
    return () -> new ArrayList<T>;
}
// 或者传递构造函数引用
public Supplier<List<T>> suplier() {
    return ArrayList::new;
}
  • accumulator方法:返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数,保存归约结果的累加器(已收集了流中的n - 1个项目),以及第n个元素本身。该函数将返回void,因为是累加器的原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。
public BiConsumer<List<T>, T> accumulator() {
    return (list, item) -> list.add(item);
}
// 或者传递构造函数引用
public BiConsumer<List<T>, T> accumulator() {
    return List::add;
}
  • finisher方法:在遍历完流后,finisher方法必须返回累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。如果不需要进行转换,可以返回Function.identity(),该函数直接返回对象本身,即return t -> t;
public Function<List<T>, List<T>> finisher() {
    return Function.identity();
}
  • combiner方法:返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得到的累加器要如何合并。
public BinaryOperator<List<T>> combiner() {
    return (list1, list2) -> {
        list1.addAll(list2);
        return list1;
    }
}
  • 并行归约流程
  1. 原始流会以递归方式拆分为子流,直到定义流是否需要进一步拆分的一个条件为非。
  2. 对所有的子流并行处理,即对每个子流应用归约算法。
  3. 使用收集器combiner方法返回的函数,将所有的部分结果两两合并。
  • characteristics方法:返回一个不可变的Characteristics集合,它定义了收集器的行为,尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。Characteristics是一个包含三个项目的枚举:
    1. UNORDERED:归约结果不受流中项目的遍历和累积顺序的影响
    2. CONCURRENTaccumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源才可以并行归约。
    3. IDENTITY_FINISH:这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,类累加器对象将会直接用作归约过程的最终结果。这也就意味着,将累加器A不加检查地转换为结果R是安全的。

进行自定义收集而不实现接口

  • collector的一个重载版本可以接受三个参数,supplieraccumulatorcombiner
List<Dish> dishes = menuStream.collect(
    ArrayList::new,
    List::add,
    List::addAll
);

Java编程思想笔记0x17

并发(四)

死锁

  • 任务之间相互等待的连续循环,没有哪个线程能够继续执行,即死锁。
  • 死锁发生的条件:
    1. 互斥条件。任务使用的资源中至少有一个是不能共享的。
    2. 允许持有资源等待。至少有一个任务必须持有一个资源并且正在等待获取一个当前被别的任务持有的资源。
    3. 资源不能被抢占,任务必须把资源释放当作普通事件。
    4. 循环等待。一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得所有任务都被锁住。

部分类库的构件

CountDownLatch

  • CountDownLatch用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
  • 可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用wait()的方法都将阻塞,直至这个计数值到达0。其他任务结束其工作时,可以在该对象上调用countDown()来减小这个计数值。CountDownLatch设计为只触发一次,计数值不能被重置。
  • 调用countDown()的任务在产生这个调用时并没有被阻塞,只有对await()的调用会被阻塞,直至计数值到达0。
class TaskPortion implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random(47);
    private final CountDownLatch latch;

    public TaskPortion(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            doWork();
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
        System.out.println(this + "completed");
    }

    @Override
    public String toString() {
        return String.format("%1$-3d ", id);
    }
}

class WaitingTask implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch latch;

    public WaitingTask(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            latch.await();
            System.out.println("Latch barrier passed for " + this);
        } catch (InterruptedException e) {
            System.out.println(this + " interrupted");
        }
    }

    @Override
    public String toString() {
        return String.format("WaitingTask %1$-3d", id);
    }
}
public class Main {

    static final int SIZE = 100;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(SIZE);
        for (int i = 0; i < 10; i++)
            exec.execute(new WaitingTask(latch));
        for (int i = 0; i < SIZE; i++)
            exec.execute(new TaskPortion(latch));
        System.out.println("Launched all tasks");
        exec.shutdown();
    }
}

CyclicBarrier

  • 一组任务并行执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动,非常像CountDownLatch,只是CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用。
class Horse implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random random = new Random(47);
    private static CyclicBarrier cyclicBarrier;
    public Horse(CyclicBarrier cyclicBarrier) {
        Horse.cyclicBarrier = cyclicBarrier;
    }
    public synchronized int getStrides() {
        return strides;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    strides += random.nextInt(3);
                }
                cyclicBarrier.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "Horse " + id + " ";
    }
    public String tracks() {
        return "*".repeat(Math.max(0, getStrides())) + id;
    }
}
public class Main {
    private static final int FINISH_LINE = 75;
    private List<Horse> horseList = new ArrayList<>();
    private ExecutorService exec = Executors.newCachedThreadPool();

    private Main(int nHorses, final int pause) {
        CyclicBarrier cyclicBarrie = new CyclicBarrier(nHorses, () -> {
            System.out.println("=".repeat(FINISH_LINE));
            for (Horse horse : horseList)
                System.out.println(horse.tracks());
            for (Horse horse : horseList)
                if (horse.getStrides() >= FINISH_LINE) {
                    System.out.println(horse + "won!");
                    exec.shutdownNow();
                    return;
                }
            try {
                TimeUnit.MILLISECONDS.sleep(pause);
            } catch (InterruptedException e) {
                System.out.println("barrier action sleep");
            }
        });
        for (int i = 0; i < nHorses; i++ ) {
            Horse horse = new Horse(cyclicBarrie);
            horseList.add(horse);
            exec.execute(horse);
        }
    }
    public static void main(String[] args) throws Exception {
        int nHorses = 7;
        int pause = 200;
        new Main(nHorses, pause);
    }
}
  • 可以向CyclicBarrier提供一个“栅栏动作”,一个Runnable,当计数值到达0时自动执行。这是与CountDownLatch的另一个区别。

DelayQueue

  • 一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有任何头元素,并且poll()将返回null
  • 延迟到期时间略带误导,指设定的到期时间减去当前时间,因此DelayQueue中元素排列是按设定时间从小到大排列。
class DelayedTask implements Runnable, Delayed {
    private static int counter = 0;
    private final int id = counter++;
    private final int delta;
    private final long trigger;
    protected static List<DelayedTask> sequence = new ArrayList<>();

    public DelayedTask(int delta) {
        this.delta = delta;
        trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(this.delta, TimeUnit.MILLISECONDS);
        sequence.add(this);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed arg) {
        DelayedTask that = (DelayedTask) arg;
        return Long.compare(trigger, that.trigger);
    }

    @Override
    public void run() {
        System.out.println(this + " ");
    }

    @Override
    public String toString() {
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }

    public String summary() {
        return "(" + id + ": " + delta + ")";
    }

    public static class EndSentinel extends DelayedTask {
        private ExecutorService exec;
        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;
        }

        @Override

        public void run() {
            for (DelayedTask pt: sequence) {
                System.out.println(pt.summary() + " ");
            }
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> q;

    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted())
                q.take().run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}
public class Main {

    public static void main(String[] args) throws Exception {
        Random random = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        for (int i = 0; i < 20; i++)
            queue.put(new DelayedTask(random.nextInt(5000)));
        queue.add(new DelayedTask.EndSentinel(5000, exec));
        exec.execute(new DelayedTaskConsumer(queue));
    }
}
  • Delayed接口有一个方法名为getDelay(),它可以用来告知延迟到期有多长时间,或者到场时间之前已经到期。比起BlockingQueueDelayQueue#take()会在没有元素或者没有元素到期时阻塞。

PriorityBlockingQueue

  • 与上面DelayQueue类似,只不过优先级是人为指定。
class PriorizedTask implements Runnable, Comparable<PriorizedTask> {
    private Random random = new Random(47);
    private static int counter = 0;
    private final int id  = counter++;
    private final int priority;
    protected static List<PriorizedTask> sequence = new ArrayList<>();
    public PriorizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    @Override
    public int compareTo(PriorizedTask o) {
        return Integer.compare(priority, o.priority);
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this);
    }

    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }

    public String summary() {
        return "(" + id + ": " + priority + ")";
    }

    public static class EndSentinel extends PriorizedTask {
        private ExecutorService exec;
        public EndSentinel(ExecutorService exec) {
            super(-1);
            this.exec = exec;
        }

        @Override
        public void run() {
            int count = 0;
            for (PriorizedTask pt: sequence) {
                System.out.println(pt.summary());
                if (++count % 5 == 0) System.out.println();
            }
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Random random = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }

    @Override
    public void run() {
        for (int i = 0; i <20; i++) {
            queue.add(new PriorizedTask(random.nextInt(10)));
            Thread.yield();
        }
        try {
            for (int i = 0; i < 10; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PriorizedTask(10));
            }
            for (int i = 0; i < 10; i++) {
                queue.add(new PriorizedTask(i));
            }
            queue.add(new PriorizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> q;

    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
        this.q = q;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                q.take().run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}
public class Main {

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

ScheduledExecutorService

  • ScheduledThreadPoolExecutor提供schedule()(运行一次任务)和scheduleAtFixedRate()(可以设定初次延迟和再次运行间隔时间)来自动调度Runnable对象。

Semaphore

public class Main {

    private static final int COUNT = 40;
    private static Executor executor = Executors.newFixedThreadPool(COUNT);
    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < COUNT; i++) {
            executor.execute(new Main.Task());
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
                // 使用资源
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Exchanger

  • 可以让两个线程(只能是两个)交换一个对象。先调用exchange()的线程阻塞,直到另一个线程也调用exchange()
class Producer<T> extends Thread {
    private List<T> list = new ArrayList<>();
    private Exchanger<List<T>> exchanger;
    private Supplier<T> supplier;

    public Producer(Exchanger<List<T>> exchanger, Supplier<T> supplier) {
        super();
        this.exchanger = exchanger;
        this.supplier = supplier;
    }

    @Override
    public void run() {
        list.clear();
        for (int i = 0; i < 5; i++) {
            list.add(supplier.get());
        }
        try {
            list = exchanger.exchange(list);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer<T> extends Thread {
    private List<T> list = new ArrayList<>();
    private Exchanger<List<T>> exchanger;

    public Consumer(Exchanger<List<T>> exchanger) {
        super();
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            list = exchanger.exchange(list);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.print("From Producer: ");
        System.out.print(list);
    }
}

public class Main {

    public static void main(String[] args) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        Random random = new Random(47);
        new Consumer<>(exchanger).start();
        new Producer<>(exchanger, () -> random.nextInt(100)).start();
    }
}

Java编程思想笔记0x16

并发(三)

线程之间的协作

wait()与notifyAll()

  • wait()会将任务挂起,并且只有notify()notifyAll()发生时,即表示发生了某些感兴趣的食物,这个任务才会被唤醒并去检查所产生的变化。因此,wait()提供了一种在任务之间对活动同步对方式。
  • sleep()的区别:
    1. wait()期间对象锁时释放的
    2. 可以通过notify()notifyAll(),或者令时间到期,从wait()中恢复执行
  • 调用sleep()yield()时锁并不会释放。
  • wait()notify()notifyAll()是基类Object的一部分,而不是Thread的一部分。
  • wait()notify()notifyAll()可以并且只能同步控制方法或者同步控制块中调用,如果在非同步部分调用会得到IllegalMonitorStateException异常,即调用这些方法必须拥有(或者)对象的锁。

错失信号

// T1
synchronized(sharedMonitor) {
    shareMonitor.notify();
}
// T2
while (trye) {
    synchronized(shareMonitor) {
        sharedMonitor.wait();
    }
}
  • 在上面代码中,如果先执行了T1中的notify()后执行T2中的wait(),此时T2已经错失了T1发来的信号,从而产生了死锁。T2的改进方法见下面代码。
synchronized (sharedMonitor) {
    while (someCondition) sharedMonitor.wait();
}
  • 此时如果T1先执行,由于条件改变,T2就不会进入wait()。此外,这种方式还能防止被错误唤醒,如果被错误唤醒但还满足等待条件时会继续进入wait()
  • notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒。

使用wait()和notifyAll()进行线程协作

class Meal {
    private final int orderNum;

    public Meal(int orderNum) {
        this.orderNum = orderNum;
    }

    @Override
    public String toString() {
        return "Meal " + orderNum;
    }
}

class WaitPerson implements Runnable {
    private Restaurant restaurant;

    public WaitPerson(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal == null)
                        wait(); 
                }
                System.out.println("Waitperson got " + restaurant.meal);
                synchronized (restaurant.chef) {
                    restaurant.meal = null;
                    restaurant.chef.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("WaitPerson interrupted");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;
    private int count = 0;

    public Chef(Restaurant restaurant) {
        this.restaurant = restaurant;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal != null)
                        wait();
                }
                if (++count == 10) {
                    System.out.println("Out of food, closing");
                    restaurant.exec.shutdownNow();
                }
                System.out.println("Order up!");
                synchronized (restaurant.waitPerson) {
                    restaurant.meal = new Meal(count);
                    restaurant.waitPerson.notifyAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Chef interrupted");
        }
    }
}

class Restaurant {
    public Meal meal;
    ExecutorService exec = Executors.newCachedThreadPool();
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);
    public Restaurant() {
        exec.execute(chef);
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        exec.execute(waitPerson);
    }
}

public class Main {

    public static void main(String[] args) throws InterruptedException {
        new Restaurant();
    }
}
  • 上面代码使用wait()notifyAll()的主要原因是减少资源竞争,从而降低CPU资源使用。实际上单生产者单消费者在不考虑资源使用的情况下,是没有必要加锁的。
  • 上面代码中只有一个任务,理论上可以调用notify()而不是notifyAll()。但是,在更复杂的情况下,可能会有多个任务在某个特定的对象锁上等待,因此无法知道哪个任务应该被唤醒。因此,调用notifyAll()要更安全一些,这样可以唤醒等待这个锁的所有任务,而每个任务都必须决定这个通知是否与自己相关。(因为获取对象锁之后,线程的行为并不确定,因此应当使用notifyAll()来唤醒所有线程,每个线程来检查自己是否应当被唤醒。

使用显式的Lock对象和Condition对象

  • 可以使用显式的操作来进行同步与协作。使用互斥并允许任务挂起的基本类时Condition,可以通过在Condition上调用await()来挂起一个任务。当外部条件发生变化,意味着某个任务应该继续执行时,可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者调用signalAll()来唤醒所有在这个Condition上被其自身挂起的任务。
class Car {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean waxOn = false;

    public void waxed() {
        lock.lock();
        try {
            waxOn = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void buffed() {
        lock.lock();
        try {
            waxOn = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void waitForWaxing() throws InterruptedException {
        lock.lock();
        try {
            while (!waxOn)
                condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void waitForBuffing() throws InterruptedException {
        lock.lock();
        try {
            while (waxOn)
                condition.await();
        } finally {
            lock.unlock();
        }
    }
}

class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println("Wax on");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Exiting Wax On task");
    }
}

class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                car.waitForWaxing();
                System.out.println("Wax Off!");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            System.out.println("Exiting via interrupt");
        }
        System.out.println("Exiting Wax Off task");
    }
}

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}
  • 单个Lock将产生一个Condition对象,这个对象用可以管理任务之间的通信。但是,Condition对象不含任何有关处理状态的信息,因此需要额外的变量来负责表示处理状态的信息,如上面代码中的waxOn

生产者-消费者队列

  • 同步队列时更高级别的抽象,同样可以解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这种队列。其实现LinkedBlockingQueue是一个无界队列,ArrayBlockingQueue则有固定尺寸。
class Toast {
    public enum Status {DRY, BUTTERED, JAMMED}

    private Status status = Status.DRY;
    private final int id;

    public Toast(int idn) {
        id = idn;
    }

    public void butter() {
        status = Status.BUTTERED;
    }

    public void jam() {
        status = Status.JAMMED;
    }

    public Status getStatus() {
        return status;
    }

    public int getId() {
        return id;
    }

    public String toString() {
        return "Toast " + id + ": " + status;
    }
}

class ToastQueue extends LinkedBlockingQueue<Toast> {
}

class Toaster implements Runnable {
    private ToastQueue toasts;
    private int count = 0;
    private Random random = new Random(47);

    public Toaster(ToastQueue toasts) {
        this.toasts = toasts;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
                Toast t = new Toast(count++);
                System.out.println(t);
                toasts.put(t);
            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted");
        }
        System.out.println("Toaster off ");
    }
}

class Butterer implements Runnable {
    private ToastQueue dryQueue, butteredQueue;

    public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
        this.dryQueue = dryQueue;
        this.butteredQueue = butteredQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Toast t = dryQueue.take();
                t.butter();
                System.out.println(t);
                butteredQueue.put(t);
            }
        } catch (InterruptedException e) {
            System.out.println("Butterer interrupted");
        }
        System.out.println("Butterer off");
    }
}

class Jammer implements Runnable {
    private ToastQueue butterQueue, finishedQueue;

    public Jammer(ToastQueue butterQueue, ToastQueue finishedQueue) {
        this.butterQueue = butterQueue;
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Toast t = butterQueue.take();
                t.jam();
                System.out.println(t);
                finishedQueue.put(t);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted");
        }
        System.out.println("Jammer off");
    }
}

class Eater implements Runnable {
    private ToastQueue finishedQueue;
    private int count = 0;

    public Eater(ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Toast t = finishedQueue.take();
                if (t.getId() != count++ || t.getStatus() != Toast.Status.JAMMED) {
                    System.out.println(">>>>>Error: " + t);
                    System.exit(1);
                } else System.out.println("Chomp! " + t);
            }
        } catch (InterruptedException e) {
            System.out.println("Eater interrupted");
        }
        System.out.println("Eater off");
    }
}

public class Main {

    public static void main(String[] args) throws InterruptedException {
        ToastQueue dryQueue = new ToastQueue(),
                butteredQueue = new ToastQueue(),
                finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

任务间使用管道进行输入输出

  • 以管道的形式在线程间进行输入输出也很有用,在Java中对应为PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读)。
class Sender implements Runnable {
    private Random random = new Random(47);
    private PipedWriter out = new PipedWriter();

    public PipedWriter getOut() {
        return out;
    }

    @Override
    public void run() {
        try {
            while (true) {
                for (char c = 'A'; c <= 'z'; c++) {
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            }
        } catch (IOException e) {
            System.out.println(e + " Sender write exception");
        } catch (InterruptedException e) {
            System.out.println(e + " Sender sleep interrupted");
        }
    }
}

class Receiver implements Runnable {
    private PipedReader in;

    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getOut());
    }

    @Override
    public void run() {
        try {
            while (true) {
                System.out.println("Read: " + (char) in.read() + ", ");
            }
        } catch (IOException e) {
            System.out.println(e + " Receiver read exception");
        }
    }
}

public class Main {

    public static void main(String[] args) throws InterruptedException, IOException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONDS.sleep(4);
        exec.shutdownNow();
    }
}