RxJS中的调度程序

您对RxJS中的Scheduler有什么了解?他们向开发人员隐藏了带有Observable执行上下文的工作。就像那些来自哈利波特的家养小精灵一样,他们在霍格沃茨从事所有肮脏的工作,甚至没人听说过。让我们对其进行修复,并进一步了解它们。

什么是调度程序

Scheduler Observable Observer. ( )

, Scheduler Observable. , .

import { of } from "rxjs";

console.log("Start");
of("Observable").subscribe(console.log);
console.log("End");

// Logs:
// Start
// Observable
// End

console.log Observable . . , Observable , observeOn Scheduler.

import { asyncScheduler, of } from "rxjs";
import { observeOn } from "rxjs/operators";

console.log("Start");
of("Observable")
  .pipe(observeOn(asyncScheduler))
  .subscribe(console.log);
console.log("End");

// Logs:
// Start
// End
// Observable

StackBlitz

Observable . . , setTimeout(…, 0), ?

, asyncScheduler . Schedulers. .

Schedulers

Schedulers , Event Loop JavaScript. .

, :

  1. (callstack)

  2. (Promise)

  3. (setTimeout, setInterval, XMLHttpRequest ..).

  4. , . (requestAnimationFrame)

RxJS Scheduler :

queueScheduler

asapScheduler

asyncScheduler

animationFrameScheduler

VirtualTimeScheduler  TestScheduler, . .

.

import { of, merge, asapScheduler,asyncScheduler,queueScheduler, animationFrameScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

const async$ = of("asyncScheduler").pipe(observeOn(asyncScheduler));
const asap$ = of("asapScheduler").pipe(observeOn(asapScheduler));
const queue$ = of("queueScheduler").pipe(observeOn(queueScheduler));
const animationFrame$ = of("animationFrameScheduler").pipe(
  observeOn(animationFrameScheduler)
);
merge(async$, asap$, queue$, animationFrame$).subscribe(console.log);

console.log("synchronous code");

// Logs:
// queueScheduler
// synchronous code
// asapScheduler
// animationFrameScheduler
// asyncScheduler

StackBlitz

, "queueScheduler" , "synchronous code". "asapScheduler" "asyncScheduler", .

Schedulers

Scheduler observeOn subscribeOn. Scheduler, delay, .

import { of, asyncScheduler } from "rxjs";
import { observeOn, subscribeOn } from "rxjs/operators";

of("observeOn")
  .pipe(observeOn(asyncScheduler, 100))
  .subscribe(console.log);

of("subscribeOn")
  .pipe(subscribeOn(asyncScheduler, 50))
  .subscribe(console.log);

// Logs:
// subscribeOn
// observeOn

StackBlitz

, observeOn observer — next, error complete Scheduler . subscribeOn subscriber — subscribe .

, delay observeOn/subscribeOn, Scheduler, asyncScheduler. — observeOn(animationFrameScheduler, 100).

RxJS 6.5.0 Scheduler of, from, merge, range .. RxJS deprecated,   scheduled .

import { of, scheduled, asapScheduler } from 'rxjs';

// DEPRECATED
// of(2, asapScheduler).subscribe(console.log);

scheduled(of('scheduled'), asapScheduler).subscribe(console.log);

Scheduler

Schedulers RxJS, . , Scheduler , . . . :

const cache = new Map<number, any>();
function get(id: number): Observable<any> {
 if (cache.has(id)) {
   return of(cache.get(id));
 }
 return http.get(‘some-url\’ + id).pipe(
   tap(data => {
     cache.set(id, data);
   }),
 );
}

. . , , , - . , . , .

scheduled asyncScheduler 4 , .

return scheduled(of(cache.get(id)), asyncScheduler);

. , .

Schedulers . RxJS . . , Scheduler.

Schedulers . !




All Articles