hotwheels源码剖析
在霸爷的推荐下,看了hotwheels的代码,接下来我就来分析下hotwheels的代码(主要是server端代码),hotwheels是干吗的呢,介绍在这里:
https://github.com/tolbrino/hotwheels
Janus is a messaging server optimized to unicast over TCP to thousands of clients subscribed to topics of interest.
The ultimate goal is to maintain a latency of less than 2 seconds for 20 thousand clients on Amazon EC2 (small instance).
首先来看janus.app:
[erlang]
{application, janus,
[{description, “Janus”},
{vsn, “0.0.1”},
{id, “janus”},
{modules, [barrier,
bin,
bot,
client_proxy,
common,
flashbot,
histo,
janus,
janus_acceptor,
janus_admin,
janus_app,
janus_flash,
launcher,
mapper,
pubsub,
topman,
t,
transport,
util
]},
{registered, [janus_sup,
janus_topman_sup,
janus_proxy_mapper_sup,
janus_transport_sup,
janus_listener]},
{applications, [kernel,
stdlib,
mnesia,
inets
]},
{mod, {janus_app, []}},
{env, []}
]
}.
[/erlang]
具体每个域的意思这里就不介绍了,详细可以去看erlang的文档 http://www.erlang.org/doc/design_principles/applications.html
我们主要来看mod这个tuple,可以看到回调模块是janus_app,所以我们就从janus_app开始。
通过模块定义我们可以清楚的看到这个模块是一个application:
[erlang]
-module(janus_app).
-behaviour(application).
[/erlang]
因此我们来看它的start函数:
[erlang]
-define(LISTEN_PORT, 8081).
start(_Type, _Args) ->
Port = janus_admin:get_env(listen_port, ?LISTEN_PORT),
supervisor:start_link({local, ?MODULE},
?MODULE,
[Port, transport]).
[/erlang]
这里可以看到首先会从环境变量里面取得端口(命令行参数),而默认的port是8001,然后调用supervisor start_link函数,这个函数会启动创建并启动一个supervisor,这里可以看到回调模块是当前模块,因此我们接下来就来看当前模块的init函数.
init返回的child spec的格式我就不介绍了,可以去看erlang的手册
[erlang]
init([Port, Module]) ->
{ok,
{_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
%% TCP server
{janus_sup,
{janus_acceptor, start_link, [self(), Port, Module]},
permanent,
2000,
worker,
[janus_acceptor]
},
%% Topic manager
{janus_topman_sup,
{topman, start, []},
permanent,
2000,
worker,
[topman]
},
%% Client proxy mapper
{janus_proxy_mapper_sup,
{mapper, start, [client_proxy_mapper]},
permanent,
2000,
worker,
[mapper]
},
%% Client instance supervisor
{janus_transport_sup,
{supervisor, start_link, [{local, janus_transport_sup},
?MODULE, [Module]]},
permanent,
infinity,
supervisor,
[]
}
]
}
};
[/erlang]
从上面的代码可以看到这个supervisor一共会监控4个子进程,其中3个是worker,1个是supervisor。
对应的三个worker的名字分别是:
janus_sup(janus_acceptor:start_link())
janus_topman_sup(topman:start())
janus_proxy_mapper_sup(mapper:start(client_proxy_mapper))
而唯一的supervisor是janus_transport_sup(supervisor:start_link(transport))。
后面的括号注明了子进程的启动模块和回调函数。
从上面代码的注释可以看到每个子进程都是干嘛的,我们一个个来分析,首先来看第一个janus_sup进程,这个进程调用janus_acceptor模块的start_link启动的,所以我们来看janus_acceptor这个模块。
[erlang]
start_link(Parent, Port, Module)
when is_pid(Parent),
is_integer(Port),
is_atom(Module) ->
Args = [Parent, Port, Module],
proc_lib:start_link(?MODULE, acceptor_init, Args).
[/erlang]
这里可以看到代码比较简单,就是调用start_link启动一个子进程,子进程的模块就是当前模块,然后回调函数是acceptor_init,参数是一个list,包含三个参数,分别是父进程id,端口号,以及module, 父进程id所指的就是的supervisor的进程id,而module是指transport模块(可以看前面janus_app模块)。
这里要注意在调用proc_lib:start_link之前,一直是处于supervisor进程中的,当start_link之后,才是启动了子进程.这里使用了proc_lib:start_link,这个函数是同步的启动一个子进程,它会一直等待,直到子进程调用init_ack,才会返回.
因此接下来我们来看acceptor_init这个函数:
[erlang]
acceptor_init(Parent, Port, Module) ->
State = #state{
parent = Parent,
port = Port,
module = Module
},
error_logger:info_msg(“Listening on port pn”, [Port]),
case (catch do_init(State)) of
{ok, ListenSocket} ->
proc_lib:init_ack(State#state.parent, {ok, self()}),
acceptor_loop(State#state{listener = ListenSocket});
Error ->
proc_lib:init_ack(Parent, Error),
error
end.
[/erlang]
这个函数可以看到就是通过调用do_init来得到监听的listen socket,然后根据返回值来做一些操作,这里可以看到不论失败,成功都会调用init_ack来返回值给父进程,当成功之后,就会调用acceptor_loop来进入后续处理.
在看acceptor_loop之前,线来看do_init方法:
[erlang]
do_init(State) ->
Opts = [binary,
{packet, 0},
{reuseaddr, true},
{backlog, 1024},
{active, false}],
case gen_tcp:listen(State#state.port, Opts) of
{ok, ListenSocket} ->
{ok, ListenSocket};
{error, Reason} ->
throw({error, {listen, Reason}})
end.
[/erlang]
这里调用gen_tcp的listen方法,我们着重来看传入listen的opts,这里可以看到active被设置为false,也就是每次必须主动地调用recv来读取数据。
然后来看acceptor_loop 函数,也就是server子进程的主循环函数,这个函数主要就是通过accept来接收客户端的连接,然后交给后续模块处理.
[erlang]
acceptor_loop(State) ->
case (catch gen_tcp:accept(State#state.listener, 50000)) of
{ok, Socket} ->
handle_connection(State, Socket),
?MODULE:acceptor_loop(State);
{error, Reason} ->
handle_error(Reason),
?MODULE:acceptor_loop(State);
{‘EXIT’, Reason} ->
handle_error({‘EXIT’, Reason}),
?MODULE:acceptor_loop(State)
end.
[/erlang]
这里先暂停一下,我们先来看最后一个被supervisor监控的子进程,也就是一个子supervisor,janus_transport_sup。来看它的child spec:
[erlang]
{janus_transport_sup,
{supervisor, start_link, [{local, janus_transport_sup},
?MODULE, [Module]]},
permanent,
infinity,
supervisor,
[]
}
[/erlang]
可以看到他会继续创建一个新的supervisor,然后也是当前模块(janus_app),只不过参数是一个参数,因此我们来看另外的一个init函数:
[erlang]
init([Module]) ->
{ok,
{_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
%% TCP Client
{undefined,
{Module, start_link, []},
temporary,
2000,
worker,
[]
}
]
}
}.
[/erlang]
可以看到这个child spec,重启策略是simple_one_one,也就是需要手动重启,并且它将会创建的子进程是Module(transport模块)的start_link函数来启动.
接下来就来看transport的启动函数以及init函数,这个模块是一个gen_server.
[erlang]
-behavior(gen_server).
start_link(Port)
when is_integer(Port) ->
gen_server:start_link(?MODULE, [Port], []).
init([Port]) ->
process_flag(trap_exit, true),
{ok, #state{port = Port, transport = janus_flash }}.
[/erlang]
这里需要注意的是process_flag(trap_exit, true),这个其实也就是设置表示父进程将会接收子进程的crash信息。还有一个就是state,这里state的transport设置为了janus_flash模块.
ok,然后我们再回到janus_acceptor模块,接下来来看假设有一个连接过来之后的情况。这里跳过错误处理,就来看看正确的处理流程。
[erlang]
handle_connection(State, Socket),
?MODULE:acceptor_loop(State);
[/erlang]
当正确接到新的连接之后,会进入handle_connection的处理,然后调用acceptor_loop进入递归.因此我们就来看handle_connection
[erlang]
handle_connection(State, Socket) ->
{ok, Pid} = janus_app:start_transport(State#state.port),
ok = gen_tcp:controlling_process(Socket, Pid),
%% Instruct the new handler to own the socket.
(State#state.module):set_socket(Pid, Socket).
[/erlang]
这里做了3个操作,首先调用janus_app:start_transport来启动一个新的子进程,而这个子进程是属于那个supervisor呢,来看代码:
[erlang]
start_transport(Port) ->
supervisor:start_child(janus_transport_sup, [Port]).
[/erlang]
可以看到它启动了janus_transport_sup这个supervisor的子进程,而我们还记得前面分析的,这个supervisor的子进程的启动回调就是transport模块的start_link函数。这里要注意start_child返回的是子进程的pid.
[erlang]
start_link(Port)
when is_integer(Port) ->
gen_server:start_link(?MODULE, [Port], []).
[/erlang]
然后接下来的两个操作,就是将当前进程接受到的socket传递给新建的子进程,然后调用transport的set_socket方法。然后我们来看transport模块的set_socket方法.
[erlang]
set_socket(Ref, Sock) ->
gen_server:cast(Ref, {set_socket, Sock}).
[/erlang]
可以看到就是给新建的子进程发送一个set_socket的方法.这里要注意就是会设置socket的属性,也就是设置active为once。
[erlang]
handle_cast({set_socket, Socket}, State) ->
inet:setopts(Socket, [{active, once},
{packet, 0},
binary]),
{ok, Keep, Ref} = (State#state.transport):start(Socket),
keep_alive_or_close(Keep, State#state{socket = Socket, state = Ref});
[/erlang]
这里可以看到调用了state的transport的start方法,那么这个transport是那个模块呢,上面的分析中在当前transport的init方法中返回e设置的就是janus_flash模块,所以这里调用的就是janus_flash:start方法.
[erlang]
start(Socket) ->
Send = fun(Bin) -> gen_tcp:send(Socket, [Bin, 1]) end,
{ok, Proxy, Token} = client_proxy:start(Send),
State = #state{
socket = Socket,
proxy = Proxy,
token = Token
},
JSON = {struct,
[{<<”timestamp”>>, tuple_to_list(now())},
{<<”token”>>, Token}
]},
send(mochijson2:encode(JSON), State).
[/erlang]
这里可以看到先是创建了一个send方法,然后调用client_proxy start,这里client_proxy其实是一个gen_server,因此我们来看这个模块的start方法以及 init方法.
[erlang]
start(Send) ->
Token = common:random_token(),
{ok, Pid} = gen_server:start_link(?MODULE, [Token, self(), Send], []),
{ok, Pid, Token}.
init([Token, Parent, Send]) ->
process_flag(trap_exit, true),
ok = mapper:add(client_proxy_mapper, Token),
State = #state{
token = Token,
parent = Parent,
send = Send,
messages = []
},
{ok, State}.
[/erlang]
可以看到init方法里面调用了mapper模块的add方法,因此来看mapper:add方法
[erlang]
add(Ref, Key) ->
gen_server:call(Ref, {add, Key, self()}).
[/erlang]
可以看到也就是给client_proxy_mapper这个进程发送了一个同步的消息,而对应的client_proxy_mapper也就是一开始在janus_app模块中注册的进程,这个进程就是mapper模块启动的。因此来看mapper的对应同步消息接收。
[erlang]
handle_call({add, Key, Pid}, _, State) ->
case ets:lookup(State#state.key_pid, Key) of
[_] ->
ok;
_ ->
Ref = erlang:monitor(process, Pid),
ets:insert(State#state.key_pid, {Key, {Pid, Ref}}),
ets:insert(State#state.pid_key, {Pid, Key})
end,
{reply, ok, State};
[/erlang]
这里也就是将随机出来的token和进程通过ets关联。
前面这里对于数据的发送分析完了,剩下的就是连接的错误,断开处理以及数据的接收处理,线来看连接的接收处理,通过上面的分析,我们知道,accept到的socket是处于transport这个gen_server管理的,因此读取数据就在这个里面处理:
[erlang]
handle_info({tcp, Socket, <<”
when Socket == State#state.socket ->
inet:setopts(Socket, [{active, once}]),
dispatch(Bin, janus_flash, State);
[/erlang]
这里主要还是调用dispatch来处理数据的读取,先是调用janus_flash的process方法,然后调用keep_alive_or_close来判断是否连接已经关闭.
[erlang]
dispatch(Data, Mod, State = #state{transport = Mod}) ->
{ok, Keep, TS} = Mod:process(Data, State#state.state),
keep_alive_or_close(Keep, State#state{state = TS}).
[/erlang]