Observables

可观察对象(Observable)

Observables provide support for passing messages between publishers and subscribers in your application. Observables offer significant benefits over other techniques for event handling, asynchronous programming, and handling multiple values.

可观察对象支持在应用中的发布者和订阅者之间传递消息。 在需要进行事件处理、异步编程和处理多个值的时候,可观察对象相对其它技术有着显著的优点。

Observables are declarative—that is, you define a function for publishing values, but it is not executed until a consumer subscribes to it. The subscribed consumer then receives notifications until the function completes, or until they unsubscribe.

可观察对象是声明式的 —— 也就是说,虽然你定义了一个用于发布值的函数,但是在有消费者订阅它之前,这个函数并不会实际执行。 订阅之后,当这个函数执行完或取消订阅时,订阅者就会收到通知。

An observable can deliver multiple values of any type—literals, messages, or events, depending on the context. The API for receiving values is the same whether the values are delivered synchronously or asynchronously. Because setup and teardown logic are both handled by the observable, your application code only needs to worry about subscribing to consume values, and when done, unsubscribing. Whether the stream was keystrokes, an HTTP response, or an interval timer, the interface for listening to values and stopping listening is the same.

可观察对象可以发送多个任意类型的值 —— 字面量、消息、事件。无论这些值是同步发送的还是异步发送的,接收这些值的 API 都是一样的。 由于准备(setup)和清场(teardown)的逻辑都是由可观察对象自己处理的,因此你的应用代码只管订阅并消费这些值就可以了,做完之后,取消订阅。无论这个流是击键流、HTTP 响应流还是定时器,对这些值进行监听和停止监听的接口都是一样的。

Because of these advantages, observables are used extensively within Angular, and are recommended for app development as well.

由于这些优点,可观察对象在 Angular 中得到广泛使用,也同样建议应用开发者好好使用它。

Basic usage and terms

基本用法和词汇

As a publisher, you create an Observable instance that defines a subscriber function. This is the function that is executed when a consumer calls the subscribe() method. The subscriber function defines how to obtain or generate values or messages to be published.

作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。

To execute the observable you have created and begin receiving notifications, you call its subscribe() method, passing an observer. This is a JavaScript object that defines the handlers for the notifications you receive. The subscribe() call returns a Subscription object that has an unsubscribe() method, which you call to stop receiving notifications.

要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 subscribe() 方法,并传入一个观察者(observer)。 这是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。 subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。

Here's an example that demonstrates the basic usage model by showing how an observable could be used to provide geolocation updates.

下面这个例子中示范了这种基本用法,它展示了如何使用可观察对象来对当前地理位置进行更新。

// Create an Observable that will start listening to geolocation updates // when a consumer subscribes. const locations = new Observable((observer) => { // Get the next and error callbacks. These will be passed in when // the consumer subscribes. const {next, error} = observer; let watchId; // Simple geolocation API check provides values to publish if ('geolocation' in navigator) { watchId = navigator.geolocation.watchPosition(next, error); } else { error('Geolocation not available'); } // When the consumer unsubscribes, clean up data ready for next subscription. return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }}; }); // Call subscribe() to start listening for updates. const locationsSubscription = locations.subscribe({ next(position) { console.log('Current Position: ', position); }, error(msg) { console.log('Error Getting Location: ', msg); } }); // Stop listening for location after 10 seconds setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);

Defining observers

定义观察者

A handler for receiving observable notifications implements the Observer interface. It is an object that defines callback methods to handle the three types of notifications that an observable can send:

用于接收可观察对象通知的处理器要实现 Observer 接口。这个对象定义了一些回调函数来处理可观察对象可能会发来的三种通知:

Notification type通知类型Description说明
nextRequired. A handler for each delivered value. Called zero or more times after execution starts.必要。用来处理每个送达值。在开始执行后可能执行零次或多次。
errorOptional. A handler for an error notification. An error halts execution of the observable instance.可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。
completeOptional. A handler for the execution-complete notification. Delayed values can continue to be delivered to the next handler after execution is complete.可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

An observer object can define any combination of these handlers. If you don't supply a handler for a notification type, the observer ignores notifications of that type.

观察者对象可以定义这三种处理器的任意组合。如果你不为某种通知类型提供处理器,这个观察者就会忽略相应类型的通知。

Subscribing

订阅

An Observable instance begins publishing values only when someone subscribes to it. You subscribe by calling the subscribe() method of the instance, passing an observer object to receive the notifications.

只有当有人订阅 Observable 的实例时,它才会开始发布值。 订阅时要先调用该实例的 subscribe() 方法,并把一个观察者对象传给它,用来接收通知。

In order to show how subscribing works, we need to create a new observable. There is a constructor that you use to create new instances, but for illustration, we can use some methods from the RxJS library that create simple observables of frequently used types:

为了展示订阅的原理,我们需要创建新的可观察对象。它有一个构造函数可以用来创建新实例,但是为了更简明,也可以使用 Observable 上定义的一些静态方法来创建一些常用的简单可观察对象:

  • of(...items)—Returns an Observable instance that synchronously delivers the values provided as arguments.

    of(...items) —— 返回一个 Observable 实例,它用同步的方式把参数中提供的这些值发送出来。

  • from(iterable)—Converts its argument to an Observable instance. This method is commonly used to convert an array to an observable.

    from(iterable) —— 把它的参数转换成一个 Observable 实例。 该方法通常用于把一个数组转换成一个(发送多个值的)可观察对象。

Here's an example of creating and subscribing to a simple observable, with an observer that logs the received message to the console:

下面的例子会创建并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:

// Create simple observable that emits three values const myObservable = of(1, 2, 3); // Create observer object const myObserver = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), }; // Execute with the observer object myObservable.subscribe(myObserver); // Logs: // Observer got a next value: 1 // Observer got a next value: 2 // Observer got a next value: 3 // Observer got a complete notification

Alternatively, the subscribe() method can accept callback function definitions in line, for next, error, and complete handlers. For example, the following subscribe() call is the same as the one that specifies the predefined observer:

另外,subscribe() 方法还可以接收定义在同一行中的回调函数,无论 nexterror 还是 complete 处理器。比如,下面的 subscribe() 调用和前面指定预定义观察者的例子是等价的。

myObservable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification') );

In either case, a next handler is required. The error and complete handlers are optional.

无论哪种情况,next 处理器都是必要的,而 errorcomplete 处理器是可选的。

Note that a next() function could receive, for instance, message strings, or event objects, numeric values, or structures, depending on context. As a general term, we refer to data published by an observable as a stream. Any type of value can be represented with an observable, and the values are published as a stream.

注意,next() 函数可以接受消息字符串、事件对象、数字值或各种结构,具体类型取决于上下文。 为了更通用一点,我们把由可观察对象发布出来的数据统称为。任何类型的值都可以表示为可观察对象,而这些值会被发布为一个流。

Creating observables

创建可观察对象

Use the Observable constructor to create an observable stream of any type. The constructor takes as its argument the subscriber function to run when the observable’s subscribe() method executes. A subscriber function receives an Observer object, and can publish values to the observer's next() method.

使用 Observable 构造函数可以创建任何类型的可观察流。 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数作为订阅函数来运行。 订阅函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。

For example, to create an observable equivalent to the of(1, 2, 3) above, you could do something like this:

比如,要创建一个与前面的 of(1, 2, 3) 等价的可观察对象,你可以这样做:

// This function runs when subscribe() is called function sequenceSubscriber(observer) { // synchronously deliver 1, 2, and 3, then complete observer.next(1); observer.next(2); observer.next(3); observer.complete(); // unsubscribe function doesn't need to do anything in this // because values are delivered synchronously return {unsubscribe() {}}; } // Create a new Observable that will deliver the above sequence const sequence = new Observable(sequenceSubscriber); // execute the Observable and print the result of each notification sequence.subscribe({ next(num) { console.log(num); }, complete() { console.log('Finished sequence'); } }); // Logs: // 1 // 2 // 3 // Finished sequence

To take this example a little further, we can create an observable that publishes events. In this example, the subscriber function is defined inline.

如果要略微加强这个例子,我们可以创建一个用来发布事件的可观察对象。在这个例子中,订阅函数是用内联方式定义的。

function fromEvent(target, eventName) { return new Observable((observer) => { const handler = (e) => observer.next(e); // Add the event handler to the target target.addEventListener(eventName, handler); return () => { // Detach the event handler from the target target.removeEventListener(eventName, handler); }; }); }

Now you can use this function to create an observable that publishes keydown events:

现在,你就可以使用这个函数来创建可发布 keydown 事件的可观察对象了:

const ESC_KEY = 27; const nameInput = document.getElementById('name') as HTMLInputElement; const subscription = fromEvent(nameInput, 'keydown') .subscribe((e: KeyboardEvent) => { if (e.keyCode === ESC_KEY) { nameInput.value = ''; } });

Multicasting

多播

A typical observable creates a new, independent execution for each subscribed observer. When an observer subscribes, the observable wires up an event handler and delivers values to that observer. When a second observer subscribes, the observable then wires up a new event handler and delivers values to that second observer in a separate execution.

典型的可观察对象会为每一个观察者创建一次新的、独立的执行。 当观察者进行订阅时,该可观察对象会连上一个事件处理器,并且向那个观察者发送一些值。当第二个观察者订阅时,这个可观察对象就会连上一个新的事件处理器,并独立执行一次,把这些值发送给第二个可观察对象。

Sometimes, instead of starting an independent execution for each subscriber, you want each subscription to get the same values—even if values have already started emitting. This might be the case with something like an observable of clicks on the document object.

有时候,不应该对每一个订阅者都独立执行一次,你可能会希望每次订阅都得到同一批值 —— 即使是那些你已经发送过的。这在某些情况下有用,比如用来发送 document 上的点击事件的可观察对象。

Multicasting is the practice of broadcasting to a list of multiple subscribers in a single execution. With a multicasting observable, you don't register multiple listeners on the document, but instead re-use the first listener and send values out to each subscriber.

多播用来让可观察对象在一次执行中同时广播给多个订阅者。借助支持多播的可观察对象,你不必注册多个监听器,而是复用第一个(next)监听器,并且把值发送给各个订阅者。

When creating an observable you should determine how you want that observable to be used and whether or not you want to multicast its values.

当创建可观察对象时,你要决定你希望别人怎么用这个对象以及是否对它的值进行多播。

Let’s look at an example that counts from 1 to 3, with a one-second delay after each number emitted.

来看一个从 1 到 3 进行计数的例子,它每发出一个数字就会等待 1 秒。

function sequenceSubscriber(observer) { const seq = [1, 2, 3]; let timeoutId; // Will run through an array of numbers, emitting one value // per second until it gets to the end of the array. function doSequence(arr, idx) { timeoutId = setTimeout(() => { observer.next(arr[idx]); if (idx === arr.length - 1) { observer.complete(); } else { doSequence(arr, ++idx); } }, 1000); } doSequence(seq, 0); // Unsubscribe should clear the timeout to stop execution return {unsubscribe() { clearTimeout(timeoutId); }}; } // Create a new Observable that will deliver the above sequence const sequence = new Observable(sequenceSubscriber); sequence.subscribe({ next(num) { console.log(num); }, complete() { console.log('Finished sequence'); } }); // Logs: // (at 1 second): 1 // (at 2 seconds): 2 // (at 3 seconds): 3 // (at 3 seconds): Finished sequence

Notice that if you subscribe twice, there will be two separate streams, each emitting values every second. It looks something like this:

注意,如果你订阅了两次,就会有两个独立的流,每个流都会每秒发出一个数字。代码如下:

// Subscribe starts the clock, and will emit after 1 second sequence.subscribe({ next(num) { console.log('1st subscribe: ' + num); }, complete() { console.log('1st sequence finished.'); } }); // After 1/2 second, subscribe again. setTimeout(() => { sequence.subscribe({ next(num) { console.log('2nd subscribe: ' + num); }, complete() { console.log('2nd sequence finished.'); } }); }, 500); // Logs: // (at 1 second): 1st subscribe: 1 // (at 1.5 seconds): 2nd subscribe: 1 // (at 2 seconds): 1st subscribe: 2 // (at 2.5 seconds): 2nd subscribe: 2 // (at 3 seconds): 1st subscribe: 3 // (at 3 seconds): 1st sequence finished // (at 3.5 seconds): 2nd subscribe: 3 // (at 3.5 seconds): 2nd sequence finished

Changing the observable to be multicasting could look something like this:

修改这个可观察对象以支持多播,代码如下:

function multicastSequenceSubscriber() { const seq = [1, 2, 3]; // Keep track of each observer (one for every active subscription) const observers = []; // Still a single timeoutId because there will only ever be one // set of values being generated, multicasted to each subscriber let timeoutId; // Return the subscriber function (runs when subscribe() // function is invoked) return (observer) => { observers.push(observer); // When this is the first subscription, start the sequence if (observers.length === 1) { timeoutId = doSequence({ next(val) { // Iterate through observers and notify all subscriptions observers.forEach(obs => obs.next(val)); }, complete() { // Notify all complete callbacks observers.slice(0).forEach(obs => obs.complete()); } }, seq, 0); } return { unsubscribe() { // Remove from the observers array so it's no longer notified observers.splice(observers.indexOf(observer), 1); // If there's no more listeners, do cleanup if (observers.length === 0) { clearTimeout(timeoutId); } } }; }; } // Run through an array of numbers, emitting one value // per second until it gets to the end of the array. function doSequence(observer, arr, idx) { return setTimeout(() => { observer.next(arr[idx]); if (idx === arr.length - 1) { observer.complete(); } else { doSequence(observer, arr, ++idx); } }, 1000); } // Create a new Observable that will deliver the above sequence const multicastSequence = new Observable(multicastSequenceSubscriber()); // Subscribe starts the clock, and begins to emit after 1 second multicastSequence.subscribe({ next(num) { console.log('1st subscribe: ' + num); }, complete() { console.log('1st sequence finished.'); } }); // After 1 1/2 seconds, subscribe again (should "miss" the first value). setTimeout(() => { multicastSequence.subscribe({ next(num) { console.log('2nd subscribe: ' + num); }, complete() { console.log('2nd sequence finished.'); } }); }, 1500); // Logs: // (at 1 second): 1st subscribe: 1 // (at 2 seconds): 1st subscribe: 2 // (at 2 seconds): 2nd subscribe: 2 // (at 3 seconds): 1st subscribe: 3 // (at 3 seconds): 1st sequence finished // (at 3 seconds): 2nd subscribe: 3 // (at 3 seconds): 2nd sequence finished
Multicasting observables take a bit more setup, but they can be useful for certain applications. Later we will look at tools that simplify the process of multicasting, allowing you to take any observable and make it multicasting.

虽然支持多播的可观察对象需要做更多的准备工作,但对某些应用来说,这非常有用。稍后我们会介绍一些简化多播的工具,它们让你能接收任何可观察对象,并把它变成支持多播的。

Error handling

错误处理

Because observables produce values asynchronously, try/catch will not effectively catch errors. Instead, you handle errors by specifying an error callback on the observer. Producing an error also causes the observable to clean up subscriptions and stop producing values. An observable can either produce values (calling the next callback), or it can complete, calling either the complete or error callback.

由于可观察对象会异步生成值,所以用 try/catch 是无法捕获错误的。你应该在观察者中指定一个 error 回调来处理错误。发生错误时还会导致可观察对象清理现有的订阅,并且停止生成值。可观察对象可以生成值(调用 next 回调),也可以调用 completeerror 回调来主动结束。

myObservable.subscribe({ next(num) { console.log('Next num: ' + num)}, error(err) { console.log('Received an errror: ' + err)} });

Error handling (and specifically recovering from an error) is covered in more detail in a later section.

在稍后的小节中会对错误处理(特别是从错误中的恢复)做更详细的讲解。