以下内容依旧是是抄袭《Principles of Reactive Programming》中Erik Meijer的课程内容,想好好学习请转战公开课,或者看reactivex的文档或者微软rx.net的文档。

接着回来看这张图,

e

e

上一篇见识了前面的两个,而Iterable其实大家都很熟悉,关键就在最后的Observable上。课上还讲了一些关于类型的对偶。

回忆future的定义

1
2
3
4
trait Future[T] {
def onComplete(callback: Try[T] => Unit)
(implicit executor: ExecutionContext): Unit
}

忽略中间隐式的参数其实可以把其类型看成(Try[T] => Unit) => Unit然后把它翻过来,就是Unit => (Unit => Try[T]),简化之后就只有Try[T]了。这种关系称为Dual,详细可以看这里

然后,再看看Iterator,他大概可以这样定义

1
2
3
4
5
6
7
8
trait Iterable[T]{
def iterator(): Iterator[T]
}
trait Iterator[T] {
def hasNext: Boolean
def next(): T
}

然后就能写出来Iterator的类型,Iterator可以看成一个可以有无限长的流,每次调用next都会取出来新的元素。所以,在一些lazy的地方,经常会用到迭代器

Iterator的类型可以写成Unit => (Unit => Try[Option[T]]),然后反转过来,(Try[Option[T]] => Unit) => Unit,然后把异常处理的部分分开,对于Try[Option[T]]有三种情况,正确得出结果T => Unit,发生异常Throwable => Unit,还有空Unit=> Unit,然后写出来(Throwable => Unit, () => Unit, T => Unit) => Unit,这个就是Observable[T]。然后我们像Iterator一样丰富起来,首先应该区分ObserverObserable,然后把三种情况分开,最后把内容展开。

1
2
3
4
5
6
7
8
9
trait Observer[T] {
def onError(e: Throwable): Unit
def onCompleted(): Unit
def onNext(value: T): Unit
}
trait Obserable[T] {
def subscible(observer: Observer[T]): Unit
}

然后,考虑到我们还需要能移除这种关注关系的方法,所以,还需要一种可以取消的结构Subscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
trait Observer[T] {
def onError(e: Throwable): Unit
def onCompleted(): Unit
def onNext(value: T): Unit
}
trait Obserable[T] {
def subscible(observer: Observer[T]): Subscription
}
trait Subscription {
def unsubscribe(): Unit
def isUnsubscribed: Boolean
}

Observable其实是代表了异步事件流,Rx中实现了相应的内容,这部分可以看wiki。在Reactive Programming,将之前要处理的data flow,统一抽象出一种数据结构,同时这种数据结构其实也是monad。或者换个说法,Python的Rx库的介绍上有

Rx = Observables + LINQ + Schedulers

在Rx中,使用Observables表示异步数据流(或者说event stream是first class的,这也是为什么Rx不是FRP的原因,感兴趣可以看原论文),使用LINQ作为其上的操作(定义在其上的大量的高阶函数),使用Schedulers来调度异步数据流。这里我还是想把这句话放上来

Your mouse is a database.
– Erik Meijer

Observable

在GUI的世界里,应该尽力避免block的,会阻塞的事件注册在应该在后台,等到完成在通过回调的方式完成,但是单纯的回调问题很明显,假设我们需要对Botton的点击的事件进行相应,最简单的方法应该是直接在onclick上填一个回调函数处理,但是这样,如果我们有更多的事件要添加的话就有更改onclick上的赋值,每一次都要把前一个时间留下来,然后加个新的代码进去,然后重新付回去,这显然是不能接受的。所以,我们可能想到了把这个封装一下,每次注册事件的回调,都是把函数append进事件队列里,这样一方面就不用修改之前的代码。这样的缺点依然很明显,我们缺少控制我们到底关心什么事件的能力,当我们不再关心Botton的点击的时候,我们希望有简单的办法取消关注。这也是一个很明显的生产者消费者的模型。

先看个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import rx.lang.scala.{Subscription, Observable, Observer}
import scala.swing._
import scala.swing.event._
object ButtonSwing extends SimpleSwingApplication {
def top = new MainFrame {
title = "Swing Observables"
val button = new Button {
text = "Click"
}
contents = button
val buttonClick = Observable[Button]({
obs => {
button.reactions += {
case ButtonClicked(_) => obs.onNext(button)
}
Subscription()
}
})
var obs = Observer[Button](_ => println("click"))
buttonClick.subscribe(obs)
}
}

这里给Button绑定了一个Observer,每click一次都会输出一个。

正如这个例子,一个Observable可以通过一个observer => Subscription建立然后返回一个Observable。每当我们在调用这个Observablesubscribe上时,就会调用这个函数,然后返回一个Subscription。在我们不关心Observable的时候,就可以调用这个Subscriptionunsubscribe

Observable上定义了很多高阶函数,Marble Diagrams可以很清楚的表示函数的功能。

filter函数过滤每一个event,生成一个新的Observablefilter

map函数对每一个event进行映射。 map

flatMap函数将一个事件映射成一个或多个eventflatMap

这些都是常见的高阶函数。一些还有面对多个Observable函数,他们有面对多个Observable的语义。

merge

merge

concat

concat

还可以看这里,这些图也是从文档里面找到的,需要注意的一点是多个流在处理的时候,他们的顺序可能是和自己想的不一样。

这是一个简单的merge的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import rx.lang.scala._
import scala.concurrent.duration._
object example {
def main(args: Array[String]) {
val obsTwo: Observable[Long] = Observable.interval(2 second)
val obsThree: Observable[Long] = Observable.interval(3 second)
val twoSecond: Observable[String] = obsTwo.map("1 second: " + _.toString)
val threeSecond: Observable[String] = obsThree.map("2 second: " + _.toString)
val mergeed = twoSecond.merge(threeSecond)
val s = mergeed.subscribe(println(_))
readLine()
s.unsubscribe()
}
}

考虑一个复杂一点的情况,我们从一个信号源获得数据之后还要去第二信号源查询相应的数据,就像视频中给的

1
2
3
4
5
6
7
8
9
def getGeocode(c: Geo): Future[Country] = { ... }
val withCountry: Observable[Observable[(EarthQuake, Country)] = earthQuakeObservable().
map( quake => {
val country: Future[Country] = getGeocode(quake.location)
Observable.from(country.map(country => (quake.country)))
}
val c: Observable[(EarthQuake, Country)] = withCountry.concat()

这里,对于来的每一个地震信息,都要反查一下对应的位置信息,这里使用from函数可以从future构件一个observable,如果使用flatten,会按照完成的顺序,有可能后面发生的地震会在查询的时候更快,导致最后的Observable中的顺序和第一个不一致,所以这个时候应该用concat

Subscription

回忆一下Subscription。它用来表示对Observable的一种对应关系。

简单来说,Observable分为两种一种称为Cold另一种称为HotCold Observable为每一个Observer提供一份单独的内容,也就是说对它的subscribe是有副作用的,但是这种模式是很有用的,如果你关注一个产生随机内容的Observable,有可能你希望你自己的随机序列和其他的Observer是不一样的。与之相对应的,Hot Observable就是对于所有的Observer都提供同一样的内容。所以,一个值得注意的地方是unsubscribe并不意味着cancel

Subscription本身也有很多类型,比如CompositeSubcription,它可以同时组合多个Subscription,而且它也重载了+=-=方便组合。要注意,对已经unsubscribeCompositeSubcription加入新的Subscription,会使新加的Subscription马上unsubscribeMultiAssignmentSubscription可以多次进行赋值,但是每次只保存一个SubscriptionSerialSubscription也可以进行多次赋值,但是它在付新值的时候会把旧的Subscription进行unsubscribe

Subjects

Future中,我们可以使用Promise简化逻辑,同时获取Future中间的值。在Observable中也有对应的概念。对于Promise其实就是一个盒子,隐式的包含一个future,通过success将内容放到future里面,然后通过future获取返回的内容。

相似的,我们还有Subject,它也可以通过onNextonCompletedonError来把值扔进去,然后通过Observable获取其中的值。

看个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import rx.lang.scala.subjects.PublishSubject
object example {
def main(args: Array[String]): Unit = {
val channel = PublishSubject[Int]() //构造一个新的subject
val a = channel.subscribe(x => println(s"a: $x"))
val b = channel.subscribe(x => println(s"b: $x"))
channel.onNext(42)
a.unsubscribe()
channel.onNext(4711)
channel.onCompleted()
val c = channel.subscribe(x => println(s"c: $x"))
channel.onNext(13)
}
}

这里的使用了一个PublishSubject,其实Subject也有很多种,subscribePublishSubject上,并不能获得之前已经publish的值,也就是PublishSubject不保存自己之前的值。如果想要每个观察者都能得到所有的值,应该使用ReplaySubject。才外还有,AsyncSubject它会保存最终一个值,而BehaviorSubject会保存最近的一个值。

Others

一个简单的Observable可以这样实现

1
2
3
4
5
6
7
8
9
10
11
12
13
object Observable {
def apply[T](f: Future[T]): Observable[T] = {
val subject = AsyncSubject[T]()
f onComplete {
case Failure(e) => { subject.onError(e) }
case Success(t) => {
subject.onNext(t)
subject.onCompleted()
}
}
subject
}
}

它将一个Future变成Observable,由于我们希望可以保留最后一个值,所以使用的是AsyncSubject

另外,对比Future的实现,其异常有Try,对应的Observable也有Notification

1
2
3
4
abstract class Notification[+T]
case class OnNext[T](elem: T) extends Notification[T]
case class OnError(t: Throwable) extends Notification[Nothing]
case object OnCompleted extends Notification[Nothing]

另外还有一个materialize可以把一般的Obervable[T]变成Observable[Notification[T]]

当你为了调试想要阻塞直到执行完了为止,Observable自带toBlocking函数。

1
2
3
4
5
6
7
8
9
val xs: Observable[Long] = Observable.interval(1 second).take(5)
val ys: List[Long] = xs.toBlocking.toList
println(y)
val zs: Observable[Long] = xs.sum
val s: Long = zs.toBlocking.single
println(s)

Observable也可以从Iterable构建。

1
2
3
4
5
6
7
def from[T](ts: Iterable[T]): Observable[T] = Observable ( s => {
ts.foreach(t => {
if (s.isUnsubscribed) {break}
s.onNext(t)
})
s.onCompleted()
})

更多的文档可以看这里

example

最后看一个经典的例子,魂斗罗想要30条命需要先输入秘籍上下上下左右左右ba。这里就模拟这个行动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.awt.BorderLayout
import java.awt.event.KeyEvent
import javax.swing.JFrame
import javax.swing.JLabel
import javax.swing.SwingConstants
import rx.lang.scala.Observable
import rx.observables.SwingObservable
import rx.lang.scala.JavaConversions
class Win1 extends JFrame {
val label = new JLabel("")
def run = {
initLayout()
val konami = Observable.just(
38, // up
38, // up
40, // down
40, // down
37, // left
39, // right
37, // left
39, // right
66, // b
65 // a
)
val pressedKeys = JavaConversions
.toScalaObservable(SwingObservable.fromKeyEvents(this))
.filter(_.getID == KeyEvent.KEY_RELEASED)
.map(_.getKeyCode()) //获取按键码
val bingo = pressedKeys.sliding(10, 1)
.flatMap(window => (window zip konami).forall(p => p._1 == p._2))
.filter(identity) //每10个按键emit一个window,满足条件的留下
bingo.subscribe(_ => label.setText(label.getText + " KONAMI "))
}
def initLayout() = {
setLayout(new BorderLayout)
label.setHorizontalAlignment(SwingConstants.CENTER)
add(label, BorderLayout.CENTER)
setSize(400, 400)
setResizable(false)
setTitle("Enter the Konami Code!")
setVisible(true)
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
}
}
object Konami {
def main(args: Array[String]): Unit = {
javax.swing.SwingUtilities.invokeLater(new Runnable {
override def run(): Unit = {
new Win1().run
}
})
}
}



X