Flows Coroutines (потоки в корутинах)

Flow – это реактивное программирование с сопрограммами. В этом посте мы поговорим о потоках в сопрограммах. Если вы не имеете представления о сопрограммах, я рекомендую вам прочитать этот пост , прежде чем продолжить, чтобы получить представление о том, что такое сопрограммы и как мы можем использовать их в реальном приложении. Я предполагаю, что вы уже хорошо знакомы с основами сопрограмм.

Что такое Flows (потоки) в сопрограммах

Потоки в сопрограммах позволяют нам выдавать значения асинхронно. Это observer (наблюдатели), который собирает данные из источника. Как и observer в RxJava, Flows дает нам множество операторов для управления данными. В этой теме мы поговорим о следующем:

  • Создание потоков
  • Свойства потоков
  • Операторы
  • Буферизация
  • Составление потоков
  • Обработка исключений

Асинхронный поток

Мы можем определить поток в сопрограмме как поток значений, выдаваемых сопрограммой, которые вычисляются асинхронно.

Существует несколько способов создания потока в сопрограммах, один из них — с помощью построителя потока.

import kotlinx.coroutines.flow.flow
flow{// Наш код}

Чтобы отправить значение из потока, мы используем функцию emit(value: T), значение которой может быть любого типа. Как только мы создадим поток эмиттера, мы должны собрать эти испускаемые данные. Мы можем использовать collect{} функцию для получения наших данных. Хорошо, давайте посмотрим на реальный случай, как мы можем создавать и потреблять данные, используя потоки в сопрограмме. Мы собираемся реализовать функцию, которая возвращает Flow<Integer> целое число, эта функция даст нам числа Фибоначчи. Мы добавим задержку между каждым числом, делая вид, что есть некоторая работа, которую мы должны выполнить. В нашей основной функции мы собираемся собирать числа, выдаваемые этой функцией, и печатать сообщение, содержащее собранное значение.

import kotlinx.coroutines.delay 
import kotlinx.coroutines.flow.Flow 
import kotlinx.coroutines.flow.collect 
import kotlinx.coroutines.flow.flow 
import kotlinx.coroutines.runBlocking 

fun main() { 
    runBlocking { 
        println ("получение чисел Фибоначчи" ) 
        sendFibonacciNumbers().collect { 
            println ("сбор чисел Фибоначчи: $it ") 
        } 

        println ("Завершить сбор чисел Фибоначчи") 
    }
 } 

suspend fun sendFibonacciNumbers(): Flow<Int> { 
    return flow {
         val fibonacciNumbersList = listOf(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55) 
        fibonacciNumbersList.forEach {
             delay( it * 100L) 
            emit( it ) 
        } 
    }
 }

Как мы видим здесь, поток выдает значение, вызывая функцию emit(value). Как только мы подпишемся на поток использующий функции collect(), мы сможем использовать данные, выдаваемые нашим потоком.

В этом примере мы попробовали один из способов для создания построителя потока. Но есть и другие способы создать поток.

Создание потока

Сопрограммы могут преобразовать коллекцию Kotlin в коллекцию flow, которая выдает значения того же типа. Итак, наша функция, отправляющая числа Фибоначчи, может быть записана так:

fun sendFibonacciNumbers(): Flow<Int> { 
    return listOf (0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55).asFlow()
}

Итак, мы создаем список чисел Фибоначчи и преобразуем его в flow используемую функцию asFlow(). Если вы перейдете к реализации этой функции, вы заметите, что это не более чем функция расширения Iterable, которая выполняет итерацию коллекции и выдает все значения. Это так просто. Вот исходный код функции asFlow():

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

Другой способ создать flow – использовать функцию flowOf(vararg elements: T), которая будет испускать все элементы, переданные в качестве параметра. Итак, в нашем коде функция sendFibonacciNumbers() будет:

fun sendFibonacciNumbers() = 
    flowOf (0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55)

Конечно, вы можете указать тип параметров, все элементы будут выведены в файле flow.

Свойства потока

В этом разделе мы поговорим о двух важных свойствах потоков в сопрограмме.

Первое свойство потоков состоит в том, что они являются Cold. Это означает, что поток никогда не начнет выдавать значение, пока один наблюдатель не подпишется на этот поток. Давайте сделаем пример, в котором мы создаем поток, печатаем сообщение, как только поток начинает выдавать значения. с другой стороны, мы собираемся создать сопрограмму, которая будет получать поток в переменной, но мы подпишемся на этот поток только через 2 секунды, просто чтобы убедиться, что поток не будет выполняться, пока на него не подпишутся.

fun main() {
    runBlocking {
        println("Запуск сопрограммы")
        val colorFlow = sendColorFlow()
        delay(2000)
        println("Подписка на цветовой поток")
        colorFlow.collect{
            println("Цвет $it")
        }
    }
}

fun sendColorFlow() = flow {
    println("Начало выдачи значений")
    listOf("красный", "зеленый", "синий").forEach {
        emit(it)
    }
}
/* 
Вывод на консоль:
Запуск сопрограммы
// Задержка 2 с
Подписка на цветовой поток 
Начало выдачи значений 
Цвет красный 
Цвет зеленый 
Цвет синий
*/

Холодные потоке никогда не будут выдавать значения, пока наблюдатель не подпишется на этот поток.

Другое свойство состоит в том, что flow не может быть отменен сам по себе. Он будет отменен, если будет отменена охватывающая сопрограмма. Но flow прозрачен для отмены. Давайте запустим пример, чтобы увидеть, как поток будет отменен. Мы будем выдавать значения с задержкой в ​​500 миллисекунд между каждым значением, и соберём эти значения в сопрограмме, которая будет работать всего одну секунду.

fun main() { 
    runBlocking { 
        println ("Поток еще не запущен!") 
        println ("Запускаем поток сейчас") 
        withTimeoutOrNull(1000) { 
            sendNumbersFlow ().collect { 
                println ("Мы получили значение: $it ") 
            } 
        } 
    }
 } 

fun sendNumbersFlow() = flow { 
    println ("Начало выдачи значений") 
    listOf (1, 2, 3, 4, 5). forEach {
         delay(300) 
        emit(it) 
    } 
}
/* 
Вывод на консоль:
Поток еще не запущен!
Запускаем поток сейчас
Начало выдачи значений
Мы получили значение: 1
Мы получили значение: 2
Мы получили значение: 3
*/

Как мы видим здесь, поток выдал первые три значения, а затем был отменен, потому что сопрограмма, подписанная на этот поток, была завершена.

Операторы

Зная, как создавать потоки и как мы собираем испускаемые значения, давайте посмотрим, как мы можем манипулировать данными, испускаемыми потоками. Для этого мы будем использовать operators. Если вы знакомы с Rx, вы, вероятно, имеете представление об операторах и о том, как они могут преобразовывать поток. Для потоков та же концепция. Мы не будем говорить обо всех операторах, их много, мы просто объясним, как мы можем их использовать, и представим примеры некоторых из операторов, которые мы можем применить к потокам.

Применение оператора к потоку не означает, что мы подписаны на этот поток! Нам просто нужно вызывать функцию collect().

Map

map оператор — это функция, применяемая к каждому значению, выдаваемому, flow потоком и возвращающая результат этой функции, который будет выдан вместо начального значения. Давайте приведём пример, когда у нас есть поток, который выдает целочисленное значение, и мы хотим получить квадрат этих чисел.

fun main() { 
    runBlocking {
         sendSquareNumbersFlow(). collect { 
            println ("Значение: $it ") 
        } 
    }
 } 

fun sendSquareNumbersFlow() = listOf (1, 2, 3, 4, 5) 
    .asFlow()
    .map { it * it } //Здесь мы вызываем оператор map

Filter

Оператор filter фильтрует испускаемое значение на основе функции предиката и возвращает только те значения, которые соответствуют этому условию. Предположим, у нас есть поток, который выдает числа, и нам нужны четные числа. Мы применяем оператор фильтра к этому потоку с функцией предиката, которая вернет true, если это число четное.

fun main() {
    runBlocking {
        sendEvenNumbersFlow().collect {
            println("Значение: $it")
        }
    }
}

fun sendEvenNumbersFlow() = (1..10)
    .asFlow()
    .filter { it % 2 == 0 } // Применить оператор фильтра с функцией предиката

Transform

Это общий оператор, он позволяет нам преобразовывать наше значение во что угодно. Давайте увидим это на следующем примере:

fun main() {
    runBlocking {
        transformOperator()
    }
}

suspend fun transformOperator() = (1..10)
    .asFlow()
    .transform {
        emit("Испускаемое значение $it")
        emit("Квадрат $it равен ${it * it}")
    }
    .collect {
        println(it)
    }

Как видите, в операторе преобразования мы выдаем два значения, оба являются строковыми значениями, содержащими сообщение.

Take

Как следует из названия, этот оператор берет n первых значений из потока flow. У take оператора есть целочисленный параметр, указывающий, сколько значений мы хотели бы взять из этого flow.

fun main() {
    runBlocking {
        takeOperator()
    }
}

suspend fun takeOperator() = (1..10)
    .asFlow()
    .take(3) // Берем 3 значения
    .collect {
        println(it)
    }

В этом примере мы возьмем только первые три значения из этого flow.

Reduce

Оператор reduce накапливает значение, начиная с первого элемента, и применяет операцию к текущему значению аккумулятора и каждому элементу. Он выдает NoSuchElementException , если поток пуст. В качестве примера вычислим факториал заданного числа. Факториал числа – это произведение всех чисел от 1 до n.

fun main() {
    runBlocking {
        val number = 4
        val factorial = reduceOperator(number)
        println("Факториал $number равен $factorial")
    }
}

suspend fun reduceOperator(n: Int) = (1..n)
    .asFlow()
    .reduce { accumulator, value ->
        accumulator * value
    }

reduce терминальный оператор, это означает, что он возвращает конечное значение, а не преобразованное, flow как другие операторы. Есть некоторые другие терминальные операторы, такие как toList и toSet, которые преобразуют поток в List или Set подобие.

Buffering (Буферизация)

Представьте, что у нас есть flow поток с очень быстро испускаемыми значениями, и обработка этих значений для наблюдателя занимает много времени. Это означает, что flow необходимо дождаться, пока наблюдатель снова будет готов, и собрать испускаемые значения, которые вызывают пустую трату времени для потока. В этом случае мы можем использовать то, что мы называем буфером, для накопления значений из нашего потока, пока наблюдатель не будет готов. Потом снова взять значения и обработать их, но на этот раз из буфера. Возьмем в качестве примера flow, который выдает значение каждые 100 мс и для обработки этого значения требуется 300 мс. Поток будет ожидать после каждого отправленного значения. Давайте напишем эту программу и измерим время, необходимое для обработки всех значений.

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            generateValues()
                .collect {
                    delay(300)
                    println("Значение $it")
                }
        }
        println("Поток обрабатывается за $time ms")
    }
}

fun generateValues() = flow {
    (1..3).forEach {
        delay(100)
        emit(it)
    }
}
/* 
Вывод на консоль:
Значение 1 
Значение 2 
Значение 3 
Поток обрабатывается за 1235 мс
*/

Теперь добавим a buffer и посмотрим разницу во времени. Чтобы добавить буфер, просто вызовите функцию в своем потоке следующим образом:

generateValues()
    .buffer() // Здесь мы добавили буфер в наш поток
    .collect {
        delay(300)
        println("Значение $it")
    }
/* 
Вывод на консоль:
Значение 1 
Значение 2 
Значение 3 
Поток обрабатывается за 1081 мс
*/

Значение, которое нас интересует — это общее время выполнения, которое составляет 1081 мс. Помните, что без использования буфера это было 1235 мс, это почти на 20% быстрее. Но что произошло, когда мы добавили буфер?

Сравнение диаграмм между выполнением без буфера и после добавления буфера

Как мы видим на этой диаграмме, слева, когда мы запускаем эту программу без буфера, flow ожидает после каждого испускаемого значения до тех пор, пока оно не будет обработано наблюдателем, что означает, что значения 2 и 3 задерживаются, и это вызывает пустую трату времени. На варианте с буфером (справа) в flow продолжается передача данных, и буфер собирает эти данные до тех пор, пока этот наблюдатель не будет готов, и поток не завершит передачу, а наблюдатель обработает данные, как только они будут готовы, и соберет их из буфера.

Вы можете определить максимальное количество значений, которые вы можете буферизовать, просто поместив число в качестве параметра для функции буфера .buffer(max).

Составление потока

В некоторых случаях вам может понадобиться составить несколько потоков вместе, если у вас есть несколько источников данных, и вы хотите создать новый flow, составленный из этих потоков. На самом деле некоторые операторы позволяют составить flow из нескольких исходящих потоков. Давайте посмотрим на некоторых из этих операторов.

Zip

Если вы знакомы с Rx, zip в сопрограммах делает то же самое. Он берет два реактивных потока, объединяет их вместе и возвращает один поток, состоящий из этих данных, применяя функцию, которую вы пишете для создания нового потока. Возьмем для примера два потока, первый выдает названия цветов, второй выдает код цвета. Мы собираемся применить zip оператор к этим двум потокам и выдать значение, которое соответствует цветовому коду и названию цвета.

fun main() {
    runBlocking {
        zipColor()
            .collect {
                println(it)
            }
    }
}

suspend fun zipColor(): Flow<String> {
    val colorName = flowOf("красный", "зеленый", "синий")
    val colorCode = flowOf("FF0000", "00FF00", "0000FF")

    return colorName.zip(colorCode) { name, code ->
        "$name код $code"
    }
}
/* 
Вывод на консоль:
красный код FF0000 
зеленый код 00FF00 
синий код 0000FF
*/

После применения zip оператора к двум потокам мы собираем испускаемые данные, которые будут одним потоком, соответствующим этим двум потокам.

Combine

Этот оператор позволяет нам объединить последнее значение одного потока с последним значением второго потока. Таким образом, в основном он берет последнее значение каждого потока и объединяет их. Эта небольшая программа позволит нам лучше понять, как работает combine.

fun main() {
    runBlocking {
        combineOperator()
            .collect {
                println(it)
            }
    }
}

suspend fun combineOperator(): Flow<String> {
    val numbers = (1..5).asFlow()
        .onEach { delay(500) }
    val values = flowOf("Один", "Два", "Три", "Четыре", "Пять")
        .onEach { delay(400) }

    return numbers.combine(values) { n, v -> "$n -> $v" }
}
/* 
Вывод на консоль:
1 -> Один 
1 -> Два 
2 -> Два 
2 -> Три 
3 -> Три 
3 -> Четыре 
4 -> Четыре 
4 -> Пять 
5 -> Пять
*/

Покажу диаграмму, чтобы объяснить, как работает оператор combine:

Оператор Combine на диаграмме

Обработка исключений

При работе с потоками в любой момент могут возникнуть исключения, и мы должны их перехватывать их. На самом деле есть три способа, которые позволяют нам поймать эти исключения.

Try/Catch

Мы можем использовать классический блок try/catch для перехвата исключений:

fun main() {
    runBlocking {
        try {
            generateExceptionFlow()
                .collect {
                    println(it)
                }
        } catch (e: Exception) {
            println("Исключение ${e.message}")
        }
    }
}

suspend fun generateExceptionFlow(): Flow<Int> {
    return (1..3).asFlow()
        .onEach { check(it != 2) }
}

Для следующих операторов мы будем использовать ту же функцию generateExceptionFlow(). Посмотрим, как мы можем перехватить исключение по-другому.

Catch

Оператор catch, позволяет нам поймать исключение.

fun main() {
    runBlocking {
        generateExceptionFlow()
            .catch { println("Исключение ${it.message}") }
            .collect {
                println(it)
            }
    }
}

Запуск этого кода перехватит исключение. Если исключение было выброшено после этого оператора, оно не будет перехвачено! Вы можете использовать столько catch операторов, сколько хотите, это позволит вам определить, как обрабатывать исключение в каждом месте, где оно возникает.

onCompletion

fun main() {
    runBlocking {
        generateExceptionFlow()
            .onCompletion { e ->
                if (e == null) {
                    println("Программа успешно завершена")
                } else {
                    println("Программа завершена с исключением ${e.message}")
                }
            }
            .catch { e ->
                println("Исключение $e")
            }
            .collect {
                println(it)
            }
    }
}

Так что, если передаваемый объект onCompletion имеет значение null, это означает, что исключения не было. В противном случае мы получим исключение в onCompletion. nCompletion не ловит исключение! Вы всегда должны добавлять оператор catch, чтобы поймать его.

Flows в Android

В Android есть некоторые фреймворки, поддерживающие сопрограммы, такие как Room или Retrofit. Вы можете вернуть flow вместо коллекции, когда у вас есть список данных, и управлять им как реактивным потоком.

Допустим, у вас есть сетевой вызов, который возвращает список сообщений. Что вы можете сделать в вашем NetworkService Api? Скажем, у вас есть метод для получения этих сообщений и возврата списка сообщений. Это можно записать так:

@GET("posts/")
suspend fun fetchPosts(): Flow<List<Post>>

Вы должны добавить ключевое слово suspend для запуска этой функции внутри сопрограммы. То же самое для Room, в ваших классах DAO вы можете вернуть поток вместо коллекции или объекта.

@Query("SELECT * FROM posts")
suspend fun getAllPosts(): Flow<List<User>>

Вы также можете передавать combine flow значения из сетевого источника вместе с flow исходящими значениями из базы данных и возвращать flow значение с последними обновленными данными. В Android мы делаем это в repository, который внедряем в нашу ViewModels.

После применения этих изменений в ваших DAO и сетевых вызовах, давайте перейдем к ViewModel.Вы можете взять комбинированное flow в своем репозитории и преобразовать его в LiveData, просто применив функцию asLiveData(). Делая это, вы будете получать любые обновления в своем пользовательском интерфейсе без всего шаблонного кода и с высоким уровнем абстракции между слоями.

Поделись с друзьями:
Если вам понравилась статья, подписывайтесь на наши социальные сети.

Оставьте комментарий

четыре × 5 =