JSR-310のDateTimeFormatterに気を付けろ

JSR-310に限った話ではないかもしれませんが。

JSR-310で日時なりを文字列にするときは、fomratメソッドにDateTimeFormatterクラスのインスタンスを渡してやる。んー分かりやすい。ちなみにtoStringメソッドでもOK。この場合はISO 8601の形式でフォーマットされます。

で、このtoStringメソッドで使われているフォーマッター(おそらくDateTimeFormatter.ISO_*として用意されている出来合いのフォーマッター)なんですが、気が利いています。どう気が利いているかと言うとミリ秒がゼロの場合はミリ秒を出力しない。き、気が利いてる〜。

けど、他の言語やライブラリがフォーマッターに厳密に従ってパースする場合はミリ秒があったりミリ秒がなかったりで非常に厳しいので、どちらかに統一した方がいいねという話です。というかそれだけの話でした。一応フォーマッターの定義だけ掲載しときます。

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

DateTimeFormatter alwaysWithoutMillis = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ");

ZonedDateTime.of(2018, 4, 13, 18, 48, 21, 1000000, ZoneId.of("Asia/Tokyo")).format(alwaysWithoutMillis); // => 2018-04-13T18:48:21+09:00

DateTimeFormatter alwaysWithMillis = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZZZZ");

ZonedDateTime.of(2018, 4, 13, 18, 48, 21, 0, ZoneId.of("Asia/Tokyo")).format(alwaysWithMillis); // => 2018-04-13T18:48:21.000+09:00

Javaめっちゃ久し振りに書いた!

ELBを介したSSEを行う場合の設定や気を付けるところ

SSE(Content-Type: text/event-stream)便利ですよね。フレームワークがサポートしていることも多いですし、その通りやれば結構サクッと動作します。

しかし、ローカルで試した場合にサクッと動作しても、ELBを介した場合に結構ハマったりしたので、その場合の解消法というか、ここ設定しとくと良いかもを書きます。

クライアント(アプリなど) ⇔ ELB ⇔ nginx(EC2インスタンス) ⇔ サーバアプリケーション

という構成が前提です。

結論

結論から言うと、下記の設定をしておくと私が試した限りはスムーズでした。

  • ELBのリスナーはHTTP/HTTPSではなく、TCP/SSLを使用
  • nginxのproxy_bufferingはoff

ELBのリスナーはHTTP/HTTPSではなく、TCP/SSLを使用

これがほとんどなのですが、ELBのリスナーはHTTP/HTTPSを使用すると、ELBが接続を保持したままになり、クライアントがHTTPコネクションを切断しても、サーバアプリケーション側に切断が伝わらない場合があるっぽいです。Redis pub/subなどに接続し、そっからデータを受け取るようなことをしている場合、Redisとの接続を切る必要があるかと思いますが、そういう場合致命的です。

この場合、ELBのリスナーにTCP/SSLを使用してあげたら、スパッと解消されました。ただし、HTTP/HTTPSは専用の処理を挟んでいたり(X-Forwarded-Forリクエストヘッダの付与など)、あとこいつのせいでSSEとの相性が悪いのだと思いますが、バックエンドとのコネクションを維持するキープアライブが働いています。それらが必要な場合はまた一考の余地ありとなります。

nginxのproxy_bufferingはoff

単純な話なのですが、nginxのproxy_bufferingが有効なままだと、サーバアプリケーションでイベントを書き込んでもnginxがバッファリングしてしまい、しばらくの間クライアントが受け取れません。SSEの場合、接続を維持するためにハートビート("data:"など意味を為さないイベント)を流すと思うので、結果それによって押し出されはしますが、SSEの性質上バッファリングしない方が正しいと思うので、切ってしまった方が良いかと思います。

location /path/to/sse {
  # ...

  proxy_buffering off;

  # ...
}

とでも書いておきましょう。併せてproxy_read_timeoutも確認しておいた方が良いかもしれません。ハートビートの間隔より短いと接続が切れてしまいます。nginxのデフォルトは60秒(ついでに言うとELBのアイドルタイムアウトも60秒)なので、あまり考える必要はないかもしれませんが……。

紆余曲折の話

私がいろいろ試した話をだらだら書くだけなので、あまり実のある話はありません。

事の発端はAkka StreamsのalsoToを使うとSourceが停止しないで書いたような、既読処理が停止しない問題を調べていたとき。alsoToの使用をやめてこれで問題解消やろ!と思ったが、そんなことはなくあれー?てなった。

ELBを介さず、直接EC2インスタンスにアクセスするとすんなり動作することから、どうやらELBがコネクション張ったままにしてるっぽいことまで分かった。これ無効にしたいんだけど〜と正直思ったが、どうやら無理っぽい。

じゃあSSEのハートビートを無効にし、ELBのアイドルタイムアウト任せに切断、クライアントで再接続と割り切るかと思ったが、それでもサーバアプリケーションの処理が停止しない。ならばサーバアプリケーションから終了を送ってしまえと、60秒で完了を送るようにしたが、これだとサーバアプリケーションの処理は停止しても、クライアントでエラーが発生し(Google Chromeだとnet::ERR_INCOMPLETE_CHUNKED_ENCODING)、再接続もしばらくの間失敗するというイケてない感じになった。

ちなみにSSEのハートビートを無効にしたときもいろいろあった。nginxのproxy_bufferingがonになっていてクライアントがイベントを受け取れなかったり、Akka Streamsが下流の停止を検出できず、動作が停止しなかったり。まあ結局ハートビートは有効にしたのでこの辺は関係ないのだけど。

で、チームにひたすら進捗投げてたんだけど、そこでHTTP/SSLリスナーはどう?とアドバイスをもらい、一発で解決したのであった。おしまい。

Akka StreamsのalsoToを使うとSourceが停止しない

結論から言うと下記のコードには問題がある。rediscalaでRedis pub/subに繋いで、subscribeで受け取った内容をAkka HTTPでのSSEで返すコードの例です。

import java.nio.charset.Charset

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import redis.RedisPubSub
import redis.api.pubsub.Message

class RedisPubSubGraphStage(channel: String)(implicit actorSystem: ActorSystem) extends GraphStage[SourceShape[String]] {
  private val out: Outlet[String] = Outlet("RedisPubSubGraphStage.out")

  val shape: SourceShape[String] = SourceShape(out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var redisPubSub: Option[RedisPubSub] = None

      private def receiveMessage(message: Message): Unit = {
        val data = message.data.decodeString(Charset.forName("UTF-8"))
        emit(out, data)
      }

      override def preStart(): Unit = {
        redisPubSub = Some(RedisPubSub("localhost", 6379, Seq(channel), Seq.empty, receiveMessage, _ => ()))
      }

      override def postStop(): Unit = {
        redisPubSub.foreach(_.stop())
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = ()
      })
    }
}

このRedisPubSubGraphStageを使えば、与えたチャンネル(channel)をsubscribeし、受け取った内容をそのまま垂れ流すSourceを作ることが出来る。で、これをSSEで流すと下記のような感じになる。

import akka.http.server.Directives._
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

get {
  path("subscribe") {
    parameters('channel) { channel =>
      val source =
        Source
          .fromGraph(new RedisPubSubSourceGraph(channel))
          .map(s => ServerSentEvent(s))
          .alsoTo(Sink.foreach(sse => processSse(sse)))

      complete(source)
    }
  }
}

おかしいことは分かっているんだけど、subscribeしたらSSEのデータを別口で受け取り、何かしらの処理をしたいとする。例えばチャットなどで、チャットのメッセージが作られただとか削除されただとかのイベントが流れくる中、subscribeした時点でそのイベントの中のメッセージを開き、ついでに既読にしてしまうとか。正攻法ならば接続したクライアントで既読APIを叩いてやるべきだと分かっているんだけど。

とりあえず言い訳はしたが、こんな風にした場合、想定した通りにならない。Akka HTTPのSSEはHTTPのコネクションを切ると、そのために使ったRunnableGraphはきちんと閉じられる。当然RedisPubSubGraphStageが返したGraphStageLogicのpostStopだって呼ばれる。しかし、alsoToを使うと、HTTPのコネクションを切ってもそちらのSinkが生きているせいなのか、postStopが呼ばれない。

当然と言えば当然って感じで、そりゃSinkがひとつでも生きているならばRunnableGraph自体は有効にしておくしかない。別にSSEに限った話でもないが、alsoToを使う場合はこういうことが起こりえることを考慮すべきなんだろうなあ。

ちなみにこういったケースならば、viaでFlowを噛ましてやればalsoToを使わず似たようなことは出来ます。

sealed abstract case classがすごい

sealed abstract case classなるものがある。別のことを調べてたらこちらの記事(のコメント欄)で見掛けて感動してしまった。

軽く触れると、newできなくなり、case classによって自動で生成されるcopyメソッド、そしてコンパニオンオブジェクトのapplyメソッドが生成されなくなる、というもの。その上でcase classの他の特性を持つ。

これ、インスタンスの生成に関する手段のみが綺麗に潰されているので、バリデーションを挟むファクトリだったりを強制できるのが嬉しい。

私自身は今まで以下のようにしていた。

trait Person {
  def name: String
  
  def age: Int
}

private case class PersonImpl(name: String, age: Int) extends Person

object Person {
  def apply(name: String, age: Int): Person = {
    // ...
    PersonImpl(name, age)
  }
}

こんな風にtraitでインタフェースのみ定義し、その実装をcase classにする。これでequalsメソッドなどは自動で定義してくれる。で、traitの方のコンパニオンオブジェクトで生成してあげる。もちろん実装であるcase class自体は隠しちゃう。

しかしこれ、unapplyメソッドなどは自前で定義しなくちゃならないし、toStringメソッドは実装クラス(この場合はPersonImpl)のものなので、想定した結果にならないと、微妙なとこもあった。

ので、まあケースに応じはするわけだが、sealed abstract case classは有用そうだなと思う。

Scalaマクロのerrorとabortの違い

Scalaのマクロで、コンパイルエラーにしてしまうときは Context.error を使う。

def someMethod_impl(c: Context)(arg: c.Expr[String]): c.Expr[Unit] = {
  // ...

  if (cond) {
    c.error(c.enclosingPosition, "a message")
  }

  // ...
}

でもこれが微妙に使いづらくて、なぜなら戻り値が Unit だから。Unit だと:

val _arg = arg.tree match {
  case Literal(Constant(value: String)) =>
    value
  case _ =>
    c.error(c.enclosingPosition, "a message")
}

とかが出来ません。この場合変数 _argString になってほしいのですが、実際は StringUnit の共通の親である Any になってしまうわけです。

で、登場するのが Context.abort 。こちらは戻り値が Nothing になっているので、上記のようなケースで上手く動作します。

val _arg = arg.tree match {
  case Literal(Constant(value: String)) =>
    value
  case _ =>
    c.abort(c.enclosingPosition, "a message")
}

// 変数 `_arg` はちゃんと `String`

ケースに応じて使い分ければ良さそうですね。

Scala dynamics + macros

Type Dynamic

Scalaはメソッドなどの有無をコンパイルの時点で解決しますが、Dynamic traitを使うことによって動的にすることが出来ます。

// Dynamic traitを使うために必要。コンパイラオプションに `-language:dynamics` を渡してもOK
import scala.language.dynamics

object Adder extends Dynamic {
  // ...
}

こうするだけ。で、オブジェクト(この場合は Adder オブジェクト)に存在しないフィールドやメソッドを指定すると、特定のメソッドが呼ばれます。特定のメソッドは4種類あり、ケースに応じて呼び出されるメソッドが変わります。

  • フィールドの参照: selectDynamic
  • フィールドへの代入: updateDynamic
  • メソッド呼び出し: applyDynamic
  • 名前付き引数を伴ったメソッド呼び出し: applyDynamicNamed

という具合。試しに applyDynamic メソッドを定義してみます。

object Adder extends Dynamic {
  def applyDynamic(name: String)(value: Int): Int = {
    if (!increments.isDefinedAt(name)) {
      throw new IllegalArgumentException(s"$name is not supported")
    }

    val increment = increments(name)

    value + increment
  }

  val increments: Map[String, Int] = Map(
    "one" -> 1,
    "two" -> 2,
    "three" -> 3
  )
}

で実際に使ってみるとこんな感じになる。

println(Adder.one(100))   // => 101
println(Adder.two(100))   // => 102
println(Adder.three(100)) // => 103

本来 Adder オブジェクトには one メソッドなどが定義されていないわけですが、Dynamic traitを使用しているので、applyDynamic メソッドが呼ばれ、その中の処理が実行されるわけです。

先に書いたとおり、他にもメソッドがあるわけですが、これについては割愛。Dynamic traitのドキュメント読むだけでもなんとなく分かるんじゃないかなあと。

マクロ

Type Dynamic便利ですね。しかしこのケースにおいては Adder.four(100) などと呼ぼうものならエラーになります。実行時エラーです。Type Dynamic使わなければコンパイルエラーなのに。

だからこそType Dynamicとも思うのですが、場合によってはコンパイルエラーに出来ます。マクロと併用します。順を追って Adder.four(100)コンパイルエラーになるようにしてみましょう。

まず Adder.applyDynamic メソッドがマクロの呼び出しになるように書き換えます。

import scala.language.dynamics

// `macro` キーワードを使うために必要。コンパイラオプションに `-language:experimental.macros` を渡してもOK
import scala.language.experimental.macros

object Adder extends Dynamic {
  def applyDynamic(name: String)(value: Int): Int = macro AdderMacro.applyDynamic
}

使うのは macro キーワードで、このあとにマクロの実態となるメソッドの識別子を渡します。元のメソッド名と合わせる必要は特になく、macro Nyan.nyan とかでも大丈夫です。

で、実態側を定義する。

import scala.reflect.macros.blackbox.Context

object AdderMacro {
  def applyDynamic(c: Context)(name: c.Expr[String])(value: c.Expr[Int]): c.Expr[Int] = {
    import c.universe._

    val Literal(Constant(numName: String)) = name.tree

    if (!increments.isDefinedAt(numName)) {
      c.error(c.enclosingPosition, s"$numName is not supported")
    }

    val increment = Literal(Constant(increments(numName)))

    c.Expr[Int](q"$value + $increment")
  }

  val increments: Map[String, Int] = Map(
    "one" -> 1,
    "two" -> 2,
    "three" -> 3
  )
}

AdderMacro.applyDynamic がマクロの実態なわけですが、まずシグネチャをマクロ用のものにする必要があります。ひとつ目の引数リストで scala.reflect.macros.{blackbox,whitebox}.Context を受け取るようにし、あとは元のメソッドと同じ引数リストとなります。ただし、c.Expr で包んであげます(または c.Tree にする)。

これで Adder.one(100) を呼び出すと、Type Dynamicによって Adder.applyDynamic("one")(100) になり、マクロを通じてコンパイル時に 100 + 1 に置き換えられます。なので全体的にASTを操作する処理になります。

で、ポイントは c.error のところ。これでコンパイルエラーになります。実際に sbt console で試してみます。

[info] Starting scala interpreter...
Welcome to Scala 2.12.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131).
Type in expressions for evaluation. Or try :help.

scala> Adder.one(100)
res0: Int = 101

scala> Adder.four(100)
<console>:12: error: four is not supported
       Adder.four(100)

意図した通りにコンパイルエラーになっています。と、まあこんな感じでType Dynamicしつつも、マクロを介して妥当かどうかをチェックしてあげられます。

ちなみにマクロの実態の方を先にコンパイルしてあげる必要などがあるので、sbtではプロジェクトを分けたりする必要があります。詳細はサンプルを参照してみてください。

実例

ScalikeJDBCのコードに良い実例があります。

import scalikejdbc._

case class PersonRecord(name: String, age: Int)

object PersonTable extends SQLSyntaxSupport[PersonRecord] {
  // ...
  
  // `column` がType Dynamicになっており、`SQLSyntaxSupport` に与えたクラスの
  // プライマリコンストラクタの引数のみ許可している。よって以下はコンパイルが通る
  column.name
  column.age
  
  // これは通らない。以下のようにコンパイルエラーとなる。
  //   PersonRecord#birthday not found. Expected fields are #name, #age.
  column.birthday
  
  // ...
}

こんな感じで型パラメータに与えられたクラスの情報を元にチェックできたり。ソースは下記の模様。

比較的安全に悪そうなこと出来たりしますが、乱用するとパッと見でどんなコードが生成されるか分かりづらいので、お約束ながら用法用量を守っていきたいですね。

パッケージの依存を単方向にするための拡張メソッド

DDDの話をするとしよう*1

IDDD 第9章 モジュールにおいて、モジュール(Scalaだとパッケージ)の依存は極力減らせ。現実的には難しいが、単方向にしたり、親子関係にあるならば双方向を許可する、ということが述べられている。

しかしこれめっちゃ難しい。集約が別集約のファクトリになる場合、単方向という制限はだいたい破綻する。

// userモジュールの集約
class User(id: UserId) {
  def createTask(): Task = // Taskを返すため、taskモジュールに依存
    new Task(TaskId.generate(), id)
}

// taskモジュールの集約
class Task(id: TaskId, ownerId: UserId) // ownerIdとしてUserIdを持つため、userモジュールに依存

そもそもこういった関係のものにファクトリを持たせるな、という話な気もしてくるが、だからといって避けては通れないときもあると思う。ならば親子関係にすれば良いのか、というとそうでもない。そうしちゃうとほとんどがuserモジュールの子になってしまう。たぶん。

そういうときに拡張メソッドを使うことも出来るな、という話をしたいわけです。

さきほどの例だと user.User をこうする。

class User(id: UserId)

要はファクトリを削る。これでtaskモジュールへの依存はなくなる。

で、taskモジュールに以下のようなのを書く。

import user._

package object task {
  implicit class TaskCreator(user: User) {
    def createTask(): Task =
      new Task(TaskId.generate(), user.id)
  }
}

user.User から task.TaskCreator への変換を定義し、そこに拡張メソッドとしてファクトリを持たせてあげる。こうするとファクトリのある場所がtaskモジュールに移動するので、依存が単方向に収まる。

ちなみに使うときは:

import user._
import task._

val user = createUser()
val task = user.createTask() // import task._ をしていないとcreateTaskメソッドが使えない

のように、両方のモジュールをimportしてあげる必要がある。が、むしろこれはtaskモジュール中のオブジェクトを使いますよ、という意味になるので、むしろ好都合だと思ってる。

実践で用いてはいないのだけど、「あれ、こうすればいいんじゃ?」と思った話でした。デメリットは実践してないから知らない。

*1:FGOやったことないです

Y8 2017 spring in Shibuyaでお話した #y8spring

久し振りに登壇しました。

内容はメディロムで開発しているウェブAPI*1を、どんな風にScalaで書いているか、どんな判断を下しているか、などといったものです。今の仕組みもそろそろこなれてきたし、次(イベントソーシング、CQRSとかな!)に行く前にいっちょまとめとくか、的なノリです。

登壇自体は午後からでしたが、午前の開会式から居たので、すべてのトークを聞いていたのですが、ベストトーカー賞でもあったid:motchangさんのお話は過去似たようなことをやった身としては気が気じゃない感じで、おもしろつらくて楽しかったですね。他の方のトークも学びや再確認できることがあって、楽しかったなーーー。

こんな楽しい機会を企画/運営してくれたスタッフのみなさん、会場を提供してくれた方々にありがとうのお気持ちをですね、お伝えします。また参加したいですねー。

*1:スライド中には明記していませんが、iOS/Android/ウェブブラウザアプリのためのJSONをおしゃべりするAPIのことです

ScalaのJSONライブラリcirceの使い方

ScalaJSONライブラリと言うと、json4sあたりが有名なのかと思います(私感)。

が、json4sはリフレクションを使うので、何かと避けたい方も多いかと思う。

ということで、circeを使ってみましょう。

circe概要

circeはScalaJSONライブラリです。

読み方

公式サイトに書かれています。

たしかにサーシーと読めるが、circeで調べるとWikipediaにヒットするので、キルケー呼びを推したい所存。通じれば何でもいいと思う。

特徴

元々はArgonautをforkして作られたものなので、それに似た特徴を持つ。

  • 純粋関数型のライブラリ(リフレクションを使っていない)
  • catsを内部的に使用
  • shepelessによるJSONオブジェクト ⇔ データ型の相互変換
  • Scala.js対応
  • Argonautにある複雑なオペレーター(--\とか)はすべて削除されている

各モジュールの用途

circeはモジュールに分かれている。よく使うモジュールはcirce-core, circe-parser, circe-genericの3つ。

circe-core

コアなクラス(Json, Encoder, Decoder, etc…)が含まれている。よく使うパッケージはio.circeとio.circe.syntaxのふたつ。

circe-parser

文字列をパースしてJSONオブジェクトに変換してくれるやつ。パーサーはjawnを使用していて、実態はcirce-jawnモジュールにある。

io.circe.parserパッケージのみがあり、parse関数やdecode関数を提供している。

circe-generic

よく使うパッケージはio.circe.generic.autoパッケージ。これを取り込んでおくと、JSONオブジェクト ⇔ case classおよび主要な型(String, Int, etc…)の相互変換を可能にしてくれる。

このモジュールによる変換は、変換が可能な場合コンパイルを通してくれる優れもの。反対に言うと、コンパイルが通らない場合は変換できない型が含まれている。その場合はその型のEncoder/Decoderを定義しておけばちゃんとcirce-genericモジュールで変換が可能になる。

よってコンパイルエラーとなった場合、ちゃんとすべての型が変換できるか確認すると良さ気。

その他モジュール

circe-opticsモジュールはJSONオブジェクトの操作(深い階層にある値を書き換えたりできる)を行えたりと、その他いろいろある。が、今回は割愛。

エンコード

JSONライブラリの主要な使い道はJSON文字列からデータ型への変換と、データ型からJSON文字列の変換を行うという2つだと思う。よってその2つの変換の方法を紹介。

データ型 → JSONオブジェクトの変換はエンコードと呼び、asJsonメソッドで行う。

asJsonメソッドの戻り値はio.circe.Jsonのオブジェクトとなり、このオブジェクトのnoSpacesメソッドやspaces4メソッドなどで文字列に変換できるので、HTTPのレスポンスなどとして返す場合はそのメソッドを用いる。

import io.circe.syntax._       // これがないと`asJson`メソッドが使えない
import io.circe.generic.auto._ // これがないと`io.circe.Encoder[Person]`を自動定義してくれなくて、`asJson`メソッドに渡すEncoderがないと怒られる

case class Person(name: String)

val person = Person("takkkun")

val json = person.asJson

println(json.noSpaces) // => {"name":"takkkun"}

asJsonメソッドはio.circe.Encoder[A]をimplicit parameterで受け取るようになっている。circe-genericモジュールのとこでも書いたように、case classと主要な型であればio.circe.generic.autoパッケージを取り込むだけでそれらの型のEncoderが使えるようになるので、上記のコードはちゃんと動作する。

もし定義されていない型のEncoderが必要になる場合は、自前で定義する。

import io.circe._
import io.circe.syntax._
import io.circe.generic.auto._

import org.joda.time.LocalDate

case class Person(name: String, birthday: LocalDate)

val person = Person("takkkun", new LocalDate(1987, 1, 15))

// `io.circe.Encoder[org.joda.time.LocalDate]`が定義されていないのでコンパイルエラー
person.asJson

implicit val localDateEncoder = new Encoder[LocalDate] {
  final def apply(a: LocalDate): Json =
    Json.fromString(a.toString("yyyy-MM-dd"))
}

val json = person.asJson

println(json.noSpaces) // => {"name":"takkkun","birthday":"1987-01-15"}

Encoderの定義方法いろいろ。以下はすべてio.circe.Encoder[org.joda.time.LocalDate]を定義している。

// `io.circe.Encoder[A]`トレイトを継承してその場で定義
implicit val localDateEncoder =
  new Encoder[LocalDate] {
    final def apply(a: LocalDate): Json =
      Json.fromString(a.toString("yyyy-MM-dd"))
  }

// `io.circe.Encoder[A].contramap`を使用(Aの箇所は変換先のJSON型を指定)
implicit val localDateEncoder: Encoder[LocalDate] =
  Encoder[String].contramap(_.toString("yyyy-MM-dd"))

// `io.circe.Encoder.encodeString.contramap`を使用(Stringの箇所は変換先のJSON型を指定、encodeIntとか)
implicit val localDateEncoder: Encoder[LocalDate] =
  Encoder.encodeString.contramap(_.toString("yyyy-MM-dd"))

デコード

JSONオブジェクト → データ型の変換はデコードと呼び、parse関数およびio.circe.Jsonまたはio.circe.HCursor (io.circe.ACursor)のasメソッドで行う。JSONオブジェクトにせず、JSON文字列から直接データ型へ変換する場合はdecode関数を使う。

import io.circe.parser._       // これがないと`parse`関数および`decode`関数が使えない
import io.circe.generic.auto._ // これがないと`io.circe.Decoder[Person]`を自動定義してくれなくて、`as`メソッドに渡すDecoderがないと怒られる

case class Person(name: String)

val jsonString = """{"name": "takkkun"}"""

parse(jsonString).right.flatMap(_.as[Person]) match {
  case Right(person) => println(person) // => Person(takkkun)
  case Left(error)   => println(error)
}

decode[Person](jsonString) match {
  case Right(person) => println(person) // => Person(takkkun)
  case Left(error)   => println(error)
}

asメソッドおよびdecode関数はio.circe.Decoder[A]をimplicit parameterで受け取るようになっている。エンコードのときと同様、case classと主要な型であればio.circe.generic.autoパッケージを取り込むだけでそれらの型のDecoderが使えるようになるので、上記のコードはちゃんと動作する。

もし定義されていない型のDecoderが必要になる場合は、自前で定義する。

import io.circe._
import io.circe.parser._
import io.circe.generic.auto._

import org.joda.time.LocalDate

case class Person(name: String, birthday: LocalDate)

val jsonString = """{"name": "takkkun", "birthday": "1987-01-15"}"""

// `io.circe.Decoder[org.joda.time.LocalDate]`が定義されていないのでコンパイルエラー
decode[Person](jsonString)

implicit val localDateDecoder = new Decoder[LocalDate] {
  final def apply(c: HCursor): Decoder.Result[LocalDate] =
    c.as[String].right.map(LocalDate.parse)
}

decode[Person](jsonString) match {
  case Right(person) => println(person) // => Person(takkkun,1987-01-15)
  case Left(error)   => println(error)
}

Decoderの定義方法いろいろ。以下はすべてio.circe.Decoder[org.joda.time.LocalDate]を定義している。

// `io.circe.Decoder[A]`トレイトを継承してその場で定義
implicit val localDateDecoder =
  new Decoder[LocalDate] {
    final def apply(c: HCursor): Decoder.Result[LocalDate] =
      c.as[String].right.map(LocalDate.parse)
  }

// `io.circe.Decoder[A].map`を使用(Aの箇所は変換元のJSON型を指定)
implicit val localDateDecoder: Decoder[LocalDate] =
  Decoder[String].map(LocalDate.parse)

// `io.circe.Decoder.decodeString.map`を使用(Stringの箇所は変換元のJSON型を指定、decodeIntとか)
implicit val localDateDecoder: Decoder[LocalDate] =
  Decoder.decodeString.map(LocalDate.parse)

まとめ

circeによる相互変換はだいたい書いたやつで事足りる。と思う。少なくとも私が実際にjson4sからcirceに乗り換えたときには。

Argonautも使ってみたが、毎回すべての型に対してEncoder/Decoderを書かなければいけないので、なかなか面倒だった。その点circeは必要な分だけ書けばいいので、これが思いの外嬉しい。実際に書いたらよく分かった。

その上でリフレクション使っていないので、なんというか安心感があって助かる。今後はcirceを使っていこうかな。

Akka HTTPを使ってみる。Akka Streamsにもちょっと触れるよ

Akka HTTPとはAkkaの上でHTTPサーバーを実現したものです。HTTPクライアントとしての機能を持っていたり、WebSocketのサポートもしています。

sprayという似たものもありますが、spray/sprayには「長らくメンテしてないよ。Akka HTTPに置き換えらているよ」とアナウンスされています。ですので、素直にAkka HTTPを使います。

ふたつのAPI

Akka HTTPのサーバーサイドにはレベルの異なるふたつのAPIがあります。

低レベルAPILow-Level Server-Side API

Akka HTTPはAkka、もっと言うとAkka Streamsを利用して書かれているのですが、低レベルAPIはそのAkka Streamsが剥き出しになっており、よりHTTPに近い方(コネクションとか)にも触れられます。こちらを利用することによってWebSocketによるアクセスを捌くことも出来ます。

Akka Streamsが剥き出しにはなっていますが、それらのあれこれをやってくれて、ちょっと短く書けるように、bindAndHandleSyncというメソッドがあります。そのメソッドを使うと以下のように書けます。

import akka.actors.ActorSystem
import akka.streams.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
import akka.http.scaladsl.model.HttpMethods._

implicit val system = ActorSystem()

implicit val materializer = ActorMaterializer()

val handle: HttpRequest => HttpResponse = {
  case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
    HttpResponse(entity = "pong")
}

Http().bindAndHandleSync(handle, "localhost", 8080)

bindAndHandleAsyncメソッドもありますが、こちらはHttpRequest => Future[HttpResponse]と、Futureを使うようになります。

高レベルAPIHigh-Level Server-Side API

高レベルAPIは低レベルAPIをラップし、ルーティングの機能を備えたものです(よく見るとこのページのURLも"routing-dsl"になっていますね)。その分低レベルAPIに比べて細やかなことは出来ません。

高レベルAPIでは、akka.http.scaladsl.server.Directivesのメソッド群と、bindAndHandleメソッドを使います。

import akka.actors.ActorSystem
import akka.streams.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

implicit val system = ActorSystem()

implicit val materializer = ActorMaterializer()

val route =
  path("ping") {
    get {
      complete("pong")
    }
  }

Http().bindAndHandle(route, "localhost", 8080)

bindAndHandleメソッドの第1引数に渡すのはFlow[HttpRequest, HttpResponse, Any]という型の値で、このFlowというのはAkka Streamsのものです。ですので、高レベルAPIにおいてもAkka Streamsが隠蔽されるわけではないのですが、akka.http.scaladsl.server.Directivespath, get, completeはすべてこのオブジェクトのメソッド)を用いるので、低レベルAPIに比べるとほとんど意識しなくなります。

反対に、低レベルAPIにおいてFlowを用いる箇所ではakka.http.scaladsl.server.Directivesで組み立てたものを渡せたりもします。

低レベルAPIを使ってみる

高レベルAPIakka.http.scaladsl.server.Directivesが提供するメソッド群を使ったルーティングの記述はややクセがありますが(sprayに触れてた人はほぼ同じだから馴染みやすいらしい)、見て分からないほどのものではないと思います。が、これは他のウェブフレームワークでもやってくれそうなもの。というわけで、低レベルAPIの方を詳しく見ていきます。

その前に軽くAkka Streamsを説明。

  • Source: 何かしらの入力を発生させる。始端。ひとつの出力チャンネルを持つ
  • Sink: Sourceから発生した入力が最終的に行き着く先。終端。ひとつの入力チャンネルを持つ
  • Flow: 入力を受け取って、別の形に変換し出力する。入力チャンネルと出力チャンネルをひとつずつ持つ

より突っ込んだ話であれば:

あたりが詳しかったです。

Akka HTTPの低レベルAPIはこのAkka Streamsを利用しているので、それをわざとらしく使用したコードが以下です。

import akka.http.scaladsl.Http
import akka.stream.scaladsl.Sink

val source = Http().bind("localhost", 8080)

source.runWith(Sink.foreach { connection =>
  println(s"New connection is accepted: ${connection.remoteAddress}")
  // handle connection
})

bindAndHandleSyncメソッドやbindAndHandleメソッドではなく、bindメソッドを使用すると、戻り値はSource[Http.IncomingConnection, Future[Http.ServerBinding]]となります。ポイントはSourceを返している点です。

このSourceはHttp.IncomingConnectionを発生させるSourceです。ですので、このあとにFlowやSinkを繋げばHttp.IncomingConnection、つまりHTTPによってアクセスされた場合のコネクションを得ることが出来るわけです。

source.runWithメソッドにSinkを渡すと実行されるので、HTTPコネクションを受け取るSinkを渡してあげればOKです。

ただ、Sinkは入力の行き着く先ですので、これだけではHTTPコネクションを受け取って終わりになってしまいます。何の処理もしていません。8080ポートで接続を受け付けるようになっただけです。

ですので、connection.handleWithメソッドを使います。

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
import akka.http.scaladsl.model.HttpMethods._
import akka.stream.scaladsl.{ Sink, Flow }

val source = Http().bind("localhost", 8080)

source.runWith(Sink.foreach { connection =>
  println(s"New connection is accepted: ${connection.remoteAddress}")

  connection.handleWith(Flow.fromFunction {
    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "pong")
  })

  // Http().bindAndHandleSync(...) の第1引数に渡すものと一緒
  // connection.handleWithSyncHandler(handler)

  // Http().bindAndHandleAsync(...) の第1引数に渡すものと一緒
  // connection.handleWithAsyncHandler(handler)
})

handleWithメソッドは第1引数にFlow[HttpRequest, HttpResponse, Mat]Matの説明は割愛)を受け取ります。想像に難くないかもしれませんが、HttpRequestを入力として受け取り、HttpResponseを出力として返すFlowのことであり、そのFlowでHTTPリクエストを処理してHTTPレスポンスを返してやってね、ということです。

これでHttp.IncomingConnectionを受け取った後、あらたにHttpRequestを受け取りHttpResponseを返すというふたつのストリームによって、HTTPサーバーを実現します。

ちなみにコメントでも書きましたが、Http.IncomingConnectionにはhandleWithSyncHandlerメソッドとhandleWithAsyncHandlerメソッドも定義されていて、これはHttp()で使えるbindAndHandleSyncメソッドおよびbindAndHandleAsyncメソッドにて実現できることをHttp.IncomingConnectionにて実現するものです。

とまあこんな風にHTTPリクエストなどなどを扱えます。高レベルにHTTPを隠蔽するなら他の選択肢もありますが、HTTPコネクションのレベルで何かしてやる場合、Akka HTTPはひとつの選択肢だなーと思いました。