Уже работаете с потоками Kotlin и знакомы с основными концепциями, но никогда не применяли Channel, не знаете разницы между merge, combine и zip или не до конца понимаете SharedFlow и StateFlow и их использование?

Мы собрали ключевые понятия и нюансы всех этих элементов в шпаргалке-справочнике для разбора сложных сценариев с потоками.

Холодные и горячие потоки

Горячие потоки

  • Пример: канал channel, коллекции вроде списка List, множества Set и т. д.
  • Запускаются сразу: значения начинают выдаваться независимо от того, имеются ли подписчики.
  • Элементы сохраняются: и не пересчитываются, все подписчики получают одинаковую последовательность значений.

Холодные потоки

  • Примеры: последовательность Sequence, поток Flow.
  • Запускаются по запросу: значения начинают выдаваться, только когда подписчик активно подписывается на холодный поток. Источники данных «ленивые».
  • Независимая выдача: каждым подписчиком получается собственная последовательность значений. Элементы не сохраняются.

Channel

Основные принципы

  • Это горячий поток значений.
  • Гарантируется отсутствие конфликтов и проблем с общим состоянием, а также справедливость. Поэтому каналы приходятся кстати при взаимодействии корутин.
  • Поддерживается любое количество отправителей и получателей.
  • Каждое значение, отправляемое в канал, получается лишь раз.
  • Если подписано несколько получателей одновременно, элементы распределяются между ними равномерно, это FIFO-очередь получателей.
The channel has 3 receivers, by order of subscription: 
Receiver1, Receiver2, Receiver3.

All the receivers have already subscribed to the channel.

The channel emit 4 values: "A", "B", "C" then "D".

Receiver1 receives "A" and "D"
Receiver2 receives "B"
Receiver3 receives "C"
  • У получателей имеется две приостанавливающие функции: send и receive.
  • receive приостанавливается, если в канале нет элементов, и остается в ожидании доступного элемента, при наличии которого возобновляется.
  • send приостанавливается, если каналом достигнут предел пропускной способности, об этом ниже.
  • Имеются также неприостанавливающие версии trySend и tryReceive, ими возвращается ChannelResult, которым обозначается успешность/неуспешность операции.
  • Их закрывают вручную, когда завершается отправка данных или появляется исключение: myChannel.close(). В противном случае получатели остаются в вечном ожидании элементов.

Типы пропускной способности каналов

val myChannel = Channel<Int>(capacity = 3)

// или

val myChannel = produce(capacity = 3) {
// значения выдаются здесь
}
  • Channel.UNLIMITED: неограниченный буфер, отправка никогда не приостанавливается.
  • Channel.BUFFERED: емкость буфера  —  64, это значение по умолчанию переопределяется системным свойством kotlinx.coroutines.channels.defaultBuffer в виртуальной машине Java.
  • Channel.RENDEZVOUS: это поведение по умолчанию, емкость буфера  —  0. Получатель получает данные, только если подписан на отправителя во время отправки данных.
  • Channel.CONFLATED: емкость буфера  —  1, каждым новым элементом заменяется предыдущий.
  • Любое значение int: емкость буфера, задается этим int.

Обработка переполнения буфера

Что происходит при заполнении буфера, контролируется в каналах параметром onBufferOverflow. Имеется три варианта:

  • BufferOverflow.SUSPEND: это поведение по умолчанию, при заполнении буфера метод send приостанавливается.
  • BufferOverflow.DROP_OLDEST: при заполнении буфера удаляется старейший элемент.
  • BufferOverflow.DROP_LATEST: при заполнении буфера удаляется последний элемент.

Создание канала, который закрывается автоматически

При каждом завершении работы построителя корутин produce  —  он завершается, приостанавливается или отменяется  —  канал им закрывается:

suspend fun myFunction() = coroutineScope {
val channel = produce {
// значения выдаются здесь, и без вызова «close()» в конце
}
}

Автоматическая очистка, если элемент не обрабатывается

Если канал закрыт, отменен или в send, receive или hastNext выбрасывается ошибка:

val myChannel = Channel(
capacity,
onUnderliveredElement = { /* очищаем операции здесь */ }
)

Сценарий применения: активация обновления

В Android типичный сценарий для каналов  —  активация обновления экрана: обновление по касанию или кнопка «Обновить». Во фрагменте кода ниже показывается, как извлекать данные из API, когда мы впервые подписываемся на поток или активируется обновление.

Для активации обновления многие используют SharedFlow. Однако он предназначен для нескольких получателей, поэтому хоть это и рабочее, но не оптимальное решение. Подробнее о SharedFlow  —  далее.

// Это упрощенная версия для демонстрирования возможностей использования каналов. 
// В реальном сценарии требуется дополнительная логика, чтобы избежать
// обновления, например, если данные уже загружаются.

interface ApiService {
suspend fun fetchData(): List<String>
}

class FetchDataUseCase @Inject constructor (
private val apiService: ApiService
) {
// Создается канал с буфером 1, и удаляются самые новые данные.
// Поэтому, если активировать обновление несколько раз подряд,
// сохранится только первый элемент.
private val refreshChannel = Channel<Unit>(
capacity = 1,
onBufferOverflow = BufferOverflow.DROP_LATEST
)

// Этот поток получается во «ViewModel» для создания состояния пользовательского интерфейса
val dataState: Flow<FetchDataState> =
refreshChannel
// Канал преобразуется в поток
.consumeAsFlow()
// Чтобы, подписавшись на поток, получать данные,
// при запуске выдается элемент
.onStart { emit(Unit) }
.map { fetchData() }

fun refresh() {
// Здесь используем функцию «trySend», чтобы не создавать
// приостанавливающую функцию, поэтому не понадобится область для ее вызова.
// Чтобы активировать обновление, из «ViewModel» вызывается этот метод
refreshChannel.trySend(Unit)
}

private suspend fun fetchData(): FetchDataState =
try {
val data = apiService.fetchData()
FetchDataState.Success(data)
} catch (e: Exception) {
FetchDataState.Error(e.message ?: "An error occurred")
}

sealed interface FetchDataState {
data object Loading : FetchDataState
data class Success(val data: List<String>) : FetchDataState
data class Error(val message: String) : FetchDataState
}
}

Flow

Основные принципы

  • Это холодный поток значений.
  • Имеется встроенная поддержка структурированной конкурентности.
  • Последняя операция потока  —  терминальная: collect, first и т. д.
  • Поток изменяется промежуточными операциями: map, onEach, flatMapLastest и т. д.
  • Терминальные операции приостанавливаются, им требуется область.
  • Неперехваченными исключениями поток немедленно отменяется, и с помощью collect исключение выбрасывается повторно.
  • По умолчанию поток принимает свой контекст из контекста, в котором вызывается collect.

Объединение потоков

merge, combine и zip  —  это промежуточные функции, при помощи которых несколько потоков объединяются в один. Разберем ключевые отличия этих трех функций.

merge

  • Элементы не изменяются.
  • Элементы выдаются, как только появляются; появление элемента также в другом потоке до появления значения не ожидается.
  • Функция применяется при наличии нескольких источников событий, которыми выдается одно и то же действие.
flowA emits: 1
flowB emits: 2
flowA emits: 3

merge(flowA, flowB) produces 1, 2, 3

zip

  • Чтобы создать новое значение, объединяются элементы разных потоков.
  • Для указания того, как объединяются элементы, нужна функция.
  • Чтобы создать пару, нужно дождаться выдачи значения каждым потоком.
  • Элементы могут быть частью только одной пары.
  • Элементы, которые остались без пары, теряются.
flowA emits: 1
flowB emits: 2
flowA emits: 3

flowA.zip(flowB) {fA, fB -> fA + fB } produces 3 (1+2 and 3 is dropped)

combine

  • Чтобы создать новое значение, объединяются элементы разных потоков.
  • Для указания того, как объединяются элементы, нужна функция.
  • Нужно дождаться первой выдачи значения в более медленном потоке, прежде чем появится новый элемент.
  • Когда в потоке появляется новый элемент, им заменяется предыдущий, и сразу же выдается новое значение  —  без ожидания выдачи нового элемента каждым потоком.
flowA emits: 1
flowB emits: 2
flowA emits: 3

flowA.combines(flowB) { fA, fB -> fA + fB } produces 3 (1+2) then 5 (3+2)

Отличия fold от scan

И в fold, и в scan все значения, выдаваемые потоком, соответствующей операцией объединяются в один элемент.

  • fold  —  терминальная операция, она приостанавливается до завершения потока и появления конечного значения.
  • scan  —  промежуточная операция, в результате которой получаются все промежуточные значения.
val myflow = flowOf(1, 2, 3, 4)
myFlow.fold(0) { acc, newElement -> acc + newElement } // получается 10

myFlow.scan(0) { acc, newElement -> acc + newElement }
// получается 1, 3 (1+2), 6 (3+3), 10 (6+4)

flatMapConcat, flatMapMerge и flatMapLatest

  • А это промежуточные операторы.
  • Элементы, выдаваемые исходным потоком, преобразуются ими применением другого потока в элементе, в итоге возвращается еще один поток.
myFlowA.flatMapConcat { fA -> myFlowB(fA) } // возвращается значение, выдаваемое потоком «B»

flatMapConcat

  • Каждое выдаваемое значение преобразуется в поток, полученные потоки последовательно конкатенируются.
  • Полностью выдаются значения из первого внутреннего потока до начала следующего.
  • Сценарий применения: при обработке внутренних потоков по порядку, без перекрывания.

flatMapMerge

  • Каждое выдаваемое значение преобразуется в поток, полученные потоки параллельно объединяются.
  • Значения выдаются по мере доступности из всех внутренних потоков, потенциально не по порядку.
  • Сценарий применения: при обработке внутренних потоков параллельно порядок передаваемых значений не важен.

flatMapLatest

  • Каждое выдаваемое значение преобразуется в поток, при выдаче нового значения предыдущие потоки отменяются, значения выдаются из последнего потока.
  • Активен только последний поток, выдаются его значения, предыдущие потоки отменяются.
  • Сценарий применения: когда важно только последнее значение и нужно отменить предыдущие операции.
data class User(val id: Int, val name: String)
data class UserDetails(val userId: Int, val address: String)

fun fetchUserData(): Flow<User> = flow {
emit(User(1, "Alice"))
delay(500)
emit(User(2, "Bob"))
delay(500)
emit(User(3, "Charlie"))
}

fun fetchUserDetails(userId: Int): Flow<UserDetails> = flow {
delay(1000) // Моделируется сетевая задержка
emit(UserDetails(userId, "$userId's address"))
}

// flatMapConcat
fetchUserData()
.flatMapConcat { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapConcat: ${userDetails}")
}
// Последовательно извлекаются сведения о каждом пользователе.
// flatMapConcat: UserDetails(userId=1, address=1's address)
// flatMapConcat: UserDetails(userId=2, address=2's address)
// flatMapConcat: UserDetails(userId=3, address=3's address)

// flatMapMerge
fetchUserData()
.flatMapMerge { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapMerge: ${userDetails}")
}
// Из-за параллельной выборки сведения о пользователях могут перемежаться.
// flatMapMerge: UserDetails(userId=1, address=1's address)
// flatMapMerge: UserDetails(userId=2, address=2's address)
// flatMapMerge: UserDetails(userId=3, address=3's address)

// flatMapLatest
fetchUserData()
.flatMapLatest { user ->
fetchUserDetails(user.id)
}
.collect { userDetails ->
println("flatMapLatest: ${userDetails}")
}
// Извлекаются и выводятся данные только о последнем пользователе, потому что
// новыми пользователями предыдущие выборки отменяются.
// flatMapLatest: UserDetails(userId=3, address=3's address)

Преобразование функции в поток

val function = suspend {
// этим приостанавливается лямбда-выражение
// функция определяется здесь
}

function.asFlow()

или

suspend fun myFunction(): Flow<T> {
// функция определяется здесь
}

::myFunction.asFlow()

Создание потока, в котором появляются элементы до того, как на него подписываются

Функцией channelFlow создается гибрид потока и канала, здесь появляется горячий поток данных, а также реализуется интерфейс Flow:

val myChannelFlow = channelFlow {
val myData = // данные извлекаются здесь
send(myData)
}

suspend fun fetchData() {
myData.first()
}

Изменение контекста потока

myFlow
.flowOn(Dispatchers.IO)

// или

myFlow
.flowOn(CoroutineName("NewName"))

Запуск потока без дополнительного уровня вложенности

// вместо этого
viewModelScope.launch {
myFlow
.collect()
}

// делаем это
myFlow
.launchIn(viewModelScope)

SharedFlow

Основные принципы

  • Это горячий поток значений.
  • Имеется несколько получателей, и всеми ими получаются одинаковые значения.
  • Приходится кстати при пересылке значений нескольким получателям или совместном использовании состояния или события в разных частях приложения.
  • Никогда не завершается до закрытия всей области.
  • Имеется изменяемая версия MutableSharedFlow для обновления состояния посредством выдачи новых значений с помощью приостанавливающей функции emit.
  • Используется также неприостанавливающая версия tryEmit.
  • Поддерживаются настраиваемое воспроизведение и стратегия переполнения буфера.
  • Все методы общего потока потокобезопасны, безопасно вызываются из параллельно выполняемых корутин без внешней синхронизации.

Параметры конфигурации

В Kotlin имеется метод для создания MutableSharedFlow и определения желаемого поведения буфера:

public fun <T> MutableSharedFlow(
// количество значений, воспроизведенных новым подписчикам
replay: Int = 0,
// количество значений, буферизованных в дополнение к воспроизведенным «replay»
extraBufferCapacity: Int = 0,
// действие при переполнении буфера
// Возможные значения: «SUSPEND», «DROP_OLDEST», «DROP_LATEST»
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Функция shareIn

  • Flow преобразуется в SharedFlow.
  • Приходится кстати при превращении одного потока в несколько.
  • В первом аргументе scope ожидается область корутины для запуска корутины и «собирания» элемента потока.
  • Вторым аргументом started определяется, когда в sharedFlow начинается прослушивание выдаваемого потоком значения. Им принимается объект SharingStarted, разновидности которого опишем чуть ниже.
  • Третьим аргументом replay  —  по умолчанию  —  определяется количество значений, воспроизведенных новым подписчикам.

Разновидности SharingStarted:

  • SharingStarted.Eagerly прослушивание элементов начинается сразу и не останавливается, пока область не отменена.
  • SharingStarted.Lazily прослушивание начинается при появлении первого подписчика и не останавливается, пока область не отменена.
  • SharingStarted.WhileSubscribed() прослушивание начинается при появлении первого подписчика и, как только исчезает последний подписчик, останавливается. В параметре stopTimeoutMillis настраивается задержка в миллисекундах между исчезновением последнего подписчика и прекращением совместного использования корутины.

Примечание о WhileSubscribed: если открыть новый intent на экране, например в приложении camera, экран поставится на паузу, у SharedFlow больше не будет подписчиков, и выдача прекратится. При возвращении к исходному экрану осуществится повторная подписка на экран с возможностью снова запустить операцию внутри потока. Это чревато проблемой или повторным запуском лишней операции.

Примечание о SharingStarted.Eagerly и SharingStarted.Lazily: если использовать ViewModelScope или LifecycleScope, то при удалении экрана отправка элементов из SharedFlow прекращается.

Преобразование потока в SharedFlow

// из «ViewModel» или класса с «lifeCycleScope»
myFlow.shareIn(
scope = viewModelScope
started = SharingStarted.Lazily
)

// из класса без «lifeCycleScope» — репозиторий или cценарий применения

suspend fun myFunction() = coroutineScope {
myFlow.shareIn(
scope = this
started = SharingStarted.Lazily
)
}

Сценарий применения: наблюдение за изменениями в базе данных из нескольких мест

В библиотеке Room имеется встроенная поддержка Flow. А значит, можно отслеживать изменения в БД и получать новые данные, как только они становятся доступны. Но считать их с диска бывает непросто. Допустим, нужно получить данные на нескольких экранах. С SharedFlow данные извлекаются не для каждого экрана, а сразу для всех.

Чтобы получать обновления на все экраны, достаточно один раз извлечь UserSettings:

// простой объект доступа к данным для их извлечения из Room
@Dao
interface UserSettingsDao {
// Из базы данных извлекаются все пользовательские настройки, выдается поток
@Query("SELECT * FROM user_settings")
fun getAll(): Flow<List<UserSettings>>
}

class UserSettingsRepository @Inject constructor(
private val dao: UserSettingsDao
) {

// Данные считываются из БД лишь раз,
// и вычисляемые здесь данные получаются всеми получателями.
suspend fun getAll(): SharedFlow<List<UserSettings>> = coroutineScope {
dao.getAll.shareIn(
// передаются в область
scope = this,
// начинают выдаваться, только когда имеется получатель
started = SharingStarted.Lazily,
// последний элемент воспроизводится, когда на него подписывается новый получатель
replay = 1
)
}
}

StateFlow

Основные принципы

  • Аналогичен SharedFlow, параметр replay устанавливается на 1.
  • Всегда сохраняется только одно значение.
  • Доступ к сохраненному значению получается с помощью свойства value.
  • В конструкторе задается начальное значение.
  • Современная альтернатива LiveData.
  • Новый элемент не выдается, если он равен предыдущему.

Установка и считывание значения

val state = MutableStateFlow("A") // исходное значение — «A»

state.value = "B" // задается значение «B»

state.value = "B" // новый элемент не выдается, потому что это значение уже «B»
val myValue = state.value // из состояния «state» считывается значение, здесь это «B»

Функция stateIn

  • Поток преобразуется в StateFlow.
  • Нужно указать область.
  • Имеется два варианта: приостанавливающая и неприостанавливающая.

Приостанавливающая stateIn

  • Приостанавливается до выдачи первого элемента потока и высчитывания нового значения:
suspend fun myFunction() = coroutineScope {
myFlow.stateIn(this)
}

Неприостанавливающая stateIn

  • В аргументе initialValue требуется исходное значение.
  • Во втором аргументе started ожидается элемент SharingStarted, подробнее об этом аргументе  —  в описании функции shareIn раздела SharedFlow выше.
myFlow.stateIn(
scope = viewModelScope,
started = SharingStarted.Lazily,
initialValue = "A"
)

Сценарий применения: выдача данных из ViewModel в представление

Продемонстрируем фрагмент кода с преобразованием потока в StateFlow для выдачи из модели представления состояния, наблюдаемого из вашего представления:

class MyViewModel @Inject constructor(
private val fetchDataUseCase: FetchDataUseCase
) : ViewModel() {

val myState: StateFlow<MyState> =
fetchDataUseCase.dataState
.map {
when (it) {
is FetchDataUseCase.FetchDataState.Loading -> MyState.Loading
is FetchDataUseCase.FetchDataState.Success -> MyState.Success(it.data)
is FetchDataUseCase.FetchDataState.Error -> MyState.Error(it.message)
}
}
// поток преобразуется в поток состояния
.stateIn(
// чтобы при удалении «viewModel» прослушивание остановилось,
// задаем область во «viewModel»
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = MyState.Loading
)


sealed interface MyState {
data object Loading : MyState
data class Success(val data: List<String>) : MyState
data class Error(val message: String) : MyState
}
}

@Composable
fun MyScreen(viewModel = MyViewModel()) {
val state = viewModel.myState.collectAsStateWithLifecycle()
when (state) {
is MyState.Loading -> // показываем представление загрузки
is MyState.Success -> // показываем представление успеха
is MyState.Error -> // показываем представление ошибки
}
}

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Gaëlle Minisini: Advanced Kotlin Flow Cheat sheet (for Android Engineer)

Предыдущая статьяReact-приложение с шаблонами «Репозиторий» и «Адаптер»
Следующая статьяПолезные рабочие плагины TMUX