以下内容完全是抄袭《Principles of Reactive Programming》中Erik Meijer的课程内容,想好好学习请转战公开课,不要向下看。

这个本来是公开课《Principles of Reactive Programming》的第二部分。这两天要用Akka,就顺便回顾了一下。其实这部分也展示了monad(这是什么?)封装的能力。

先看下面这张图

four effects

four effects

这张图其实描述了四种我们常要处理的场景,同步-单一对应着可能的异常处理,同步-多个对应可迭代,异步-单一对应Future,异步-多个对应Observable

Try[T]

先看Try[T],这个类型类似于Either[+A, +B],但是在语义上有一些不同。一个Try大概可以这么定义。

1
2
3
4
5
6
7
8
9
10
11
12

abtract class Try[+T] {}
case class Success[+T](value: T) extends Try[T] {}
case class Failure[+T](exception: Throwable) extends Try[T] {}
object Try {
def apply[T](f: => T): Try[T] =
try {
Success(f)
} catch {
case e => Failure(e)
}
}

这里,我们有Success和Failure两种模式,显然Success用来表示结果正常,而Failure用来表示出现异常。

一个简单的例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

import scala.util.{Try, Failure, Success}

object example {

def main(args: Array[String]) {
val div = Try { //这里可能出错
val a = scala.io.StdIn.readInt()
val b = scala.io.StdIn.readInt()
a / b
}

div match {
case Success(x) => println(x)
case Failure(e) => println(e.toString)
}
}
}

使用Try就不用显示的将代码包裹在try ... catch ...中,然后通过定义在其上面的一些组合函数,可以简化很多代码,比如继续上面这个例子,为了应对除0的错误,我们就假设当出现除零错误的时候,就是用正无穷来代替这个值,就可以简单的写成下面这样就可以了。

1
div.getOrElse(Int.MaxValue)

当然也可以使用for来简化计算,

1
2
3
4
5
6
7
8
9
10
11
object example {

def main(args: Array[String]) {
val result = for {
x <- Try("0".toInt)
y <- Try("12".toInt / x).orElse(Try {"12".toInt / 0.001})
} yield y

println(result.getOrElse(Int.MaxValue))
}
}

这里就使用orElse,保证当除数为0的时候,使用一个很小的数代替它。scala里面的for其实是flatMap的一个语法糖。TryflatMap可以这样简单的实现

1
2
3
4
5
6
7
8
case class Sucess[+T](value: T) extends Try[T] {
def flatMap[B](f: T => Try[B]): Try[B] =
try {
f(value)
} catch {
case NonFatal(e) => Failure(e)
}
}

所以在有很多部分都可能发生异常的时候,利用for,就可以简单的实现处理的代码而不用费心在异常处理上。

Erik Meijer讲课也很有意思,他虽然会讲这是个monad,但是他又不强调monad law,感觉如果为了合理使用for简化语法,都可以实现其flatMap方法。

Try[T]上定义的高阶函数可以看这里

Future

同步单一的情况,下一步就是异步单一结果了。Future不是特别新的东西,(各种地方都有,ES6里面有,C#包括Python里面有Async以及Await,更不用说Haskell。其实计算机和数学有关的部分大多都不是新东西)。

异步的场景很常见,比如我要抓取不同网页的内容然后merge到一起,再send给远程的另一方。就可以写成这样

1
2
3
4
5
val firstSource = getFromSource1();
val secondSource = getFromSource2();

val last = merge(firstSource, secondSource)
sendTo(last)

但是由要从网络获取数据,所以有可能会等到很长时间。所以,对于这种延时的操作,就可以用Future封装。

比如这个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import scala.concurrent.{Await, Future}
import scala.io.Source
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global

object example {

def main(args: Array[String]): Unit = {
val source = Future {
val s = Source.fromURL("http://www.w3.org/Addressing/URL/url-spec.txt")
val t = s.getLines().toList.mkString("\n")
t
}

source onComplete {
case Success(x) => println(x)
case Failure(e) => println("error")
}

Thread.sleep(1000)
}
}

一个简单的Future可以这么定义

1
2
3
4
trait Future[T] {
def onComplete(callback: Try[T] => Unit)
(implicit executor: ExecutionContext): Unit
}

这里implicit的变量是给Future运行一个上下文。使用Future就可以简化异步的操作,不用显示的维护线程。

Future上也定义很多的composition。比如,下面我们需要从两个地点获取数据,然后把他们组合到一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.CountDownLatch

import scala.concurrent._
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object main extends App {

val netiquetteUrl = "http://www.w3.org/Addressing/URL/url-spec.txt"
val netiquette = Future { Source.fromURL(netiquetteUrl).mkString }
val urlSpecUrl = "http://www.w3.org/Addressing/URL/url-spec.txt"

val latch = new CountDownLatch(1)
val urlSpec = Future { Source.fromURL(urlSpecUrl).mkString }

val answer = for {
nettext <- netiquette
urltext <- urlSpec
} yield "Check this out: " + nettext + ". And check out: " + urltext

answer onComplete {
case Success(x) => {println(x + "\n"); latch.countDown()}
case Failure(e) => {println(e.toString + "\n"); latch.countDown()}
}

Await.result(answer, Duration.Inf)
latch.await()
}

这里for或者说flatMap再一次发挥了作用。还有诸如filtermap等高阶函数。另外,别忘了伴生对象的Future里面很多使用的函数,比如我们如果有一系列的url要访问,最后把内容组合到一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import scala.concurrent._
import scala.io.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object main extends App {

val netiquetteUrl = "http://www.w3.org/Addressing/URL/url-spec.txt"
val urlSpecUrl = "http://www.w3.org/Addressing/URL/url-spec.txt"
val urlList = List(netiquetteUrl, urlSpecUrl)

val futureList = Future.sequence(urlList.map(x =>
Future {Source.fromURL(x).mkString}))

futureList onComplete {
case Success(l) => l.mkString("\n")
case Failure(e) => e.toString
}


val s = Await.result(futureList, Duration.Inf)

println(s)
}

通过伴生对象中的sequenceList[Future[T]]变成Future[List[T]],然后进一步处理。

其实FutureonComplete,本质上也还是一种回调函数,但是由于提供了更多的高阶函数,使得回调多层的时候,不会显得那么难看。

Async以及await

async以及await是个在很多语言都实现的语法糖,我第一次见是在F#、C#里面,不过一定要记住async和parallelized并不是一个东西。先看个例子吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.async.Async._

object main extends App {
def slowCalc(x: Int): Future[Int] = Future {
Thread.sleep(30)
x
}


val f = async {
await(slowCalc(12)) + await(slowCalc(15))
}

val x = Await.result(f, Duration.Inf)
println(x)
}

用async把要异步运行的部分包裹起来,用await表示等待求值,最后返回一个Future

一个可能的声明是

1
2
3
def async[T](body: => T)(implicit context: Executioncontext): Future[T]

def await[T](future: Future[T]): T

await的使用有一些限制:

  • await需要一个直接相关的async。也就是要求,await要不然在一个async的闭包里,要不然在一个封闭的objecttraitclass

  • await不能使用by name的参数。

  • await不使用在boolean短路的参数

  • async里面不能使用return

  • await不能再一个try/catch代码段里

由于async/await是通过宏来实现的,有时候报错会比较难读懂。 不过有意思的是默认的async/await实现中中间表示使用的是A-normal form,而不是CPS,不过Akka中的Dataflow concurrency使用的是CPS。其实,大概的实现思路应该是,首先转换成ANF,然后宏会分析整个代码段,将其分解成一个个chunks,即一段段顺序执行的序列。然后多个代码段都可以访问的变量或者常量,包裹进整个状态机的对象里面,然后将整个代码转换成一个状态机。

Promises

之前说过,Future本身还是回调,而Promise就为摆脱这个语法又前进了一步。Promise可以看做一个用来盛放Future的盒子。看下面一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object main extends App {
val p = Promise[Int]()
val firstCall = Future {
Thread.sleep(30)
30
}

val secondCall = Future {
Thread.sleep(10)
10
}

firstCall onComplete { p.tryComplete(_)}
secondCall onComplete { p.tryComplete(_)}


val x = Await.result(p.future, Duration.Inf)
println(x)
}

这里有两个计算,我们只关心更快的一个。我们使用Promise保证肯定会有一个结果填进来,然后调用其future,来保证能获取其结果。也就是之前的future只能在最后一步返回结果,而使用Promise就可以在其中获取结果,并且不只是和一个future关联。一个简单的定义如下:

1
2
3
4
5
trait Promise[T] {
def future: Future[T]
def complete(result: Try[T]): Unit
def tryComplete(result: Try[T]): Unit
}

当然Promise本身有一些确定,首先,它只能处理一个数据,无法处理连续的数据流,而且它没有cancel的选项,一旦开始不能取消。不过还好,Scala可以通过隐式类型转换的方法来添加这些功能,习题就给了一个添加取消功能的例子,这个是个简单的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package nodescala.cancle

import java.util.concurrent.{CancellationException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

/**
* Created by ariwaranosai on 15/11/18.
*/

package object cancle {
implicit class FutureCompanionOps(val f: Future.type) extends AnyVal{
/**
* 修改运行方式,现在运行需要提供一个用来生成Subscription的函数
*/

def run()(f: CancellationToken => Future[Unit]): Subscription = {
val ca = CancellationTokenSource()
f(ca.cancellationToken)
ca
}
}


/**
* subscription用来提供unsubscribe的方法
*/

trait Subscription {
def unsubscribe(): Unit
}

/**
* CancellationToken 用来判断是不是请求取消
*/

trait CancellationToken {
def isCancelled: Boolean
}

/**
* 实现一种Subscription,在取消后会返回CancellationToken
*/

trait CancellationTokenSource extends Subscription {
def cancellationToken: CancellationToken
}

/**
* CancellationTokenSource的伴生对象,
* 对于每一个CancellationTokenSource,采用一个Promise记录其是不是已经
* cancel了,调用unsubscribe的时候,就使p完成,从而保证取消
*/

object CancellationTokenSource {
def apply() = new CancellationTokenSource {
val p = Promise[Unit]
def cancellationToken = new CancellationToken {
def isCancelled: Boolean = p.future.value != None
}
def unsubscribe(): Unit = {
p.trySuccess(())
}
}
}

def main(args: Array[String]) {
val loop = Future.run(){ token => Future {
var i = 0
while (i < 100) {
if (token.isCancelled) throw new CancellationException()
Thread.sleep(100)
println(i)
i += 1
}
}}

Thread.sleep(1000)
loop.unsubscribe()
println("unsubscribe")
}


}



X