Akka StreamsのalsoToを使うとSourceが停止しない

結論から言うと下記のコードには問題がある。rediscalaでRedis pub/subに繋いで、subscribeで受け取った内容をAkka HTTPでのSSEで返すコードの例です。

import java.nio.charset.Charset

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import redis.RedisPubSub
import redis.api.pubsub.Message

class RedisPubSubGraphStage(channel: String)(implicit actorSystem: ActorSystem) extends GraphStage[SourceShape[String]] {
  private val out: Outlet[String] = Outlet("RedisPubSubGraphStage.out")

  val shape: SourceShape[String] = SourceShape(out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var redisPubSub: Option[RedisPubSub] = None

      private def receiveMessage(message: Message): Unit = {
        val data = message.data.decodeString(Charset.forName("UTF-8"))
        emit(out, data)
      }

      override def preStart(): Unit = {
        redisPubSub = Some(RedisPubSub("localhost", 6379, Seq(channel), Seq.empty, receiveMessage, _ => ()))
      }

      override def postStop(): Unit = {
        redisPubSub.foreach(_.stop())
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = ()
      })
    }
}

このRedisPubSubGraphStageを使えば、与えたチャンネル(channel)をsubscribeし、受け取った内容をそのまま垂れ流すSourceを作ることが出来る。で、これをSSEで流すと下記のような感じになる。

import akka.http.server.Directives._
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

get {
  path("subscribe") {
    parameters('channel) { channel =>
      val source =
        Source
          .fromGraph(new RedisPubSubSourceGraph(channel))
          .map(s => ServerSentEvent(s))
          .alsoTo(Sink.foreach(sse => processSse(sse)))

      complete(source)
    }
  }
}

おかしいことは分かっているんだけど、subscribeしたらSSEのデータを別口で受け取り、何かしらの処理をしたいとする。例えばチャットなどで、チャットのメッセージが作られただとか削除されただとかのイベントが流れくる中、subscribeした時点でそのイベントの中のメッセージを開き、ついでに既読にしてしまうとか。正攻法ならば接続したクライアントで既読APIを叩いてやるべきだと分かっているんだけど。

とりあえず言い訳はしたが、こんな風にした場合、想定した通りにならない。Akka HTTPのSSEはHTTPのコネクションを切ると、そのために使ったRunnableGraphはきちんと閉じられる。当然RedisPubSubGraphStageが返したGraphStageLogicのpostStopだって呼ばれる。しかし、alsoToを使うと、HTTPのコネクションを切ってもそちらのSinkが生きているせいなのか、postStopが呼ばれない。

当然と言えば当然って感じで、そりゃSinkがひとつでも生きているならばRunnableGraph自体は有効にしておくしかない。別にSSEに限った話でもないが、alsoToを使う場合はこういうことが起こりえることを考慮すべきなんだろうなあ。

ちなみにこういったケースならば、viaでFlowを噛ましてやればalsoToを使わず似たようなことは出来ます。