异步流程控制

1什么是异步?
就是一个任务分成两段,先执行第一段,然后转而执行其他任务,等做好了准备,再回过头执行第二段(异步函数书写的时候不用”return”来返回值,必须通过回调函数来返回值)

//例1
var fs = require("fs");
var data = fs.readFileSync('text.txt')
console.log(data.toString());
console.log("程序执行结束!");
//执行结果:text, 程序执行结束
//例2
fs.readFile('text.txt', function (err, data) {
  if (err) return console.error(err);
  console.log(data.toString());
});
//执行结果:程序执行结束, text

以上代码可以看出例1是个同步函数,如果读取的text文件过大,会阻塞下面程序的执行,而例2会先读文件在执行其他程序,最后打印text的内容

1、callback

是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用为调用它所指向的函数时,这就是回调函数。回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。
以读取文件为例:

var fs = require('fs')
function fun (file, callback) {
    fs.readFile(file, (err, data) => {
      if (err) {
        console.log(err)
      } else {
        callback(data.toString())
      }
    });
}
fun('text.txt', function(data1){
  fun('text1.txt', function(data2) {
    //dosomething
  })
})

从上面读取文件的例子可以看出callback函数最大多的问题在于陷入回调地狱,一层套一层的代码,可读性和维护性差

2、Stream

Stream(流)是很多I/0操作的抽象,被 Node 中的很多对象所实现。比如一个 HTTP 服务器的请求是一个流(可读流)(服务器的响应是一个流(可写流))。流是可读、可写或兼具两者的。所有流都是 EventEmitter 的实例。
stream有四种类型:
Readable,可读流
Writable,可写流
Duplex,读写流 (例:http://codewinds.com/blog/2013-08-31-nodejs-duplex-streams.html)
Transform,扩展的Duplex
以读取文件为例,传统方式是用fs.readFile,这种方法会将数据全部读入内存,读取效率低。通过stream的createReadStream会将文件数据切割成小块,写入特定的处理函数,会大大提高效率。此文主要讲解Readable和Writable。
例:

 var fs = require('fs');
 var readabler = fs.createReadStream(src) ; //创建可读流
 var writeabler = fs.createWriteStream(src) //创建可写流
 //readtable使用方式
 readabler.read('data', function (chunk) {
     rs.pause() ; //pause暂停执行
     doSomething(chunk, function () {
         rs.resume() ; //恢复读取
     }) ;
 }) ;
 readabler.on('end', function () {
     doSomething();
 })  ;
//writable使用方式
writeabler.write('text1', 'UTF8');
writeabler.on('finish', function() { 给writeabler增加监听事件
  console.log("写入完成。");
});
writeabler.end() //注意:end后不可再写入

下面是一个读取时写入的例子

 var fs = require('fs');
 var readableStream = fs.createReadStream(src) ;
 var writableStream = fs.createWriteStream(src);
 readableStream.on('data', function(chunk){
    if (writableStream.write(chunk, 'UTF8') === null);
    readableStream.pause() //此步骤的作用是当读取到的文件过大可能还没写完会造成文件丢失,故暂停读取
});
writableStream.on('drain', function() { // 监听是否读取的内容是否写完,写完后,继续读取
    readableStream.resume();
});

readableStream.on('end', function() { // 当没有数据时,关闭数据流
    writableStream.end();
});

上面的例子通过pipe可以轻易实现readableStream.pipe(writableStream)。pipe相当于文件交流的管道,可以控制文件的流向。

官网:http://nodejs.cn/api/stream#stream_stream

3、promise

Promise 构造函数包含一个参数和一个带有 resolve(解析)和reject(拒绝)两个参数的回调。在回调中执行一些操作(例如异步),如果一切都正常,则调用 resolve,否则调用 reject。Promise对象的状态改变,只有两种可能:从Pending变为Resolved和从Pending变为Rejected,且状态一旦改变就不会再变

Promise对象是一个构造函数,用来生成Promise实例

var promise = new Promise(function(resolve, reject) {
  // do a thing, possibly async, then…

  if (/* everything turned out fine */) {
    resolve("Stuff worked!"); //resolve在异步操作成功时调用,并将异步操作的结果,作为参数传递出去
  }
  else {
    reject(Error("It broke")); //reject在异步操作失败时调用,并将异步操作报出的错误,作为参数传递出去
  }
});

Promise实例生成以后,可以用then方法分别指定Resolved状态和Rejected状态的回调函数

promise.then(function(result) {
  console.log(result); // "Stuff worked!"
}, function(err) {
  console.log(err); // Error: "It broke"
});

then() 方法接受两个回调函数作为参数:一个用于成功情形的回调和一个用于失败情形的回调。这两个都是可选的,因此您可以只添加一个用于成功情形或失败情形的回调。

Promise 新建后就会立即执行

var promise = new Promise(function(resolve, reject) {
    console.log('start')
    resolve()
})
promise.then(function() {
    console.log('end')
})
console.log('hi')

// 'start' => 'hi' => 'end'

上面代码中,Promise 新建后立即执行,所以首先输出的是Promise。然后,then方法指定的回调函数将在当前脚本所有同步任务执行完才会执行,所以Resolved最后输出。

调用resolve或reject并不会终结 Promise 的参数函数的执行。

new Promise((resolve, reject) => {
  resolve(1);
  console.log(2);
}).then(r => {
  console.log(r);
});
// 2
// 1

上面代码中,调用resolve(1)以后,后面的console.log(2)还是会执行,并且会首先打印出来。这是因为立即 resolved 的 Promise 是在本轮事件循环的末尾执行,总是晚于本轮循环的同步任务。
Promise.prototype.then()
作用是为Promise实例添加状态改变时的回调函数。

Promise.prototype.then = function(sucess, fail) {
    this.done(sucess);
    this.fail(fail);
    return this;
};

Promise.prototype.catch()用于指定发生错误时的回调函数。如果Promise状态已经变成Resolved,在抛错是无效的
一般来说,调用resolve或reject以后,Promise 的使命就完成了,后继操作应该放到then方法里面,而不应该直接写在resolve或reject的后面。所以,最好在它们前面加上return语句,这样就不会有意外
Promise.all()用于将多个 Promise 实例,包装成一个新的 Promise 实例。。
,Promise.resolve(),Promise.reject()都是将非promise的对象转化为promise对象,其不同点在于生成对象实例的状态不同。

var p = Promise.reject('出错了');
var q = Promise.resolve('hello');
// 等同于
var p = new Promise((resolve, reject) => reject('出错了'))
var q = new Promise((resolve, reject) => resolve('hello'))
p.then(function(v){
  console.log(v)
  }, function (s) {
  console.log(s)
});

bluebird对promise的封装:http://bluebirdjs.com/docs/api-reference.html

4、Generator

参照网址:http://es6.ruanyifeng.com/#docs/generator
Generator函数形式上是一个普通函数,有两个特征。一是,function关键字与函数名之间有一个星号;二是,函数体内部使用yield表达式,定义不同的内部状态。执行 Generator函数会返回一个遍历器对象(value: value; done: false/true)value属性表示当前的内部状态的值,是yield表达式后面那个表达式的值;done属性是一个布尔值,表示是否遍历结束。

function *gen() {
    var msg = yield 'first msg'
}
var a = gen() //获取遍历器对象
a.next() //执行函数并返回{value: 'first msg', done: false}
a.next  //执行函数并返回{value: undefined, done: true}

注意,generator函数也可以不用yield表达式,此时该函数就是一个单纯的暂缓执行函数,yield表达式只能在generator函数中使用

function *gen() {
    console.log('first msg')
}
var a = gen()
a.next() //first msg; {value: undefined, done: true}

yield表达式本身没有返回值,或者说总是返回undefined。next方法可以带一个参数,该参数就会被当作上一个yield表达式的返回值

function *gen() {
    console.log('start')
    var msg = yield 'first msg'
    console.log('I am back and bring' + msg)
}
var a = gen()
a.next() // start; {value: 'first msg', done: false}
a.next('something from outside') // {value: 'I am back and bring something from outside', done: false}
a.next() // {value: undefined, done: true}

function *gen(x) {
    var y = 2 * (yeild(x + 1))
    var z = yeild(y * 2)
    return (x +y + z)
}
var a = gen(2) // x: 2; y: undefined; z: undefind
a.next() {value: 3, done: false}
a.next() {value: NAN, done: false}
a.next() {value: NAN, done: false}

a.next(3) {value: 12, done: false} //yeild(x + 1)被赋值为3, y=6;
a.next(4) {value: 12, done: false} //yeild(y * 2)被赋值为4, z=4;

Generator函数可以用return 方法给定返回的值,终结函数

function *gen() {
  yield 1;
  yield 2;
  yield 3;
}
var a = gen();
a.next()        // { value: 1, done: false }
a.return('first msg')    //{ value: 'first msg', done: true }

注意:return中如果没有参数,则返回的value为undefined。如果 Generator 函数内部有try...finally代码块,那么return方法会推迟到finally代码块执行完再执行
function *gen () {
  yield 1;
  try {
    yield 2;
  } finally {
    yield 3;
  }
  yield 4;
}
var a = gen();
a.next() // { value: 1, done: false }
a.next() // { value: 2, done: false }
a.return(5) // { value: 3, done: false }
a.next() // { value: 5, done: true }

如果在 Generator 函数内部,调用另一个Generator函数,默认情况下是没有效果的。需要加上yield*

function* foo() {
  yield 'a';
  yield 'b';
}

function* bar() {
  yield 'x';
  foo();
  yield 'y';
}

function* gen() {
  yield 1;
  yield* foo();
  yield 2;
}

for (var v of bar()){
  console.log(v); // 'x', 'y'
}
for (var v of gen()){
  console.log(v); // 1, 'a', 'b' 2
}

Generator 函数内部还可以部署错误处理代码,捕获函数体外抛出的错误

function* gen(x){
  try {
    var y = yield x + 2;
  } catch (e){
    console.log(e);
  }
  return y;
}

var g = gen(1);
g.next();
g.throw('出错了'

Thunk函数用于generator函数的流程管理,让其自动执行generator函数
co函数库其实就是将两种自动执行器(Thunk 函数和 Promise 对象),包装成一个库。使用 co 的前提条件是,Generator 函数的 yield 命令后面,只能是 Thunk 函数或 Promise 对象。
github地址:https://github.com/tj/co

5、async function

异步函数会返回一个promise对象。调用成功时会用promise的resolve方法来处理这个返回值,抛出异常或者非法值时,会使用promise的reject方法来处理这个异常值。
类似generator

JavaScript
async function foo () { var a = await '123' var b = await '234' console.log(a, b) } foo () // 123, 234

async流程工具包:http://caolan.github.io/async/

6、RxJS

rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展,响应式的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作
rxjs核心:

  • Observable: 可观察的数据对象
  • Observer: 观察者实例,用来决定何时观察指定数据.
  • Subscription: 观察数据序列返回订阅实例.
  • Operators: Observable的操作方法,包括转换数据序列,过滤等,所有的Operators方法接受的参数是上一次发送的数据变更的值,而方法返回值我们称之为发射新数据变更.
  • Subject: 计时观察者也是观察者对象.
  • Schedulers: 控制调度并发,即当Observable接受Subject的变更响应时,可以通过scheduler设置响应方式,目前内置的响应可以调用Object.keys(Rx.Subject)查看

Observable
例:

//可观察对象可以使用create创建,也可以用of,from,interval等
var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1)
  })
//Operators
  observable.map(item=>item * 10)
//观察者
  var observe = {
    next: x=>console.log(x),
    error: err=>console.log(err),
    complete: ()=>console.log('done'),
  }
//观察者订阅可观察者对象
  observable.subscribe(observe)
//output: 1

上面代码中使用Rx.Observable.create操作符来创造一个可观察对象,使用一个观察者订阅它,可观察对象执行后给观察者发送next/error/complete通知。由于使用了map创建了一个新的可观察对象,新的可观察对象与当前可观察对象没有关系,故值为1。

在Observable.create(function(observer){…})中的代码,表示了一个可观察对象的执行,一个仅在观察者订阅的时候发生的惰性计算。执行随着时间产生多个值,以同步或者异步的方式。
下面是可观察对象执行可以发送的三种类型的值:

  • “Next”: 发送一个数字/字符串/对象等值。
  • “Error”: 发送一个JS错误或者异常。
  • “Complete” 不发送值。
    Next通知是最重要且最常见的类型:它们代表发送给观察者的确切数据,Error和Complete通知可能仅在可观察对象执行期间仅发生一次,但仅会执行二者之中的一个。

observer
观察者是可观察对象所发送数据的消费者,观察者简单而言是一组回调函数 , 分别对应可观察对象发送的通知的类型:next, error和complete。
例:

var observer={
  next:x=>console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification')
}

使用观察者,需要订阅可观察对象:

observable.subscribe(observer)

观察者是可选的,如果你不提供某个回调函数,可观察对象的执行仍然会照常发生,但某个类型的通知将不会发生,因为在观察者对象中没有对应于他们的回调函数。
下面代码中仅提供回调来作为参数也是可以的:

observable.subscribe(x => console.log('Observer got a next value: ' + x));

Subscription
订阅表示一次性资源的对象,通常是一个可观察对象的执行。订阅对象有一个重要的方法:unsubscribe,该方法不需要参数,它可以弃掉可观察对象所持有的资源。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
setTimeout(()=>{
  subscription.unsubscribe();
}, 3000)

//output: 1, 2
订阅对象也可以被放置在一起,因此对一个订阅对象的unsubscribe()进行调用,可以对多个订阅进行取消。做法是:把一个订阅”加”进另一个订阅。

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
  subscription.unsubscribe();
}, 1000);
//output:
//second: 0
//first: 0
//second: 1
//first: 1
//second: 2

remove(otherSubscription)方法,用于解除被add添加的子订阅。

Subject
Subject是允许值被多播到多个观察者的一种特殊的Observable。它同时拥有 Observer 和 Observable 的行为, 而纯粹的可观察对象是单播的(每一个订阅的观察者拥有单独的可观察对象的执行)。
注意:
1.每一个Subject都是一个observable可观察对象,给定一个Subject后,你可以订阅它,提供的观察者将会正常的开始接收值。从观察者的角度来看,它不能判断一个可观察对象的执行时来自于单播的Observable还是来自于一个Subject.
2.每一个Subject都是一个Observer观察者对象。它是一个拥有next()/error()/complete()方法的对象。要想Subject提供一个新的值,只需调用next(),它将会被多播至用来监听Subject的观察者。

var subject = new Rx.Subject();
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
//output:
//observerA: 1
//observerB: 1
//observerA: 2
//observerB: 2

注意:在订阅创建之前执行的 next() 就会丢失

Subject也是一个观察者,你可以提供一个Subject当做observable.subscribe()的参数

var subject = new Rx.Subject();
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2]);
observable.subscribe(subject);
//output:
//observerA: 1
//observerB: 1
//observerA: 2
//observerB: 2

BehaviorSubject
BehaviorSubject中可以传递一个当前值。当新的观察者订阅它时,会立即接受到这个来自BehaviorSubject的”当前值”。

var subject = new Rx.BehaviorSubject(0);
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(3)
//output:
//observerA: 0
//observerA: 1
//observerA: 2
//observerA: 3
//observerB: 3

ReplaySubject
ReplaySubject可以把已经发出的值保存在缓存之中。
AsyncSubject
AsyncSubject只发送给观察者可观察对象执行的最新值,并且仅在执行结束时。即当触发complete时会返回最新值,触发error时会返回err的内容

Operators
操作符是可观察对象上定义的方法,例如.map(…),.filter(…),.merge(…),等等。当他们被调用,会返回一个新的可观察对象,并不会改变当前的可观察对象实例。

Scheduler
调度者控制着何时启动一个订阅和何时通知被发送。它有三个组件构成:一个调度者是一个数据结构,它知道如何根据优先级或其他标准存储和排列任务。一个调度者是一个执行上下文。它表示何处何时任务被执行。一个调度者具有虚拟的时钟。它通过调度器上的getter方法now()提供了“时间”的概念。 在特定调度程序上调度的任务将仅仅遵守由该时钟表示的时间。

var observable = Rx.Observable.of('foo', Rx.Scheduler.async)
console.log('before')
observable.subscribe(value => console.log(value));
console.log('after')
//output:before, after, foo

官网:http://reactivex.io/rxjs/manual/overview.html
rx操作符:http://rxmarbles.com