Создание computedAsync для вычисления значений сигналов в Angular

Немного истории

Обработка асинхронных операций в Angular всегда была задачей Observables. Observables  —  это отличный способ обработки асинхронных операций. Однако появившиеся в Angular сигналы (Signals), которые многие пытаются использовать для всего подряд, не предназначены для обработки асинхронных операций. Сигналы предназначены для обработки значений, а не событий. Так как же обрабатывать асинхронные операции с помощью сигналов? Сейчас узнаете.

Мотивация

Возьмем простой пример вызова API. Допустим, есть компонент, который вызывает API и отображает данные. Для вызова API используется HttpClient, который возвращает Observable. Подписка на Observable позволит получить данные. Посмотрим, как это можно сделать, используя сигналы.

export class UserComponent {
private imagesService = inject(ImagesService);

user = input.required<User>();

favoriteImages = signal<string[]>([]);

constructor() {
effect(() => {
this.imagesService.getImages(this.user().favoriteImages).subscribe(images => {
this.favoriteImages.set(images);
});
});
}
}

Как видите, для применения встроенной реактивности используются также вводы сигналов.

Но отписка не выполняется. Поэтому надо обеспечить ее вручную. Функция effect включает колбэк-функцию, которая вызывается каждый раз при запуске effect. Можно использовать ее, чтобы отписаться от подписки.

Реализуем это следующим образом:

export class UserComponent {
constructor() {
// onCleanup - это колбэк-функция, которая вызывается каждый раз при запуске effect
effect((onCleanup) => {
const sub = this.imagesService.getImages(this.user().favoriteImages).subscribe(images => {
this.favoriteImages.set(images);
});
onCleanup(() => sub.unsubscribe()) // отписка от подписки
});
}
}

Вот краткое описание того, что происходит в приведенном выше коде.

  • Регистрируем функцию effect, которая будет запускаться при каждом изменении ввода user.
  • По умолчанию функция effect запускается хотя бы один раз, что обеспечит начальный вызов API.
  • Функция effect будет запускаться снова при каждом изменении ввода user.
  • При каждом повторном запуске effect функция onCleanup будет вызывать переданную ей колбэк-функцию.
  • Колбэк-функция отписывается от предыдущей подписки (подобно оператору switchMap в RxJS).
  • При возврате вызова API устанавливаем значение сигнала favoriteImages.

Проблема

Приведенный выше пример  —  попытка получить производное состояние на основе идентификаторов favoriteImages пользователя. Применение функции effect может оказаться не самым простым способом сделать это. Кроме того, можно просто забыть отписаться от подписки.

Альтернативное решение  —  использование вспомогательной функции toObservable.

export class UserComponent {
private imagesService = inject(ImagesService);

user = input.required<User>();

favoriteImages = toSignal(toObservable(this.user).pipe(
switchMap(user => this.imagesService.getImages(user.favoriteImages))
), { initialValue: [] });
}

Это решение лучше предыдущего, но и оно не идеально. Что, если добавится еще какой-нибудь ввод и нужно будет включить его в вызов API? Придется использовать оператор combineLatest.

export class UserComponent {
private imagesService = inject(ImagesService);

user = input.required<User>();
otherInput = input.required<string>();

favoriteImages = toSignal(combineLatest([
toObservable(this.user),
toObservable(this.otherInput)
]).pipe(
switchMap(([user, otherInput]) => this.imagesService.getImages(user.favoriteImages, otherInput))
), { initialValue: [] });
}

Код очень быстро становится запутанным! Приходится включать все больше и больше операторов RxJS и повсюду использовать toObservable и toSignal.

Есть более эффективное решение.

Создание computedAsync

Нужно, чтобы функция computedAsync вела себя так же, как функция computed, но при этом обрабатывала асинхронные операции. По сути, она должна возвращать сигнал, который будет иметь значение асинхронной операции.

favoriteImages = computedAsync(() =>
this.imagesService.getImages(this.user().favoriteImages)
);

Нужно возвращать Observable (или Promise), а функция computedAsync должна обрабатывать подписку и отписываться от нее.

Обработка колбэк-функции 

Нужно, чтобы разработчик мог передавать либо Observable, либо Promise, либо просто обычное значение. Поэтому необходимо обработать все эти случаи.

Вот возможные варианты:

type ComputationResult<T> = Promise<T> | Observable<T> | T | undefined;

Нам необходимо принять колбэк-функцию и вернуть сигнал. Итак, принимаем колбэк-функцию, которая возвращает ComputationResult<T>.

export function computedAsync<T>(
computation: () => ComputationResult<T>
): Signal<T> {
// ...
}

Обработка текущего значения и результата

Теперь нам нужно обработать текущее значение и вернуть результат вычислений. Можно использовать WritableSignal для обработки текущего значения и вычисляемый сигнал для возврата результата вычислений.

export function computedAsync<T>(
computation: () => ComputationResult<T>
): Signal<T> {
const sourceValue = signal<T | undefined>(undefined);
return computed(() => sourceValue()!);
}

Обработка вычислений

Поскольку вычисления будут включать сигналы, а единственный способ прослушать изменения сигналов  —  применить функцию effect, будем использовать effect для обработки вычислений.

import { isObservable } from 'rxjs';

export function computedAsync<T>(
computation: () => ComputationResult<T>
): Signal<T> {
const sourceValue = signal<T | undefined>(undefined);

effect(() => {
const value = computation(); // сохранение результата вычислений

// обработка результата, если это Observable, Promise или обычное значение
if (isObservable(value) || isPromise(value)) {
// Задача: обработка Observable и Promise
} else {
// Задача: обработка обычного значения
}
});

return computed(() => sourceValue()!);
}

// вспомогательная функция для проверки того, является ли значение Promise
function isPromise<T>(value: any): value is Promise<T> {
return value && typeof value.then === 'function';
}

Но effect зависит от токена DestroyRef, поэтому он должен быть в контексте инъекции. В противном случае надо передать ему инжектор. Разберемся с этим.

Обработка контекста инъекции

Будем использовать вспомогательную функцию assertInjector (созданную Чау Траном), предоставляемую библиотекой ngxtension.

Функция assertInjector проверит, есть ли инжектор. Если нет, то выдаст ошибку. В третьем аргументе можно передать колбэк-функцию, которая будет вызвана в контексте инъекции.

Создадим интерфейс ComputedAsyncOptions, который будет включать инжектор и функцию equal (которую также включает обычная функция computed).

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
injector?: Injector;
}

Теперь можем использовать функцию assertInjector.

export function computedAsync<T>(
computation: () => ComputationResult<T>,
options?: ComputedAsyncOptions<T>
): Signal<T> {
return assertInjector(computedAsync, options?.injector, () => {
// Здесь мы можем безопасно использовать effect и функцию инъекции, потому что находимся в контексте инъекции
effect(() => { /* ... */ }, { injector: options?.injector });
});
}

Обработка подписки

Так же, как сигнал sourceValue нужен для обработки текущего значения, Observable sourceEvent$ необходима для обработки подписки. Будем использовать Subject для такой обработки.

Нужно, чтобы значение sourceEvent$ было либо Promise, либо Observable.

const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

Подпишемся на sourceEvent$ и установим значение сигнала sourceValue.

Позаботимся также о разворачивании Observable, что необходимо для передачи Observable или Promise sourceEvent$. Будем использовать оператор switchMap для разворачивания Observable.

Здесь есть побочный эффект: switchMap отменит предыдущую подписку.

const sourceResult = sourceEvent$
.pipe(switchMap(s$ => s$))
.subscribe({
// установка значения сигнала sourceValue, когда source$ выдает значение
next: (value) => sourceValue.set(value),
error: (error) => {
// Примечание: ошибка должна быть обработана пользователем (с помощью catchError или .catch())
sourceValue.set(error);
}
});

switchMap(s$ => s$) можно заменить оператором switchAll().

const sourceResult = sourceEvent$
.pipe(switchAll())
.subscribe();

Очистка подписки

Поскольку мы подписались на Observable sourceEvent$, нужно отписаться от нее. Для этого можно использовать токен DestroyRef. DestroyRef обладает методом onDestroy, который вызовет переданную ему колбэк-функцию, когда текущий контекст инжектора будет удален (в данном случае  —  когда будет удален компонент).

export function computedAsync<T>(
computation: () => ComputationResult<T>,
options?: ComputedAsyncOptions<T>
): Signal<T> {
return assertInjector(computedAsync, options?.injector, () => {
const destroyRef = inject(DestroyRef);

const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

effect(() => { /* ... */ });

const sourceResult = source$.subscribe(/* ... */);

destroyRef.onDestroy(() => {
sourceResultSubcription.unsubscribe();
});
});
}

Вот и все! Мы подписываемся на получение значения и отписываемся, когда компонент удаляется.

Обработка возвращенных Observable или Promise в вычислениях

В настоящее время computedAsync выглядит следующим образом:

export function computedAsync<T>(
computation: () => ComputationResult<T>,
options?: ComputedAsyncOptions<T>
): Signal<T> {
return assertInjector(computedAsync, options?.injector, () => {
const destroyRef = inject(DestroyRef);

const sourceValue = signal<T | undefined>(undefined);

const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

effect(() => {
const value = computation(); // сохранение результата вычислений

// обработка результата, если это Observable, Promise или обычное значение
if (isObservable(value) || isPromise(value)) {
// Задача: обработка Observable и Promise
} else {
// Задача: обработка обычного значения
}
});

const sourceResult = sourceEvent$
.pipe(switchAll())
.subscribe({
next: (value) => sourceValue.set(value),
error: (error) => sourceValue.set(error)
});

destroyRef.onDestroy(() => {
sourceResultSubcription.unsubscribe();
});

return computed(() => sourceValue()!);
});
}

Разберемся с задачами в приведенном выше коде.

Сначала рассмотрим случай с нормальным значением. Нужно просто установить значение сигнала sourceValue.

effect(() => {
const value = computation(); // сохранение результата вычислений

// обработка результата, если это Observable, Promise или обычное значение
if (isObservable(value) || isPromise(value)) {
// Задача: обработка Observable и Promise
} else {
sourceValue.set(value);
}
});

Это приведет к ошибке, поскольку нельзя установить значение сигнала внутри effect, не включив предварительно соответствующую поддержку.

effect(() => {
// ...
}, { allowSignalWrites: true }) // разрешить записи сигнала

Но есть и другой способ решить эту проблему. Можно использовать функцию untracked, чтобы установить значение сигнала, не подключая его поддержку effect (действие будет то же самое, что и у приведенного выше кода).

Воспользуемся этим лайфхаком:

untracked(() => sourceValue.set(value));

Разберемся с Observable и Promise. Так же, как было установлено значение в сигнале, требуется next для Observable sourceEvent$.

effect(() => {
const value = computation(); // сохранение результата вычислений

// обработка результата, если это Observable, Promise или обычное значение
if (isObservable(value) || isPromise(value)) {
sourceEvent$.next(value);
} else {
untracked(() => sourceValue.set(value));
}
});

Если оставить все как есть, могут возникнуть проблемы.

Взгляните на этот пример:

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

someValue = signal<string>('');

favoriteImages = computedAsync(() => {
return this.imagesService.getImages(this.user().favoriteImages).pipe(
tap(() => this.someValue.set('some value'))
);
});
}

Сигнал someValue будет установлен внутри вычисления, которое находится в effect. По сути, значение сигнала устанавливается внутри effect. Это приведет к ошибке. Поэтому нужно к sourceEvent$.next() применить функцию untracked.

effect(() => {
const value = computation(); // сохранение результата вычислений

// обработка результата, если это Observable, Promise или обычное значение
if (isObservable(value) || isPromise(value)) {
untracked(() => sourceEvent$.next(value));
} else {
untracked(() => sourceValue.set(value));
}
});

Вот теперь функция computedAsync готова!

Передача начального значения

По умолчанию начальное значение сигнала sourceValue установлено в undefined. Но есть возможность передать начальное значение в функцию computedAsync.

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
initialValue?: T;
injector?: Injector;
}

export function computedAsync<T>(
computation: () => ComputationResult<T>,
options?: ComputedAsyncOptions<T>
): Signal<T> {
return assertInjector(computedAsync, options?.injector, () => {
// ...
const sourceValue = signal<T | undefined>(options?.initialValue ?? undefined);
// ...
});
}

Теперь можно передать начальное значение в функцию computedAsync.

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

favoriteImages = computedAsync(() => {
return this.imagesService.getImages(this.user().favoriteImages);
}, { initialValue: [] });
}

Обработка условий гонки (опция behavior)

В настоящее время для обработки подписки используется только оператор switchAll, который отменяет предыдущие вызовы. Но разработчикам может понадобиться обеспечить другое поведение, для чего придется добавить опцию behavior в функцию computedAsync.

type ComputedAsyncBehavior = 'switch' | 'merge' | 'concat' | 'exhaust';

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
initialValue?: T;
injector?: Injector;
behavior?: ComputedAsyncBehavior;
}

Опция behavior может использоваться для обработки подписки.

Создадим функцию createFlattenObservable, которая будет обрабатывать оператор на основе опции behavior.

function createFlattenObservable<T>(
source: Subject<Promise<T> | Observable<T>>,
behavior: ComputedAsyncBehavior,
): Observable<T> {
const KEY_OPERATOR_MAP = {
merge: mergeAll,
concat: concatAll,
exhaust: exhaustAll,
switch: switchAll,
};

return source.pipe(KEY_OPERATOR_MAP[behavior]());
}

Теперь можно использовать функцию createFlattenObservable для обработки подписки.

const source$: Observable<T> = createFlattenObservable(
sourceEvent$,
options?.behavior ?? 'switch',
);

По умолчанию используем поведение switch, но можно передать и другое поведение.

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

favoriteImages = computedAsync(() =>
this.imagesService.getImages(this.user().favoriteImages),
{ initialValue: [], behavior: 'merge' }
);
}

Поскольку операторы RxJS также поддерживают промисы, можно передать Promise в sourceEvent$, и он будет обработан так же, как Observable.

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

favoriteImages = computedAsync(() =>
fetch(`https://localhost/api/images/${this.user().favoriteImages}`).then(res => res.json()),
{ initialValue: [], behavior: 'merge' }
);
}

Как использовать предыдущее значение в вычислениях?

Внутри функции effect можно получить текущее значение из сигнала sourceValue. Но при чтении сигнала внутри effect он регистрируется как зависимость. Поэтому нужно сначала применить к нему функцию untracked, а затем передать в функцию computation.

effect(() => {
const currentSourceValue = untracked(() => sourceValue());
const value = computation(currentSourceValue); // сохранение результата вычислений
// ...
});

Это позволяет задействовать предыдущее значение в вычислениях.

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

favoriteImages = computedAsync((previousFavoriteImages) => {
if (previousFavoriteImages) { /* какие-либо действия */ }
return this.imagesService.getImages(this.user().favoriteImages);
},
{ initialValue: [], behavior: 'merge' });
}

Использование computedAsync из ngxtension

Функция computedAsync доступна в библиотеке ngxtension.

npm install ngxtension
# или
yarn add ngxtension

Импортировать computedAsync из библиотеки ngxtension можно следующим образом:

import { computedAsync } from 'ngxtension/computed-async';

А так можно ее использовать:

export class UserComponent {
private imagesService = inject(ImagesService);
user = input.required<User>();

favoriteImages = computedAsync(() =>
this.imagesService.getImages(this.user().favoriteImages),
{ initialValue: [] }
);
}

Документация по computedAsync и ngxtension находится здесь.

Читайте также:

Читайте нас в Telegram, VK и Дзен


Перевод статьи Enea Jahollari: Building ComputedAsync for Signals in Angular

Предыдущая статьяSpring Boot, Kafka и WebSocket для отправки сообщений в реальном времени
Следующая статьяVIM — это не только скорость