Вплоть до Java 5 создавать потоки и управлять ими было возможно только на уровне приложения. Объекты Thread требуют значительного объема памяти. Таким образом, если постоянно создавать много таких объектов в крупномасштабном приложении, это приведет к существенным затратам памяти. Поэтому создание потоков и управление ими лучше отделить от остальной части приложения.

В качестве решения Java предоставляет фреймворк Executor. Он содержит множество функций для эффективного управления несколькими потоками. Не нужно каждый раз создавать новые потоки с помощью Executor — он позволяет использовать уже созданные потоки, когда вам это необходимо. В результате экономится как память вашего Java-приложения, так и ваше драгоценное время.

Из этой статьи вы узнаете о фреймворке Executor, пуле потоков и различных методах их создания, а также о том, как с их помощью управлять потоками. Давайте начнем.

Интерфейсы и классы Executor

Во фреймворке Executor присутствуют три интерфейса: Executor, ExecutorService и ScheduledExecutorService.

Executor — простой интерфейс, содержащий метод execute() для запуска задачи, заданной запускаемым объектом Runnable.

ExecutorService представляет собой суб-интерфейс Executor, который добавляет функциональность для управления жизненным циклом потоков. Он также включает в себя метод submit(), который аналогичен методу execute(), но более универсален. Перегруженные версии метода submit() могут принимать как выполняемый (Runnable), так и вызываемый (Callable) объект. Вызываемые объекты аналогичны выполняемым, за тем исключением, что задача, определенная вызываемым объектом, также может возвращать значение. Поэтому, если мы передаем объект Callable методу submit(), он возвращает объект Future. Этот объект можно использовать для получения возвращаемого значения Callable и управления статусом как Callable, так и Runnable задач.

ScheduledExecutorService, в свою очередь, — это суб-интерфейс ExecutorService. Он добавляет функциональность, которая позволяет планировать выполнение задач в коде.

Помимо трех вышеупомянутых интерфейсов, Executor Framework также содержит класс Executors, который по умолчанию включает в себя методы для создания различных типов служб-исполнителей. С помощью этого класса и интерфейсов можно создавать пулы потоков. Что же это такое? 

Пулы потоков

Пул потоков — это набор объектов Runnable и постоянно работающих потоков. Коллекция объектов Runnable называется рабочей очередью. Постоянно запущенные потоки проверяют рабочий запрос на наличие новой работы, и если новая работа должна быть выполнена, то из рабочей очереди будет запущен объект Runnable. Чтобы использовать фреймворк Executor, нам нужно создать пул потоков и отправить туда задачу для выполнения. В классе Executors есть четыре основных метода, которые используются для создания пулов потоков. Рассмотрим каждый из них на примере.

newSingleThreadExecutor()

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MyRunnable implements Runnable {
    private final String task;

    MyRunnable(String task) {
        this.task = task;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
           System.out.println("Executing "+ task +" with "+Thread.currentThread().getName());
        }
        System.out.println();
    }
}

public class Exec1 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 1; i <= 5; i++) {
            Runnable worker = new MyRunnable("Task" + i);
            executor.execute(worker);
        }
        executor.shutdown();    /* После этого исполнитель перестанет принимать какие-либо новые потоки и завершит все существующие в очереди */    
    }
}

В данном примере мы отправляем на исполнение пять задач. Но так как применяется метод newSingleThreadExecutor(), будет создан только один новый поток и одновременно будет выполняться только одна задача. Остальные четыре задачи находятся в очереди ожидания. Как только задача выполнится потоком, этот поток тут же выберет и выполнит следующую. Метод shutdown() ожидает завершения выполнения задач, в настоящий момент переданных исполнителю, чтобы завершить его работу. Однако, если вам хочется завершить работу исполнителя без ожидания, используйте вместо этого метод shutdownNow().

newFixedThreadPool()

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MyRunnable implements Runnable {
    private final String task;

    MyRunnable(String task) {
        this.task = task;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
           System.out.println("Executing "+ task +" with "+Thread.currentThread().getName());
        }
        System.out.println();
    }
}

public class Exec2 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 5; i++) {
            Runnable worker = new MyRunnable("Task" + i);
            executor.execute(worker);
        }
        executor.shutdown();
    }
}

Использован тот же пример, что и в предыдущем случае, только на этот раз — с методом newFixedThreadPool(). Этот метод позволяет создать пул с фиксированным количеством потоков. Таким образом, когда мы отправим пять задач, в коде будет создано три новых потока и будут выполнены три задачи. Остальные две задачи находятся в очереди ожидания. Как только какая-либо задача выполнится потоком, этим же потоком будет выбрана и выполнена следующая задача.

newCachedThreadPool()

Когда мы создаем пул потоков с помощью этого метода, максимальный размер пула потоков устанавливается на максимальное целочисленное значение в Java. Этот метод создает новые потоки по запросу и разрушает потоки, которые простаивают больше минуты, если запрос отсутствует.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MyRunnable implements Runnable {
    private final String task;

    MyRunnable(String task) {
        this.task = task;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
           System.out.println("Executing "+ task +" with "+Thread.currentThread().getName());
        }
        System.out.println();
    }
}

public class Exec3 {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 1; i <= 5; i++) {
            Runnable worker = new MyRunnable("Task" + i);
            executor.execute(worker);
        }
        executor.shutdown();  
    }
}

В данном примере метод newCachedThreadPool() изначально создаст пять новых потоков и обработает пять задач. Никакой очереди ожидания здесь не будет. Если поток остается в бездействии более минуты, метод устраняет его. Таким образом, этот метод — хороший выбор, если вам хочется добиться большей производительности очереди, чем это возможно с методом newFixedThreadPool(). Но если вы хотите ограничить количество параллельно выполняемых задач во имя управления ресурсами, лучше использовать newFixedThreadPool().

newScheduledThreadPool()

Метод newScheduledThreadPool() создает пул потоков, который может планировать выполнение задач после заданной задержки или через регулярные промежутки времени. Этот метод возвращает ScheduledExecutorService. Существует три метода для планирования задач в таком пуле потоков: schedule(), scheduleAtFixedRate() и scheduleWithFixedDelay(). Рассмотрим пример реализации пула потоков с помощью метода schedule().

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RunnableTask implements Runnable{
 
    private String task;  
    public RunnableTask(String s){
        this.task = s;
    }
 
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" Start Time for "+task+" "+new Date());
        // Обрабатываем задачу здесь
        try {
			Thread.sleep(3000); // обработка задачи занимает три секунды
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
        System.out.println(Thread.currentThread().getName()+" End Time for "+task+" "+new Date());
    }
 
    @Override
    public String toString(){
        return this.task;
    }
}

public class ScheduledThreadPoolMain {
 
	public static void main(String[] args) throws InterruptedException {
	    System.out.println("Current Time = "+new Date());
		
	    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
		
	    for(int i=1; i<=3; i++){
		Thread.sleep(2000);
		RunnableTask task = new RunnableTask("Task "+i);
		scheduledThreadPool.schedule(task,3, TimeUnit.SECONDS); // планируется задержка выполнения задачи на три секунды
	    }
		
	    Thread.sleep(6000); // добавляем некоторую задержку
	    scheduledThreadPool.shutdown();
	    System.out.println("Completed all threads");
	}
}

Как видно из примера: 

  • Метод schedule принимает три аргумента: задачу, задержку и промежуток времени задержки. 
  • Метод schedule() используется для планирования задачи после фиксированной задержки. 
  • Метод scheduleAtFixedRate() используется для планирования задачи после фиксированной задержки и последующего периодического выполнения этой задачи. 
  • Метод scheduleWithFixedDelay() используется для планирования задачи после начальной задержки, а затем выполнения задач с фиксированной задержкой после завершения предыдущей задачи.

Каждый из этих методов полезен в своём определённом сценариев.

Вот и всё о пулах потоков. Теперь  —  некоторые важные замечания насчет того, как использовать фреймворк Executor.

Важные замечания

  • Никогда не ставьте в очередь задачи, которые в это самое время ожидают результатов от других задач. Это может привести к тупику.
  • Пул потоков после окончания работы должен быть явно завершен путем вызова метода shutdown(). Если этого не сделать, программа будет продолжать работать без конца. Если вы отправите исполнителю другую задачу после завершения работы, она выдаст исключение RejectedExecutionException.
  • Будьте осторожны, применяя потоки для длительных операций. Это может привести к бесконечному ожиданию потока и в конечном итоге  —  к утечке ресурсов.
  • Для эффективной настройки пула потоков необходимо понимать особенности задач. Если задачи очень разные, имеет смысл использовать разные пулы потоков для разных типов задач, чтобы правильно их настроить.

На этом всё. Надеюсь, пулы потоков и Java ExecutorService стали для вас понятнее. Спасибо за чтение и счастливого кодирования!

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи Manusha Chethiyawardhana: “How to use the Executor Framework in Java”

Предыдущая статьяУскорьте создание пиксельной графики в 10 раз ✨?
Следующая статьяМоделирование связей графа в DynamoDB