有一些时候内置的akka的Graph不能满足我们的需求,需要自己订制Graph,最近写socks5的时候发现要自己搞,就学习一下。

首先,应用这些东西的前提是你对akka stream有一定的了解,这里推荐油管上的Konrad Malawski视频以及简单看看这个repo。然后,就可以去看看官方的教程

大体情况

构造一个Custom Stream Process,大概需要这么几个东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TrivalHandler extends GraphStage[FlowShape[A, B]] {

// 声明使用的port
val in = Inlet[A]("Map.in")
val out = Outlet[B]("Map.out")

// 声明shape
override val shape = FlowShape.of(in, out)

// 构造GraphStageLogic
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// 内部状态
....
// InHandler的设置
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, f(grab(in)))
}
})

// OutHandler的设置
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}

首先定义你想要的那个结构,有几个inlet几个outlet,以及具体的Shape来指定是SourceShapeSinkShape还是FlowShape,最关键的是createLogic得到的GraphStageLogic,在实现的时候应该把具体的可变的部分都保留在GraphStageLogic里面。

对于GraphStageLogic来说,关键在设计InHandler或者OutHandler的逻辑。

官方文档中使用这两种线表示两个flow

Alt text

Alt text

上面是元素流动的流,下面是请求的流,因为Akka Stream采用这种实现方式。

InHandler为例,它有三个回调函数onPushonUpstreamFinish以及onUpstreamFailure。这三个函数用来对应上游对这个stage的行为。onPush说明上游有数据Push过来,然后你可以通过push把数据推向下游。相对的,下游通过OnPush来说明自己需要数据,你也可以通过pull来向上游请求数据。默认的流就是如上图所示。

具体自己实现的时候也可以靠考虑这个流的方式来对应到具体代码的实现上。当然同时应该满足文档中的状态机。

一个栗子

最初我是想写一个Socks5的服务器,结果发现其实完整的custom flow graphstage的文档挺少的。有很多坑,这里先搞一个例子。

目标是这样的,我们想自己实现source的例子比较常见,比如我们有个新的数据源,并没有默认的实现,但是flow的例子可能少一点,但是我们如果用akka stream处理tcp请求,有些情况我们需要自己解析包的结构,然后再进行处理。这种时候,我们可能就需要自己定制flowstage了。

这里定义了包结构是以0x00开始代表包的开始,然后跟一个Byte的内容代表以后的包长度,然后跟着utf8的字符,然后返回就是对应的字符。

Alt text

Alt text

大概是这个样子的。接下来就是把他们拼装到一起了。

首先处理tcp连接的这一部分,Stream中流的是IncomingConnection,然后materialize之后得到的是Future[ServerBinding],然后对于每一个IncomingConnection,通过handleWith来暴露流,如果去看源码,其实handleWith就是

1
flow.joinMat(handler)(Keep.right).run()

joinMat的拓扑其实就是

Alt text

Alt text

也就是我们需要一个Flow[ByteString, ByteString],获取输入的流,然后返回输出的流。

我们可能希望更多,比如当一次tcp断开之后,能获得一些统计信息什么的。所以,实现的时候不能只继承GraphStage也应该实现GraphStageWithMaterializedValue。即类型应该为GraphStageWithMaterializedValue[FlowStage[ByteString, String], Future[Int]]

先定义对开的接口,一个输入一个输出,然后定到shape

1
2
3
4
val in = Inlet[ByteString]("Custom.in")
val out = Outlet[String]("Custom.out")

override def shape: FlowShape[ByteString, String] = FlowShape(in, out)

然后,流程在createLogicAndMaterializedValue里面,它不止返回一个GraphStageLogic,还要返回Future[Int],这里就当计算收到包的个数。这里有一些坑,首先是要注意pullpush的阶段,不然有可能死锁,除了push以外,还有emit他是阻塞的,所以不会出现可以对同一个port调用两次的情况。

另外,因为要自己维护一个buffer,因为不能保证每次到来的都是一个完整的包,有可能要等多次才能收到一个完整的包。还有一个问题就是当客户端关闭socket的时候,不能直接关闭自己,应该考虑到buffer里面还有一些没处理过的,全都处理了之后才能关闭。

1
2
3
4
5
6
7
8
9
override def preStart(): Unit = {
super.preStart()
pull(in)
}

override def postStop() = {
promise.tryFailure(new IOException("Connection closed"))
super.postStop()
}

要注意的是,因为要返回Future,所以postStop中要tryFailure,保证所有的出口都能完成future

接下来,对于InletHandler

1
2
3
4
5
6
7
8
9
10
11
override def onPush(): Unit = {
val data = grab(in)
writeBuffer(data)
processBuffer()
}

override def onUpstreamFinish(): Unit = {
finishRequest()
super.onUpstreamFinish()
completeStage()
}

finishRequest里面解决剩下的buffer的内容。这里的内容是通过实现对应的unapply函数来分析和获取的,有点像Parser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
object SizedByteFrame extends ByteFragment[(Int, Option[String])]{
override def fromBytes: Extractor =
new PartialFunction[Seq[Byte], ((Int, Option[String]), Seq[Byte])] {
def getLength(d: Seq[Byte]) = (d.head.toInt, d.tail)
def isDefinedAt(d: Seq[Byte]) = {
val (size, data) = getLength(d.tail)
(d.head == 0x00) && (size <= data.length)
}

def apply(s: Seq[Byte]) = {
val raw = s.tail
val (size, data) = getLength(raw)

if(size > 0) {
val str = ByteString(data.slice(0, size).toArray).utf8String
((size, Some(str)), data.slice(size, data.length))
} else
((0, None), raw.tail)
}
}

override def toBytes(value: (Int, Option[String])): ByteString = {
val head = 0x00.toByte
value._2 match {
case Some(raw) =>
head +: value._1.toByte +: ByteString(raw.toArray.map(_.toByte))
case None =>
head +: ByteString(0.toByte)
}
}
}

对于outletHandler,其实akka stream中已经内置了很多的例子,直接使用eagerTerminateOutput,其实就是onPull时什么也不干。

具体使用的时候,注意下mat的时候,是Keep right还是keep left就差不多了。

具体的代码可以看这里




X