Параллельное программирование в Java: Конспект. Потоки, синхронизация и ошибки.

В этом конспекте я соберу основные понятия параллельного программирования в Java: потоки и их жизненный цикл. Приведу примеры программ с проблемами, вроде дэдлоков или data race, и покажу, как их решать, например, с помощью мьютексов и локов.

Потоки и процессы

Процесс – это экземпляр программы, выполняющейся на компьютере, и состоит из кода программы, данных и информации о состоянии. Каждый процесс имеет собственное независимое адресное пространство в памяти.

Потоки – это более мелкие элементы, существующие внутри процесса. Они представляют собой независимые пути выполнения программы – базовые единицы, которые нужны для выполнения задач. Несколько потоков в рамках одного процесса используют одно и то же адресное пространство и делят ресурсы.

Какая разница между потоками и процессами? Процессы изолированы друг от друга, у каждого из них собственное адресное пространство, а потоки в рамках одного процесса могут легко обмениваться ресурсами и взаимодействовать. Конечно, межпроцессное взаимодействие тоже возможно, но его намного сложнее устроить.

Также создание потоков быстрее и требует меньше ресурсов, чем создание и завершение процессов. Переключение между потоками одного процесса также происходит быстрее.

Например, работа с анализами крови в лаборатории – это процесс. Если в больнице две лаборатории, они действуют независимо друг от друга и не знают ничего о данных, которые получают их коллеги. В каждой лаборатории процесс можно разбить на несколько потоков: каждый лаборант может независимо от другого (параллельно) работать с несколькими сэмплами. Они могут общаться, а начальник лаборатории (главный поток) даже может устанавливать приоритеты.

Жизненный цикл потока

Поток в Java – экземпляр класса Thread.

Изначально новый процесс начинается с одного «основного потока», который может создавать другие дочерние потоки. Они выполняются в рамках всё того же процесса и тоже могут создавать своих потомков. Потоки уведомляют родительский поток о завершении работы, и обычно основной поток завершает работу последним.

Основные состояния потоков в Java:

New → Runnable → Terminated

  • В состоянии New поток только был создан, и ему уже соответсвует функция для выполнения, но он пока ещё не запущен и не потребляет ресурсы.
  • В состоянии Runnable поток выполняется и потребляет ресурсы.
  • И в состоянии Terminated поток уже снова ничего не потребляет, так как он завершил своё выполнение. Это может произойти, если он справился со всеми своими задачами или грубо завершён извне.

Дополнительно можно выделить состояния Blocked, Waiting. Timed_Waiting.

  • В состоянии Blocked поток прекращает выполнение в ожидании монитора. Он временно не потребляет ресурсы, но потом возвращается в статус Runnable.
  • Waiting и Timed_Waiting похожи на Blocked по смыслу. В первом состоянии поток ждёт неопределённое время, пока другой поток не выполнит какое-то действие. Во втором состоянии время определено. Поток может спать или ждать другой поток, но с таймаутом.

Работа с несколькими потоками. Join. Daemon-потоки

Пришло время примера. В главном потоке программы создам один дочерний поток, который ничего дельного не совершит, но зато пару секунд поспи (Thread.sleep(n)). В это время главный поток тоже вздремнёт, а потом будет ожидать завершения дочернего потока (метод join()). Этот метод указывает, что поток не может продолжать своё выполнение, пока указанный дочерний поток не завершит своё. То есть главный поток уходит из состояния Runnable в Waiting.

class ChildThread extends Thread {
    @Override
    public void run() {
        System.out.println("Дочерний поток начал выполнение.");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Дочерний поток завершил выполнение.");
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Главный поток начал выполнение.");
        Thread child = new ChildThread();
        System.out.println("Создан дочерний поток. Его статус: " + child.getState());
        child.start();
        System.out.println("Главный поток делает своё дело. Тем временем дочерний поток " + child.getState());
        Thread.sleep(400);
        System.out.println("Жду завершения дочернего потока. Его статус: " + child.getState());
        child.join();
        System.out.println("Дочерний поток завершён. Его статус: " + child.getState());
    }
}

На выводе получаем:

Главный поток начал выполнение.
Создан дочерний поток. Его статус: NEW
Главный поток делает своё дело. Тем временем дочерний поток RUNNABLE
Дочерний поток начал выполнение.
Жду завершения дочернего потока. Его статус: TIMED_WAITING
Дочерний поток завершил выполнение.
Дочерний поток завершён. Его статус: TERMINATED

В этом примере увидели все основные состояние дочернего потока. Во время sleep() он уходит в состояние Timed_Waiting. Из-за join() родительский поток ожидает его завершения, поэтому потом он уже Terminated. Но что будет, если join() убрать?

Главный поток начал выполнение.
Создан дочерний поток. Его статус: NEW
Главный поток делает своё дело. Тем временем дочерний поток RUNNABLE
Дочерний поток начал выполнение.
Жду завершения дочернего потока. Его статус: TIMED_WAITING
Дочерний поток всё ещё не завершён, хотя родительский готов завершиться. Его статус: TIMED_WAITING
Дочерний поток завершил выполнение.

Тут мы видим подтверждение тому, что родительский поток не может завершиться, пока все его дочерние потоки не завершили выполнения. Когда главный поток уже всё сделал, дочерний всё ещё спит в состоянии Timed_Waiting.

Но что если нам очень хочется, чтобы главный поток не ждал дочерний? Нужно сделать его Deamon-потоком.

Deamon-поток – фоновый поток, который не задержит процесс от завершения. Задаётся он методом setDaemon(true).

Thread child = new ChildThread();
child.setDaemon(true);

Теперь в выводе получаем:

Главный поток начал выполнение.
Создан дочерний поток. Его статус: NEW
Главный поток делает своё дело. Тем временем дочерний поток RUNNABLE
Дочерний поток начал выполнение.
Жду завершения дочернего потока. Его статус: TIMED_WAITING
Дочерний поток всё ещё не завершён, но нас это не волнует. Его статус: TIMED_WAITING

Как видно из этого вывода, дочерний поток так и не успел завершиться, но программа всё равно закончила выполнение. Поэтому с такими потоками нужно быть предельно осторожными. Если такой поток работает с данными (записывает что-то в файл, например), то в итоге целостность может нарушиться.

Thread vs Runnable

Кроме потоков Thread в джаве также есть интерфейс Runnable, который Thread реализует. Для его работы нам нужно описать всё тот же метод run(). Поток можно создать, передав в его конструктор объект Runnable.

class ChildThread implements Runnable {
    @Override
    public void run() {
        <...>
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Главный поток начал выполнение.");
        Thread child = new Thread(new ChildThread());
<...>

Этот интерфейс удобно использовать, если поток будет наследником другого класса. В джаве нет множественного наследования, поэтому extends Thread исключает наследование от какого-то другого класса, а вот Runnable позволяет это сделать. В остальном различий практически нет.

Важно не забывать, что запускаются потоки методом start() – если вызвать метод run(), то они просто выполнятся в рамках текущего потока, а не станут отдельными потомками.

Data Race и Mutual exclusion

Data race (гонка данных) – ситуация, которая возникает, когда несколько потоков работают с одной и той же переменной без синхронизации. Например, поле класса хранит информацию о балансе на счету. В то время, когда один из потоков захочет изменить эту переменную, другой поток сделает это быстрее, и его изменения просто потеряются.

Происходит такая ситуация, потому что изменение данных не мгновенно. Потоку нужно считать данные из памяти, преобразовать их и вернуть обратно новое значение.

Чтобы проиллюстрировать такую ситуацию, я создал класс MoneyEarner, который в цикле начисляет на баланс 10 пунктов. В главном методе создаю три таких потока и запускаю их. После завершения всех трёх вывожу баланс.

class MoneyEarner extends Thread {
    static int balance = 0;
    @Override
    public void run() {
        for (int i = 0; i < 10; i++)
            balance++;
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread first = new MoneyEarner();
        Thread second = new MoneyEarner();
        Thread third = new MoneyEarner();
        first.start();
        second.start();
        third.start();
        first.join();
        second.join();
        third.join();
        System.out.println("Баланс теперь " + MoneyEarner.balance);
    }
}

В этом примере я получил 30, как и ожидалось. Но что будет, если увеличить количество итераций до 10 000?

Баланс теперь 18349

Очевидно, что это неверное значение. При этом при каждом запуске оно будет меняться.

Баланс теперь 13069 – ещё меньше.

Баланс теперь 12783 – и ещё меньше.

Наш пользователь теряет больше половины пунктов, которые ему полагаются.

Как можно решить эту проблему? В джаве есть несколько способов: использовать Lock, сделать метод или фрагмент кода synchronized, или превратить переменную в Atomic.

Lock-и

Lock позволяет «запирать» данные, с которыми мы работаем, и не позволяет другим потокам в это время с ними работать. Если лок уже взят одним из потоков, другой поток уже не сможет залочить его и вынужден ждать, пока его не отпустят.

В джаве Lock – интерфейс, который мы не можем использовать как таковой, но зато есть прекрасная реализация ReentrantLock. Такой лок может быть «залочен» одним потоком несколько раз.

Добавлю такой лок в MoneyEarner.

class MoneyEarner extends Thread {
    static int balance = 0;
    static Lock lock = new ReentrantLock();
    @Override
    public void run() {
        lock.lock();
        for (int i = 0; i < 10000; i++) {
            balance++;
        }
        lock.unlock();
    }
}

В этом коде я создал статический лок, чтобы все экземпляры этого класса могли работать с одним и тем же объектом (иначе не сработает). И пока другие потоки ждут своей очереди захватить лок в статусе Waiting, один может спокойно менять переменную balance.

В итоге получаем правильный результат (30000), но правильно ли мы разместили локи и анлоки? В этом коде каждый поток берёт лок лишь раз, после этого выполняет 10 000 увеличений баланса, а потом только отпускает этот лок. Правильная ли это критическая секция?

Критическая секция – это часть программы, которая обращается к общему ресурсу (в моём примере, к статической переменной, которая по определению одна на все экземпляры класса). Необходимо определить эту критическую секцию и сделать её как можно меньше, то есть включить в неё только те моменты, в которых действительно происходит работа с данными. Если сделать эту секцию слишком большой, то все потоки будут зазря ждать, пока выполнятся операции, которые они совсем не должны ждать .

Представленный код у меня выполняется очень быстро: за 1 миллисекунду.

Но если я попробую залочить инкремент, а не весь цикл, то время увеличивается до 7-9 миллисекунд, так как вместо 10000 инкрементов и 2 операций с локом мы получаем уже 10000 инкрементов и ещё 20000 операций с локом.

@Override
public void run() {
    for (int i = 0; i < 10000; i++) {
        lock.lock();
        balance++;
        lock.unlock();
    }
}

Получается, что в этом примере изначальная тактика была лучше. Но что если добавить какую-нибудь ещё операцию? Например попросить поток спать 1 миллисекунду после каждого увеличения. Такие потоки завершатся за 53169 миллисекунд:

@Override
public void run() {
    lock.lock();
    for (int i = 0; i < 10000; i++) {
        balance++;
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    lock.unlock();
}

Если переставить лок и сократить нашу критическую секцию (ведь сон потока вообще никак не задействует эту «опасную» переменную), то время уменьшается втрое: 17953 миллисекунд:

@Override
public void run() {
    for (int i = 0; i < 10000; i++) {
        lock.lock();
        balance++;
        lock.unlock();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Поэтому очень важно правильно определять, какие же операции нужно залочить. Это умение очень пригодится при работе с synchronized блоками, которые опишу ниже. Но сначала взглянем на другие типы локов.

Try-lock

Try-lock позволяет потоку выполнять другие действия вместо праздного ожидания. Вместо метода lock() необходимо использовать метод trylock(), который возвращает булево значение. True, если потоку удалось захватить лок, и false в противном случае.

В примере поэтому добавлю условие if(lock.tryLock()), которое отправит поток на увеличение баланса, когда лок удалось взять, и попрошу его заняться чем-то менее полезным в противном случае. В моём примере это болтовня и сон – собственно, так бы поступил любой ответственный работник, когда не может начать своё главное задание.

Я также добавил цикл while(true), чтобы поток пытался заполучить лок не один раз, а до выполнения задачи. После этого выхожу из цикла с помощью break;.

@Override
public void run() {
    while(true) {
        if (lock.tryLock()) {
            System.out.println("Готов к работе. " + this.getId());
            for (int i = 0; i < 10000; i++)
                balance++;
            lock.unlock();
            System.out.println("Завершил работу. " + this.getId());
            break;
        } else {
            System.out.println("делаю вид, что у меня есть работа, а на самом деле болтаю. " + this.getId());
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

На выводе получаю:

Готов к работе. 15
делаю вид, что у меня есть работа, а на самом деле болтаю. 17
делаю вид, что у меня есть работа, а на самом деле болтаю. 16
Завершил работу. 15
Готов к работе. 16
делаю вид, что у меня есть работа, а на самом деле болтаю. 17
Завершил работу. 16
Готов к работе. 17
Завершил работу. 17
Баланс теперь 30000

17 поток успел поболтать дважды, пока выполнялись потоки 15 и 16.

Read Write Lock

К обычному локу имеет доступ лишь один поток. Но что если у нас несколько потоков, которые всего лишь хотят прочитать значение, а не изменять его? Для этого существует Read-Write lock, у которого есть два режима:

  • Read: этот режим позволяет нескольким потокам читать одновременно
  • Write: этот режим оставляет доступ только для одного потока для записи.

Такой лок может быть полезен, если в программе больше потоков-читателей, чем писателей. В джаве ему соответствует класс ReentrantReadWriteLock.

Попробуем в примере выше позволить нескольким потокам читать баланс (например, банкоматам), но не давать им считывать данные во время изменения баланса. Для этого преобразуем класс MoneyManager (я его переименовал) так, чтобы потоки типа Writer могли изменять баланс, а остальные только читали его.

class MoneyManager extends Thread {
    static int balance = 0;
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String type;
    public MoneyManager(String type) {
        this.type = type;
    }
    @Override
    public void run() {
        if (type.equals("Writer")) {
            try {
                System.out.println("Начинаю изменять баланс. Поток " + this.getId());
                lock.writeLock().lock();
                for (int i = 0; i < 10000; i++) {
                    balance++;
                }
            } finally {
                lock.writeLock().unlock();
            }
            System.out.println("Закончил работу. Поток " + this.getId());
        } else {
            lock.readLock().lock();
            try {
                System.out.println("Поток " + this.getId() + " наблюдает за балансом: " + balance);
                System.out.println("Сейчас за балансом наблюдает сразу " + lock.getReadLockCount() + " потоков");
            } finally {
                lock.readLock().unlock();
            }
            try {
                Thread.sleep(1); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread first = new MoneyManager("Writer");
        Thread second = new MoneyManager("Writer");
        Thread third = new MoneyManager("Writer");
        for (int i = 0; i < 10; i++) {
            new MoneyManager("").start();
            Thread.sleep(2);
        }
        first.start();
        second.start();
        third.start();
        first.join();
        second.join();
        third.join();
        long finish = System.currentTimeMillis();
        System.out.println("Баланс теперь " + MoneyManager.balance);
    }
}

В главном методе я создаю десяток читателей, но всё ещё только три писателя. Читатели не только выводят баланс, но и показывают, сколько разных потоков одновременно заполучили readLock с помощью метода getReadLockCount(). И readLock и writeLock получаются вызовом соответствующих методов в классе ReentrantReadWriteLock.

Уфф… с локами закончили. Перейдём наконец-то к другим способам синхронизации.

Сихнронизация: synchronized блоки и методы

Одна из особенностей Java заключается в том, что каждый объект имеет встроенный (intrinsic) лок. Если потоку нужно эксклюзивно работать с данными объекта, он должен захватить этот лок перед доступом, а затем освободить его после завершения работы.

Java предлагает два способа использовать встроенные локи:

  • синхронизированные методы (synchronized)
  • синхронизированные блоки кода (synchronized(obj) { … })

Чтобы сделать метод синхронизированным, достаточно добавить ключевое слово synchronized: Когда поток вызывает такой метод, он автоматически захватывает лок объекта, на котором вызывается метод.

В примере выше можно исключить локи и просто создать такой метод:

class MoneyEarner extends Thread {
    static int balance = 0;
    @Override
    public void run() {
        incrementBalance();
    }
    private static synchronized void incrementBalance() {
        for (int i = 0; i < 10000; i++) {
            balance++;
        }
    }
}

Обратите внимание, что метод статический. Если убрать static, то метод станет обычным экземплярным методом, и каждый поток будет синхронизироваться на своём экземпляре, то есть data race не будет исключена.

Если мы не хотим создавать целый метод и просто обойдёмся синхронизированным блоком кода, то делаем следующим образом:

@Override
public void run() {
    synchronized (MoneyEarner.class) {
        for (int i = 0; i < 10000; i++) {
            balance++;
        }
    }
}

Опять же, нам нужен общий лок, поэтому пишем MoneyEarner.class, а не this.

Атомарные типы данных

Защищать переменные с помощью Lock или synchronized — это хороший способ обеспечить взаимное исключение, но для простых операций, таких как инкремент счётчика, существует более лёгкий и быстрый способ — использование атомарных переменных из пакета java.util.concurrent.atomic.

Преобразуем пример выше и будем использовать класс AtomicInteger. Так как это объект, то нужно его создать через new и передать в конструктор числовое значение. Инкремент в виде двух плюсов здесь не сработает – заменим его методом incrementAndGet(). Всё остальное произойдёт за кулисами.

class MoneyEarner extends Thread {
    static AtomicInteger balance = new AtomicInteger(0);
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            balance.incrementAndGet();
        }
    }
}

Вот так просто. Наверно, с этого стоило бы начинать, но мне хотелось сначала показать более изысканные способы обхождение гонки за данными, которые могут применяться в сложных программах.

Double, Long и разорванное чтение. Переменные volatile

В примерах на data race я показал, что изменение переменных не атомарно, и какой-то поток может продолжить работать с устаревшими значениями, хотя другой поток уже значение поменял.

Но в случае с типами double и long даже чтение не атомарно. Эти два примитивных типа занимают два машинных слова (то есть 8 байт), поэтому читать их приходится не за одну операцию, а за две. В теории может получиться ситуация, в которой поток считает первый байт старого значения, потом другой поток изменит всё значение, и первый поток считает второй байт уже нового значения.

public class ThreadsArticleDemo {

    static long sharedValue = 0L;

    public static void main(String[] args) {
        Thread writer = new Thread(() -> {
            while (true) {
                sharedValue = 0xFFFFFFFFFFFFFFFFL; 
                sharedValue = 0x0000000000000000L; 
            }
        });

        Thread reader = new Thread(() -> {
            while (true) {
                double value = sharedValue;
                if (value != 0 && value != -1L) {
                    System.out.println("Значение сломалось: " + value);
                }
            }
        });

        writer.start();
        reader.start();
    }
}

К сожалению (или к счастью) мне не удалось получить ни одного вывода. Современная JVM (особенно на 64-битных системах) стала безопаснее, и сломать long и double трудно. Но помнить о таком риске всё же стоит.

Чтобы чтение стало точно атомарным, нужно использовать ключевое слово volatile в объявлении переменной. Оно также указывает потокам, что кэшировать значение не надо (то есть поток не будет запоминать значение, которое может незаметно от него изменить другой поток, а этот об этом даже не узнает и продолжит работать с запомненной переменной). volatile гарантирует, что чтение и запись будет происходить напрямую из/в основную память.

Некоторые ошибочно считают, что volatile является альтернативой атомарным типам данных. Но нет, в этом случае только чтение атомарно. А запись уже нет, поэтому варианты с data race это слово не исключает.

volatile лучше всего использовать для флагов остановки потоков и прочих примитивных флагов и счётчиков, которые могут изменяться в нескольких потоках и влиять на их выполнение.

Ошибки в синхронизации. Локи, но другие.

В этом разделе посмотрим на ошибки, которые могут возникнуть, когда синхронизация настроена неверно. Тоже локи, но уже дэд, например. И методы решения этих проблем.

Deadlock. Взаимная блокировка.

Дэдлок возникает, когда несколько взаимно блокируют друг друга, пытаясь получить ресурсы, которые никогда не освободятся. В итоге выполнение программы останавливается, но она так и не завершается, и CPU при этом не потребляется.

Для имитации этого события создам в примере выше два лока в классе MoneyEarner. Предположим, что для выполнения операции потоку требуется сразу два. И создам ситуацию, в которой каждый из трёх потоков сначала захватывает свой лок, а потом обнаруживает, что второй лок, который ему требуется, уже захвачен. И что делать? Никто не собирается освобождать свой лок первым, чтобы дать завершить выполнение своим коллегам. Так и висят (пока писал этот абзац, ничего само собой не разрешилось, конечно).

class MoneyEarner extends Thread {
    private Lock lockA;
    private Lock lockB;
    static int balance = 0;
    MoneyEarner(Lock lockA, Lock lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }
    @Override
    public void run() {
        lockA.lock();
        for (int i = 0; i < 10000; i++) {
            lockB.lock();
            balance++;
            lockB.unlock();
        }
        lockA.unlock();
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        Lock firstLock = new ReentrantLock();
        Lock secondLock = new ReentrantLock();
        Lock thirdLock = new ReentrantLock();
        Thread first = new MoneyEarner(firstLock, secondLock);
        Thread second = new MoneyEarner(secondLock, thirdLock);
        Thread third = new MoneyEarner(thirdLock, firstLock);
        long start = System.currentTimeMillis();
        first.start();
        second.start();
        third.start();
        first.join();
        second.join();
        third.join();
        long finish = System.currentTimeMillis();
        System.out.println("Баланс теперь " + MoneyEarner.balance);
        System.out.println("Время выполнения программы " + (finish - start));
    }
}

Выход из такой ситуации – захватывать локи в одном порядке, не позволять ситуацию, где все локи захвачены разными потоками.

Thread first = new MoneyEarner(firstLock, secondLock);
Thread second = new MoneyEarner(secondLock, thirdLock);
Thread third = new MoneyEarner(firstLock, thirdLock);

Другой вариант – использовать уже знакомый tryLock, но поставить на него таймаут. Тогда, не дождавшись лок по истечении времени поток будет пробовать ещё раз (в моём примере они сначала сообщают о неудаче в консоль – это момент, когда в первой версии программы бы случился дэдлок).

class MoneyEarner extends Thread {
    private Lock lockA;
    private Lock lockB;
    static int balance = 0;
    MoneyEarner(Lock lockA, Lock lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }
    @Override
    public void run() {
        while(true) {
            try {
                if (lockA.tryLock(1, TimeUnit.MICROSECONDS)) {
                    try {
                        if (lockB.tryLock(1, TimeUnit.MICROSECONDS)) {
                            try {
                                for (int i = 0; i < 100000; i++) {
                                    balance++;
                                }
                                break;
                            } finally {
                                lockB.unlock();
                            }
                        } else {
                            System.out.println("Не удалось получить второй лок " + this.getId());
                        }
                    } finally {
                        lockA.unlock();
                    }
                } else {
                    System.out.println("Не удалось получить первый лок " + this.getId());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        Lock firstLock = new ReentrantLock();
        Lock secondLock = new ReentrantLock();
        Lock thirdLock = new ReentrantLock();
        Thread first = new MoneyEarner(firstLock, secondLock);
        Thread second = new MoneyEarner(secondLock, thirdLock);
        Thread third = new MoneyEarner(thirdLock, firstLock);
        long start = System.currentTimeMillis();
        first.start();
        second.start();
        third.start();
        first.join();
        second.join();
        third.join();
        long finish = System.currentTimeMillis();
        System.out.println("Баланс теперь " + MoneyEarner.balance);
        System.out.println("Время выполнения программы " + (finish - start));
    }
}

Livelock. Вежливые потоки

В случае с livelock потоки, наоборот, постоянно уступают друг другу ресурсы, пытаясь разрешить потенциальный deadlock. В итоге программа не продвигается, или продвигается очень медленно, а ресурсы системы постоянно потребляются (в отличие от дэдлока, когда они не используются).

@Override
public void run() {
    while(true) {
        for (int i = 0; i < 100000; i++) {
            lockA.lock();
            if (lockB.tryLock()) {
                balance++;
                lockB.unlock();
                lockA.unlock();
            } else {
                System.out.println("Не удалось получить второй лок. Отпускаю первый " + this.getId());
                lockA.unlock();
            }
        }
        break;
    }
}

В этом примере каждый поток пытается предотвратить дэдлок, отпуская первый лок, если не может получить второй. Консоль заполнена сообщениями об отпущенных локах, и программа завершилась в 1000 раз медленнее, чем в прошлых примерах. Конечно, это не настоящий лайвлок – программа всё же завершилась, но производительность заметно упала при большем потреблении ресурсов.

TryLock с таймаутом, пауза после отпускания лока могут быть подходящими решениями для этой проблемы.

Голодание. Потоки не получают ресурсы.

Если в программе слишком много потоков, они могут просто никогда не добраться до ресурсов. В этом примере я поменял класс на MoneySpender – теперь потоки тратят деньги из копилки. Но я создаю сотню потоков, и очевидно, что денег не так уж много, чтобы хватило всем.

class MoneySpender extends Thread {
    static Lock lock = new ReentrantLock();
    static AtomicInteger balance = new AtomicInteger(10000);
    public int moneyTaken = 0;
    @Override
    public void run() {
        while (balance.get() > 0) {
            lock.lock();
            balance.getAndDecrement();
            moneyTaken++;
            lock.unlock();
        }
        System.out.println("Я потратил столько денег " + moneyTaken);
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        Lock firstLock = new ReentrantLock();
        Lock secondLock = new ReentrantLock();
        Lock thirdLock = new ReentrantLock();
        for (int i = 0; i < 100; i++) {
            new MoneySpender().start();
        }
    }
}

Получаю такой вывод:

Я потратил столько денег 0
Я потратил столько денег 290
Я потратил столько денег 0
Я потратил столько денег 0
Я потратил столько денег 0
Я потратил столько денег 0
Я потратил столько денег 127
Я потратил столько денег 0
Я потратил столько денег 288
Я потратил столько денег 251

Видно, что некоторым потокам вообще не удалось добраться до денег. В этой программе это не так страшно, потому что она ничего полезного не делает, но если бы это была программа на сервере, а потоки представляли пользователей, то страшно подумать, сколько из них мы бы могли оставить без должного сервиса.

Семафоры

Чтобы ограничить количество потоков, которые могут работать с критической секцией, можно использовать семафор. Это счётчик, который позволяет потоку «пройти», если он больше нуля – тогда его значение уменьшается. Если счётчик равен нулю, поток вынужден ждать, пока кто-то не освободит слот.

В джаве они реализованы классом Semaphore. Для получения семафора вызываем метод acquire(), а отпускаем методом release().

Для иллюстрации работы с семафором создам класс Queue, где в очереди будут стоять всё те же сто потоков.

class Queue extends Thread {
    static Semaphore semaphore = new Semaphore(5);
    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println("Я работаю (на самом деле сплю). А меня ждут столько потоков:" + semaphore.getQueueLength());
            Thread.sleep(new Random().nextInt(100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Я завершил работу");
            semaphore.release();
        }
    }
}
public class ThreadsArticleDemo {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            new Queue().start();
        }
    }
}

Здесь с помощью метода getQueueLength() вывожу количество потоков в очереди. Для первых пяти потоков оно 0, что логично, ибо столько мы пускаем на выполнение, а потом очередь достигает максимума и постепенно опустевает.

В моей голове семафоры похожи на trylock, но всё же это не так. Если семафор не бинарный (то есть не установлен на 1 поток), то он не обеспечивает эксклюзивный доступ к ресурсу. А вот локи нужны как раз для этого.

Race Condition и барьеры.

У нас уже была одна гонка выше. Гонка за данные, когда несколько потоков одновременно хотели менять или читать данные, и в итоге некоторые изменения терялись, или они получали неактуальные данные. «Состояние гонки» отличается от этого явления, хотя иногда они взаимосвязаны. В этом случае порядок выполнения потоков влияет на результат задачи.

Например, один наш поток выполняет умножение, а другой будет выполнять сложение. Очевидно, что от порядка действий результат изменится. (1+1)*2 != 1*2+1.

Рассмотрим пример, в котором три потока выполняют сложение, а три – умножение. При этом запускаю я их вроде в одном и том же порядке, но выполняются они не всегда так же предсказуемо.

public class ThreadsArticleDemo {
    static int result = 1;
    public static void main(String[] args) throws InterruptedException {
        Runnable addition = () -> {
            result = result + 1;
            System.out.println("Сложение выполнено, результат: " + result);
        };
        Runnable multiplication = () -> {
            result = result * 2;
            System.out.println("Умножение выполнено, результат: " + result);
        };
        for (int i = 0; i < 5; i++) {
            result = 1;
            Thread[] threads = new Thread[6];
            threads[0] = new Thread(addition);
            threads[1] = new Thread(addition);
            threads[2] = new Thread(addition);
            threads[3] = new Thread(multiplication);
            threads[4] = new Thread(multiplication);
            threads[5] = new Thread(multiplication);
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
            
            System.out.println("Итоговый результат: " + result);
        }
    }
}

Из пяти прогонок четыре варианта дают верный результат 32, но один даёт неожиданное (на самом деле ожидаемое – ради этого этот пример и затевался) 28.

Сложение выполнено, результат: 2
Сложение выполнено, результат: 3
Умножение выполнено, результат: 6
Умножение выполнено, результат: 28
Умножение выполнено, результат: 14
Сложение выполнено, результат: 7
Итоговый результат: 28

Сложение выполнено, результат: 2
Сложение выполнено, результат: 3
Сложение выполнено, результат: 4
Умножение выполнено, результат: 8
Умножение выполнено, результат: 16
Умножение выполнено, результат: 32
Итоговый результат: 32

Чтобы избежать состояния гонки, нам нужно синхронизировать действия потоков – так, чтобы операции сложения и умножения происходили в правильном порядке. Один из способов добиться этого – использовать барьер.

CyclicBarrier

Барьер это точка, в которой несколько потоков останавливаются и ждут, пока к ним не присоединятся остальные. Только когда все потоки достигнут барьера, они смогут продолжить работу.

В примере выше нужно добавить барьер, к которому будут «подбегать» потоки со сложением. Когда они все вшестером окажутся у барьера, потоки с умножением продолжат свою работу. Точка «ожидания» задаётся методом await().

public class ThreadsArticleDemo {
    static int result = 1;
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(6);
        Runnable addition = () -> {
            result = result + 1;
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        Runnable multiplication = () -> {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            result = result * 2;
        };

        for (int i = 0; i < 5; i++) {
            result = 1;
            Thread[] threads = new Thread[6];
            threads[0] = new Thread(addition);
            threads[1] = new Thread(addition);
            threads[2] = new Thread(addition);
            threads[3] = new Thread(multiplication);
            threads[4] = new Thread(multiplication);
            threads[5] = new Thread(multiplication);
            for (Thread thread : threads) {
                thread.start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
            System.out.println("Итог: " + result + System.lineSeparator());
        }
    }
}

В этом примере я переиспользую барьер несколько раз в цикле. Он сбрасывается, если всё прошло гладко – поэтому он и Цикличный. Но его можно самостоятельно сбросить (например, в случае ошибки) с помощью метода reset() – может выкинуть BrokenBarrierException, если какие-то потоки уже ждут у барьера.

CountDownLatch

В отличие от барьера, который срабатывает каждый раз, когда заданное число потоков достигло точки синхронизации (в нашем примере — все шесть), CountDownLatch срабатывает один раз — когда его счётчик уменьшился до нуля — и не может быть использован повторно.

Поэтому в примере выше не получится уже использовать CountDownLatch в цикле (его придётся убрать) – как только счётчик один раз достиг нуля, обратно его не перезарядить.

И установить счётчик нужно на три потока, а не на шесть, так как для начала умножения нам нужно, чтобы три потока со сложением завершили свою работу и уменьшили счётчик методом countDown(). Остальные три потока вежливо ждут их всё с тем же методом await().

public class ThreadsArticleDemo {
    static int result = 1;
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        Runnable addition = () -> {
            result = result + 1;
            latch.countDown();
        };

        Runnable multiplication = () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result = result * 2;
        };

        result = 1;
        Thread[] threads = new Thread[6];
        threads[0] = new Thread(addition);
        threads[1] = new Thread(addition);
        threads[2] = new Thread(addition);
        threads[3] = new Thread(multiplication);
        threads[4] = new Thread(multiplication);
        threads[5] = new Thread(multiplication);
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("Итоговый результат: " + result);
    }

Это, наверно, самая большая статья в моём блоге. И на этом достижении завершаю. В этой статье и так слишком много всего: и потоки, и процессы, и семафоры, и мьютексы, и даже барьеры…

Оставьте комментарий