Twitter Streaming APIをErlangから使ってみる

TwitterさんはたくさんAPIを公開しており, 開発者からしたら頭が下がる想いですが, その中でもちょっと特殊なStreaming API.

Twitter API Wiki / Streaming API Documentation

Public Timeline上を流れるつぶやきを次々と受け取るAPIですね. 種類があるのでとりあえず書いておきます.

メソッドの種類

sampleメソッド

Public Timeline上のつぶやきをランダムにピックアップし, 応答として返してくれるメソッドです. 「とりあえずどんな感じなのか知りたいんだけど」って人にもってこいです.

URL
http://stream.twitter.com/1/statuses/sample.format
リクエストメソッド
GET
firehoseメソッド

Public Timeline上の全つぶやきを応答として返してくれるメソッド. おそらく消化ホース(firehose)の中を流れる水のよう, として名付けられたんじゃないでしょうか? 全つぶやきなのでそれはそれは激流なんだろうなぁ, と勝手に想像してました.

しかしこのメソッド, 利用時のアカウントに特定のロール(権限)が与えられていないと使えません. なので普段はあまり使(わ|え)ないでしょう.

URL
http://stream.twitter.com/1/statuses/firehose.format
リクエストメソッド
GET
filterメソッド

使うならこっちのfilterメソッド. Public Timeline上から条件に該当するつぶやきを応答として返してくれるメソッドです. 指定できる条件はユーザーID(follow)または任意のキーワード(track), です. 双方ともカンマ区切りで複数指定できます. 片方, または両方指定するコトもできますが, 両方省略するコトはできません. firehoseメソッドと同じになっちゃいますからね.

注意するコトはfollowにはScreen nameではなく, ユーザーIDしか指定できません. ボクだったらtakkkunではなく, 14976270です. またtrackにはマルチバイト文字を指定するコトができないようです. またデフォルトではfollowに200個, trackに400個までしか指定できません.

URL
http://stream.twitter.com/1/statuses/filter.format
リクエストメソッド
POST
retweetメソッド

コレだけ試していないのでなんともですが, Public Timeline上を流れるRetweetsを応答として返すメソッドでしょう. そんな感じ.

URL
http://stream.twitter.com/1/statuses/retweet.format
リクエストメソッド
GET

気になるところ

@tsupo さんによるTwitter APIの日本語訳があります.

twitterAPI.txt at master from tsupo's Twitter-API-Specification--written-in-Japanese- - GitHub

ここのStreaming APIの項にgardenhose, birddog, shadowというのがあります. しかし先のStreaming API Documentにはありません. コレらは代わりにロールという位置づけになっており, そのロールが付与されると, filterメソッドのfollowまたはtrackに指定できるIDまたはキーワードの個数が増える, というものです.

どちらにせよロールは設けるのでしょうが, filterメソッドの制限を拡張するのか, 新たなメソッドが使えるようになるのか, どうなるか気になるところです.

Erlangから実際に使ってみる

とりあえず現状で動くコードを書いてみます. 試すだけならcurlを使えばいいんですが, コードから扱ってみたかったのでErlangで試してみました.

-module(twitter_stream).
-author("KONDO Takahiro <heartery@gmail.com>").

-export([filter/3, stop/1]).

filter(User, Password, Data) ->
    Url = lists:append(["http://", User, ":", Password, "@stream.twitter.com/1/statuses/filter.json"]),
    Request = {Url, [], "application/x-www-form-urlencoded", Data},
    stream(post, Request, processor()).

stop(Streamer) ->
    send(Streamer, stop),
    receive {Streamer, stopped} -> stopped end.

stream(RequestMethod, Request, Processor) ->
    spawn(fun() ->
                  Options = [{sync, false}, {stream, self}],
                  case http:request(RequestMethod, Request, [], Options) of
                      {ok, RequestId} -> loop(RequestId, Processor);
                      {error, Reason} -> send(Processor, {error, Reason})
                  end
          end).

loop(RequestId, Processor) ->
    receive
        {http, {RequestId, stream_start, Headers}} ->
            send(Processor, {start, Headers}),
            loop(RequestId, Processor);
        {http, {RequestId, stream, Part}} ->
            case Part of
                <<"\r\n">> -> noop;
                _          -> send(Processor, {stream, Part})
            end,
            loop(RequestId, Processor);
        {http, {RequestId, {error, Reason}}} ->
            send(Processor, {error, Reason});
        {From, stop} ->
            http:cancel_request(RequestId),
            send(From, stopped)
    end.

processor() ->
    spawn(fun() -> process_loop() end).

process_loop() ->
    receive
        {_Streamer, {start, Headers}} ->
            io:format("Start: ~p~n", [Headers]),
            process_loop();
        {_Streamer, {stream, Part}} ->
            io:format("Stream: ~p~n", [Part]),
            process_loop();
        {_Streamer, {error, Reason}} ->
            io:format("Error: ~p~n", [Reason])
    end.

send(To, Message) ->
    To ! {self(), Message}.

つぶやきひとつひとつは改行で区切られるので, それを元にして切り出しています.

http:request/4のオプションとして{stream, self}を用いると, http:request/4を呼び出したプロセスに次々とレスポンスが流れ込んでくるので, それを改行区切りで別プロセスに渡すだけです. 実用するならその別プロセス(コード上ではprocessor)を外から渡せるようにした方がいいでしょうね.

とりあえずこんな感じに使います. erlang(大文字小文字は関係ない)を含むつぶやきを受け取ります.

% erl
Erlang R13B01 (erts-5.7.2) [source] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.7.2  (abort with ^G)
1> inets:start().
ok
2> Streamer = twitter_stream:filter("Twitter ID", "Password", "track=erlang").
<0.48.0>
% 標準出力に結果を出力しているので, ここに応答が次々と表示される
3> twitter_stream:stop(Streamer).
stopped

コレでStreaming APIからつぶやきを受け取りつついろんなコトができそうです.