読者です 読者をやめる 読者になる 読者になる

Akka HTTPを使ってみる。Akka Streamsにもちょっと触れるよ

Akka HTTPとはAkkaの上でHTTPサーバーを実現したものです。HTTPクライアントとしての機能を持っていたり、WebSocketのサポートもしています。

sprayという似たものもありますが、spray/sprayには「長らくメンテしてないよ。Akka HTTPに置き換えらているよ」とアナウンスされています。ですので、素直にAkka HTTPを使います。

ふたつのAPI

Akka HTTPのサーバーサイドにはレベルの異なるふたつのAPIがあります。

低レベルAPILow-Level Server-Side API

Akka HTTPはAkka、もっと言うとAkka Streamsを利用して書かれているのですが、低レベルAPIはそのAkka Streamsが剥き出しになっており、よりHTTPに近い方(コネクションとか)にも触れられます。こちらを利用することによってWebSocketによるアクセスを捌くことも出来ます。

Akka Streamsが剥き出しにはなっていますが、それらのあれこれをやってくれて、ちょっと短く書けるように、bindAndHandleSyncというメソッドがあります。そのメソッドを使うと以下のように書けます。

import akka.actors.ActorSystem
import akka.streams.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
import akka.http.scaladsl.model.HttpMethods._

implicit val system = ActorSystem()

implicit val materializer = ActorMaterializer()

val handle: HttpRequest => HttpResponse = {
  case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
    HttpResponse(entity = "pong")
}

Http().bindAndHandleSync(handle, "localhost", 8080)

bindAndHandleAsyncメソッドもありますが、こちらはHttpRequest => Future[HttpResponse]と、Futureを使うようになります。

高レベルAPIHigh-Level Server-Side API

高レベルAPIは低レベルAPIをラップし、ルーティングの機能を備えたものです(よく見るとこのページのURLも"routing-dsl"になっていますね)。その分低レベルAPIに比べて細やかなことは出来ません。

高レベルAPIでは、akka.http.scaladsl.server.Directivesのメソッド群と、bindAndHandleメソッドを使います。

import akka.actors.ActorSystem
import akka.streams.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

implicit val system = ActorSystem()

implicit val materializer = ActorMaterializer()

val route =
  path("ping") {
    get {
      complete("pong")
    }
  }

Http().bindAndHandle(route, "localhost", 8080)

bindAndHandleメソッドの第1引数に渡すのはFlow[HttpRequest, HttpResponse, Any]という型の値で、このFlowというのはAkka Streamsのものです。ですので、高レベルAPIにおいてもAkka Streamsが隠蔽されるわけではないのですが、akka.http.scaladsl.server.Directivespath, get, completeはすべてこのオブジェクトのメソッド)を用いるので、低レベルAPIに比べるとほとんど意識しなくなります。

反対に、低レベルAPIにおいてFlowを用いる箇所ではakka.http.scaladsl.server.Directivesで組み立てたものを渡せたりもします。

低レベルAPIを使ってみる

高レベルAPIakka.http.scaladsl.server.Directivesが提供するメソッド群を使ったルーティングの記述はややクセがありますが(sprayに触れてた人はほぼ同じだから馴染みやすいらしい)、見て分からないほどのものではないと思います。が、これは他のウェブフレームワークでもやってくれそうなもの。というわけで、低レベルAPIの方を詳しく見ていきます。

その前に軽くAkka Streamsを説明。

  • Source: 何かしらの入力を発生させる。始端。ひとつの出力チャンネルを持つ
  • Sink: Sourceから発生した入力が最終的に行き着く先。終端。ひとつの入力チャンネルを持つ
  • Flow: 入力を受け取って、別の形に変換し出力する。入力チャンネルと出力チャンネルをひとつずつ持つ

より突っ込んだ話であれば:

あたりが詳しかったです。

Akka HTTPの低レベルAPIはこのAkka Streamsを利用しているので、それをわざとらしく使用したコードが以下です。

import akka.http.scaladsl.Http
import akka.stream.scaladsl.Sink

val source = Http().bind("localhost", 8080)

source.runWith(Sink.foreach { connection =>
  println(s"New connection is accepted: ${connection.remoteAddress}")
  // handle connection
})

bindAndHandleSyncメソッドやbindAndHandleメソッドではなく、bindメソッドを使用すると、戻り値はSource[Http.IncomingConnection, Future[Http.ServerBinding]]となります。ポイントはSourceを返している点です。

このSourceはHttp.IncomingConnectionを発生させるSourceです。ですので、このあとにFlowやSinkを繋げばHttp.IncomingConnection、つまりHTTPによってアクセスされた場合のコネクションを得ることが出来るわけです。

source.runWithメソッドにSinkを渡すと実行されるので、HTTPコネクションを受け取るSinkを渡してあげればOKです。

ただ、Sinkは入力の行き着く先ですので、これだけではHTTPコネクションを受け取って終わりになってしまいます。何の処理もしていません。8080ポートで接続を受け付けるようになっただけです。

ですので、connection.handleWithメソッドを使います。

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
import akka.http.scaladsl.model.HttpMethods._
import akka.stream.scaladsl.{ Sink, Flow }

val source = Http().bind("localhost", 8080)

source.runWith(Sink.foreach { connection =>
  println(s"New connection is accepted: ${connection.remoteAddress}")

  connection.handleWith(Flow.fromFunction {
    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "pong")
  })

  // Http().bindAndHandleSync(...) の第1引数に渡すものと一緒
  // connection.handleWithSyncHandler(handler)

  // Http().bindAndHandleAsync(...) の第1引数に渡すものと一緒
  // connection.handleWithAsyncHandler(handler)
})

handleWithメソッドは第1引数にFlow[HttpRequest, HttpResponse, Mat]Matの説明は割愛)を受け取ります。想像に難くないかもしれませんが、HttpRequestを入力として受け取り、HttpResponseを出力として返すFlowのことであり、そのFlowでHTTPリクエストを処理してHTTPレスポンスを返してやってね、ということです。

これでHttp.IncomingConnectionを受け取った後、あらたにHttpRequestを受け取りHttpResponseを返すというふたつのストリームによって、HTTPサーバーを実現します。

ちなみにコメントでも書きましたが、Http.IncomingConnectionにはhandleWithSyncHandlerメソッドとhandleWithAsyncHandlerメソッドも定義されていて、これはHttp()で使えるbindAndHandleSyncメソッドおよびbindAndHandleAsyncメソッドにて実現できることをHttp.IncomingConnectionにて実現するものです。

とまあこんな風にHTTPリクエストなどなどを扱えます。高レベルにHTTPを隠蔽するなら他の選択肢もありますが、HTTPコネクションのレベルで何かしてやる場合、Akka HTTPはひとつの選択肢だなーと思いました。