Новый подход к пониманию RxJava

Реактивное программирование  —  очень важный способ написания чистого кода, а ReactiveX  —  одна из самых известных библиотек для асинхронной обработки последовательностей данных реактивным способом.

Для разработки на Android применяется RxJava.

В этом примере изучим RxJava с помощью анимированных и интерактивных диаграмм в приложении RxLab.

Что такое RxLab?

RxLab  —  это приложение для Android с открытым исходным кодом. Оно представляет собой любопытный инструмент для изучения программирования ReactiveX, оснащенный анимацией, планировщиками и таймером.

Чтобы приступить к обучению, загрузите приложение из Google Play или Github с последним обновлением и исходным кодом: https://github.com/anastr/RxLab

Прежде чем идти дальше, я объясню, как устроен сигнал (вывод) этого приложения:

Так выглядит сигнал
  1. цвет самого сигнала указывает на наблюдаемый (observable) объект, откуда он исходит;
  2. значение;
  3. цвет, указывающий на текущий поток;
  4. индекс потока.

Когда присутствует указатель на текущий поток (п.3), то в нем применяются следующие обозначения:

  • красный: основной поток;
  • голубой: поток вычислений;
  • желтый: поток ввода-вывода;
  • зеленый: одиночный поток.

Давайте начнем с простой операции.

just()

Этот код будет выводить в лог буквы от А до D, но взгляните на интерактивную диаграмму:

Observable.just('A', 'B', 'C', 'D')
     .subscribe {
           Log.i(TAG, "$it")
     }

Observable.just  —  это объект наблюдаемого (observable), а внутри находится метод subscribe, куда мы помещаем наблюдатель (observer).

На нашей диаграмме just()  —  это observable, а стрелка  —  observer. И (A, B, C, D)  —  это сигналы.

Операция just() берет несколько элементов и передает наблюдателю observer в том же порядке и в том же потоке (в нашем случае это основной поток). Когда наблюдаемое observable закончит посылать сигналы, оно вызовет onComplete в наблюдателе (вертикальная линия на диаграмме), что означает: наблюдаемое отправило все свои сигналы, и наблюдатель больше ничего не получит.

map()

Допустим, у нас есть сигналы типа string, и нужно сопоставить их со списком длин этих строк.

Observable.just("Dr.", "Anas", "Altair")
    .map { it.length }
    .subscribe(...)

Сигналы один за другим будут уходить от наблюдаемого observable к операции map, которая преобразует строки в целые числа (длину строки), а затем отправляет их наблюдателю observer.

Сигналы, опять же, будут идти в том же порядке и в том же потоке.

interval()

Если нужно, чтобы сигнал уходил каждые две секунды, то с RxJava это очень просто сделать:

Observable.interval(2000, TimeUnit.MILLISECONDS)
.subscribe(...)

Вот что получит наблюдатель:

Полосатый желтый квадрат обозначает время. Наблюдаемое observable будет автоматически посылать сигнал каждые две секунды. Первый сигнал будет 0, затем 1, 2, 3…. Сигналы будут уходить вечно.

Операция interval по умолчанию работает в потоке вычислений и будет посылать сигналы наблюдателю в том же потоке, поэтому нужно соблюдать осторожность, имея дело с сигналами внутри наблюдателя. Для изменения потока воспользуйтесь операцией observeOn().

flatMap() and concatMap()

Взглянем на эти два observable:

Observable.just("A,B,C", "D,E,F")
    .flatMap { stringEmit ->
        Observable.fromIterable(stringEmit.split(','))
            .subscribeOn(Schedulers.computation())
            .doOnNext { longProcess(..) }
    }
    .subscribe(...)Observable.just("A,B,C", "D,E,F")
    .concatMap { stringEmit ->
        Observable.fromIterable(stringEmit.split(','))
            .subscribeOn(Schedulers.computation())
            .doOnNext { longProcess(..) }
    }
    .subscribe(...)

В этом примере у нас есть два наблюдаемых observable и два наблюдателя observer. Они не влияют друг на друга.

flatMap()

Первый содержит просто два строковых сигнала (“A, B, C” и “D, E, F”). Операция flatMap преобразует каждый сигнал в наблюдаемое. В примере он преобразует “A, B, C” в наблюдаемое, которое будет посылать три сигнала (“A”, “B” и “C”). Следом вызываем subscribeOn, чтобы изменить поток, над которым будут работать новые наблюдаемые, а затем выполнить длительный процесс longProcessнад каждой буквой сигнала.

Вернитесь к схеме и очень внимательно посмотрите на операцию flatMap. Она будет принимать все сигналы из восходящего потока и обрабатывать их вместе, каждый  —  в отдельном потоке вычислений. Затем буквы-сигналы отправляются к конечному наблюдателю в случайном порядке (первому, кто завершит longProcess).

concatMap()

Он выполняет ту же работу, что и flatMap, но берет первую строку сигнала из восходящего потока, преобразовывает ее и отправляет все новые наблюдаемые сигналы наблюдателю. Исключительно после этого concatMap() возьмется обрабатывать следующую строку из восходящего потока.

flatMap и concatMap, таким образом, выполняют одну и ту же работу, но concatMap уважает порядок.

В RxLab есть еще много операций, которые я предоставляю вам исследовать самостоятельно (filter, reduce, scan, zip, combineLatest, distinct, throttle и другие).

Заключение

RxLab  —  очень полезное приложение, которое можно держать открытым, когда нужна какая-то операция, но вы забываете принципы ее действия, или когда нужно взглянуть на поток, время выполнения и его результат.

Этот проект не поможет вам выучить RxJava с нуля, но поможет понять его и запомнить принципы работы.

Читайте также:

Читайте нас в Telegram, VK и Яндекс.Дзен


Перевод статьи: Anas Altair, “Understand RxJava in a New Way”