ErlangでTwitterのUserStreamを受け取る

以前もErlangでTwitter Streaming APIを使うといったエントリを書いたのですが、いかんせん情報が古すぎます。UserStreamではなく、素のStreaming APIなのはともかく、認証がベーシック認証だったりします。

その割にはどうやら最近参照されているらしい。http://naoyat.hatenablog.jp/entry/2012/01/04/220639http://d.hatena.ne.jp/siritori/20120312/1331503357には以前のエントリのURLが貼られているようで。いや、なんかすみません。

ということで、ちゃんと動くかつOTPで書き直してみました。erlang-oauthに依存しています。

-module(userstream).
-author("Takahiro Kondo <heartery@gmail.com>").

-export([start/5, start/6, start_link/5, start_link/6, stop/1]).

-behavior(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {id, processor}).

start(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret) ->
    start(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret, []).

start(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret, Options) ->
    Args = [Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret],
    gen_server:start(?MODULE, Args, Options).

start_link(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret) ->
    start_link(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret, []).

start_link(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret, Options) ->
    Args = [Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret],
    gen_server:start_link(?MODULE, Args, Options).

stop(Server) ->
    gen_server:cast(Server, stop).

%% callback functions

init([Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret]) ->
    Url = "https://userstream.twitter.com/2/user.json",
    Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
    Options = [{sync, false}, {stream, self}],
    case oauth:post(Url, [], Consumer, AccessToken, AccessTokenSecret, Options) of
        {ok, Id}        -> {ok, #state{id = Id, processor = Processor}};
        {error, Reason} -> {stop, {http_error, Reason}}
    end.

handle_call(_, _, State) ->
    {noreply, State}.

handle_cast(stop, State) ->
    {stop, normal, State}.

handle_info({http, {Id, stream_start, Headers}}, #state{id = Id, processor = Processor} = State) ->
    send(Processor, {start, Headers}),
    {noreply, State};

handle_info({http, {Id, stream, <<"\r\n">>}}, #state{id = Id} = State) ->
    {noreply, State};

handle_info({http, {Id, stream, Part}}, #state{id = Id, processor = Processor} = State) ->
    send(Processor, {stream, Part}),
    {noreply, State};

handle_info({http, {Id, {error, Reason}}}, #state{id = Id, processor = Processor} = State) ->
    send(Processor, {error, Reason}),
    {stop, {http_error, Reason}, State}.

terminate(_, #state{id = Id, processor = Processor}) ->
    send(Processor, stop),
    httpc:cancel_request(Id).

code_change(_, State, _) ->
    {ok, State}.

%% private functions

send(To, Message) ->
    To ! Message.

ちゃんと動くかは確認しましたが、process_flagとかは呼んでいないのでそこらへんは適当に。gen_serverですので、ちゃんと設定すればそこまで手こずることなくsupervisor treeに組込めるかと思います。

本当はuserstreamモジュールをさらにビヘイビアにして、handle_status/2, handle_favorite/2とかで各イベントをハンドリングできるようにするといいんですが、それをやるとちょっと複雑になるので、そこまではやりません。

ちなみに使い方はこんな感じで。

-module(example).
-author("Takahiro Kondo <heartery@gmail.com>").

-export([start/0, stop/1]).

start() ->
    Processor = spawn(fun() -> process() end),
    ConsumerKey = "Your consumer key",
    ConsumerSecret = "Your consumer secret",
    AccessToken = "Your access token",
    AccessTokenSecret = "Your access token secret",
    userstream:start(Processor, ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret).

stop(Pid) ->
    userstream:stop(Pid).

process() ->
    receive
        {start, Headers} ->
            io:format("Start: ~p~n", [Headers]),
            process();
        {stream, Part} ->
            io:format("Stream: ~p~n", [Part]),
            process();
        {error, Reason} ->
            io:format("Error: ~p~n", [Reason]),
            process();
        stop ->
            io:format("Stop~n")
    end.
$ erl -s inets -s ssl
> {ok, Pid} = example:start().
ここにUserStreamからの応答が表示される(example:process/0で標準出力に吐き出してるため)
> example:stop(Pid).
Stop
>

OTPを使いつつ複雑すぎない書き方をしてみました。必ずしもOTPを使う必要はないですし、メリットばかりでもないんですが、アプリケーションがある程度複雑になってきたら使った方が良いかなと思います。それこそ書き捨てのコードでは不要でしょうが、あのプロセスが動いて、こっちであーでどーで、とかで頭のリソース割かれるならOTPを学ぶ価値はアリかなと。

余談ではありますが、余力があれば自作のtwitterモジュールをGitHubにでもあげておきたいもんですね。それにはTwitterREST APIはもちろん、先述したUserStream用ビヘイビアも書いてはあるんですよ。ただ随分前からメンテナンスをサボってるので、REST APIが古過ぎるという感じで…… なんか一から書いた方が早そう。