mochiweb源码分析(一)
这篇主要分析下mochiweb的整体结构。
我这里看的代码是github上的最新代码( https://github.com/mochi/mochiweb )。
由于mochiweb是一个框架,因此我们就从他自带的简单例子入手,来看他是如何工作的。我们就从keepalive.erl开始。
首先来看这个模块的start函数,也就是它的启动函数:
[erlang]
-define(LOOP, {?MODULE, loop}).
start(Options = [{port, _Port}]) ->
mochiweb_http:start([{name, ?MODULE}, {loop, ?LOOP} | Options]).
[/erlang]
可以看到启动函数非常简单,那就是直接调用mochiweb_http模块的start函数。那么我们整个分析流程,就从这个模块的start函数开始。这里要注意,我们可以看到是将当前回调模块的loop函数传递给了mochiweb_http,这就给后续调用回调,提供了接口。
[erlang]
%% @spec start(Options) -> ServerRet
%% Options = [option()]
%% Option = {name, atom()} | {ip, string() | tuple()} | {backlog, integer()}
%% | {nodelay, boolean()} | {acceptor_pool_size, integer()}
%% | {ssl, boolean()} | {profile_fun, undefined | (Props) -> ok}
%% | {link, false}
%% @doc Start a mochiweb server.
%% profile_fun is used to profile accept timing.
%% After each accept, if defined, profile_fun is called with a proplist of a subset of the mochiweb_socket_server state and timing information.
%% The proplist is as follows: [{name, Name}, {port, Port}, {active_sockets, ActiveSockets}, {timing, Timing}].
%% @end
start(Options) ->
mochiweb_socket_server:start(parse_options(Options)).
[/erlang]
这里会调用mochiweb_socket_server的start函数启动mochiweb server,可是在调用之前会调用parse_options来解析。
[erlang]
parse_options(Options) ->
{loop, HttpLoop} = proplists:lookup(loop, Options),
Loop = {?MODULE, loop, [HttpLoop]},
Options1 = [{loop, Loop} | proplists:delete(loop, Options)],
mochilists:set_defaults(?DEFAULTS, Options1).
[/erlang]
这里由于Options是一个tuple list,因此这里调用proplists的lookup函数对整个tuple list进行查找,其中key就是loop,它会和每个tuple的第一个元素比较,如果相等就会返回当前的tuple,下面就是proplists的lookup函数的实现:
[erlang]
lookup(Key, [P | Ps]) ->
if is_atom(P), P =:= Key ->
{Key, true};
%%关键在这里,可以看到会取出tuple的第一个元素和key比较.
tuple_size(P) >= 1, element(1, P) =:= Key ->
%% Note that Key
does not have to be an atom in this case.
P;
true ->
lookup(Key, Ps)
end;
lookup(_Key, []) ->
none.
[/erlang]
当解析出来loop回调之后,就进入了mochiweb_socket_server模块的处理,这个模块是一个gen_server.我们来看它的start函数:
[erlang]
start_link(Options) ->
start_server(start_link, parse_options(Options)).
start(Options) ->
case lists:keytake(link, 1, Options) of
{value, {_Key, false}, Options1} ->
start_server(start, parse_options(Options1));
_ ->
%% TODO: https://github.com/mochi/mochiweb/issues/58
%% [X] Phase 1: Add new APIs (Sep 2011)
%% [_] Phase 2: Add deprecation warning
%% [_] Phase 3: Change default to {link, false} and ignore link
%% [_] Phase 4: Add deprecation warning for {link, _} option
%% [_] Phase 5: Remove support for {link, _} option
start_link(Options)
end.
[/erlang]
可以看到link是否存在,最终调用调用parse_options(mochiweb_socket_server)来pase对应的参数,然后再调用start_server函数。这里parse_options就不详细介绍了,只需要知道它是用来解析参数(ip,port…)的就够了。我们详细来看start_server.
[erlang]
start_server(F, State=#mochiweb_socket_server{ssl=Ssl, name=Name}) ->
ok = prep_ssl(Ssl),
case Name of
undefined ->
gen_server:F(?MODULE, State, []);
_ ->
gen_server:F(Name, ?MODULE, State, [])
end.
[/erlang]
可以看到start_server会直接调用gen_sever的start函数,也就是启动当前gen server,不过我们注意到这里调用的是start,而不是start_link,这是因为在mochiweb中,并没有实现supervision行为的模块,而是在当前的mochiweb_socket_server中实现了简单的监控树行为,后续我们会看到。
在继续看mochiweb_socket_server的init函数之前,我们先来看核心的数据结构,也就是mochiweb_socket_server这个record,我们可以看到这个record包含了将会被回调的loop,连接池,backlog等等的信息,而这个结构就是在刚才上面没有分析的parse_options中设置的,然后最终传递给init函数。可以看到这里面主要是一些将要传递给erlang的socket option。
[erlang]
-record(mochiweb_socket_server,
{port,
loop,
name=undefined,
%% NOTE: This is currently ignored.
max=2048,
ip=any,
listen=null,
nodelay=false,
backlog=128,
active_sockets=0,
acceptor_pool_size=16,
ssl=false,
ssl_opts=[{ssl_imp, new}],
acceptor_pool=sets:new(),
profile_fun=undefined}).
[/erlang]
然后来看init函数.这个函数主要是构造socket opts,然后传递listen函数。
[erlang]
init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog, nodelay=NoDelay}) ->
process_flag(trap_exit, true),
BaseOpts = [binary,
{reuseaddr, true},
{packet, 0},
{backlog, Backlog},
{recbuf, ?RECBUF_SIZE},
{active, false},
{nodelay, NoDelay}],
Opts = case Ip of
any ->
case ipv6_supported() of % IPv4, and IPv6 if supported
true -> [inet, inet6 | BaseOpts];
_ -> BaseOpts
end;
{_, _, _, _} -> % IPv4
[inet, {ip, Ip} | BaseOpts];
{_, _, _, _, _, _, _, _} -> % IPv6
[inet6, {ip, Ip} | BaseOpts]
end,
listen(Port, Opts, State).
[/erlang]
这里要注意最关键的一句那就是process_flag(trap_exit, true)这也就说明这个gen_server充当了supervision的角色,它会监控所有的子进程,而对应的重启策略也就类似supervision的 simple_one_one,这个后续会分析.
然后就是listen函数了。这里的设计非常巧妙,因为在一般的server设计中,比如http,都是来一个请求,accept到句柄,然后spawn一个进程,将句柄传递给子进程,用完然后销毁,可是在mochiweb中,不是这么做的,因为这么做会有一定的性能损失,因为始终还是有进程的切换。
在mochiweb中,会创建一个accept pool,这个个数默认是16,不过可以通过传递的参数修改的。这个accept pool,就是说mochiweb会首先启动16个子进程,都同时阻塞在accept调用上,然后如果一个请求过来,某个子进程被唤醒,唤醒之后,子进程会发消息给父进程,然后子进程此时就会变成一个worker进程,那就是说它不会在accept句柄了,这就有点像一个退化的过程。然后父进程接收到消息之后,会重新再启动一个对应的accept进程。
来看源码
[erlang]
new_acceptor_pool(Listen,
State=#mochiweb_socket_server{acceptor_pool=Pool,
acceptor_pool_size=Size,
loop=Loop}) ->
F = fun (_, S) ->
Pid = mochiweb_acceptor:start_link(self(), Listen, Loop),
sets:add_element(Pid, S)
end,
Pool1 = lists:foldl(F, Pool, lists:seq(1, Size)),
State#mochiweb_socket_server{acceptor_pool=Pool1}.
listen(Port, Opts, State=#mochiweb_socket_server{ssl=Ssl, ssl_opts=SslOpts}) ->
%%调用listen,如果不是https,则会调用gen_tcp:listen创建listen句柄
case mochiweb_socket:listen(Ssl, Port, Opts, SslOpts) of
{ok, Listen} ->
{ok, ListenPort} = mochiweb_socket:port(Listen),
{ok, new_acceptor_pool(
Listen,
State#mochiweb_socket_server{listen=Listen,
port=ListenPort})};
{error, Reason} ->
{stop, Reason}
end.
[/erlang]
上面的代码中,最核心的就是new_acceptor_pool这个函数,这个函数调用lists:foldl来循环调用mochiweb_acceptor:start_link启动accept子进程,然后将所有启动的子进程保存到acceptor_pool中。
然后就来看mochiweb_acceptor的start_link函数,这个函数很简单,就是启动一个子进程,然后链接到mochiweb_socket_server父进程。这里只需要注意一个地方,那就是accept到句柄之后,就直接给父进程发送了一个accepted消息。
[erlang]
start_link(Server, Listen, Loop) ->
proc_lib:spawn_link(?MODULE, init, [Server, Listen, Loop]).
init(Server, Listen, Loop) ->
T1 = now(),
case catch mochiweb_socket:accept(Listen) of
{ok, Socket} ->
%%发送消息
gen_server:cast(Server, {accepted, self(), timer:now_diff(now(), T1)}),
%%调用loop回调
call_loop(Loop, Socket);
{error, closed} ->
exit(normal);
{error, timeout} ->
init(Server, Listen, Loop);
{error, esslaccept} ->
exit(normal);
Other ->
error_logger:error_report(
[{application, mochiweb},
“Accept failed error”,
lists:flatten(io_lib:format(“~p”, [Other]))]),
exit({error, accept_failed})
end.
call_loop(Loop, Socket) ->
Loop(Socket).
[/erlang]
最后我们就来看父进程接收到accepted消息后,如何处理,这里还有一个多的要注意的,那就是整个server状态中,还保存了所有activesocket的个数,也就是已经accept的socket个数。
[erlang]
handle_cast({accepted, Pid, Timing},
State=#mochiweb_socket_server{active_sockets=ActiveSockets}) ->
%%active socket个数更新
State1 = State#mochiweb_socket_server{active_sockets=1 + ActiveSockets},
case State#mochiweb_socket_server.profile_fun of
undefined ->
undefined;
F when is_function(F) ->
catch F([{timing, Timing} | state_to_proplist(State1)])
end,
%%调用recyle_acceptor来回收子进程
{noreply, recycle_acceptor(Pid, State1)};
[/erlang]
然后就是recycle_acceptor函数,它用来重新启动新的acceptor进程。
[erlang]
recycle_acceptor(Pid, State=#mochiweb_socket_server{
acceptor_pool=Pool,
listen=Listen,
loop=Loop,
active_sockets=ActiveSockets}) ->
%%判断进程id是否是刚才启动的进程id。
case sets:is_element(Pid, Pool) of
true ->
%%重新启动一个acceptor进程
Acceptor = mochiweb_acceptor:start_link(self(), Listen, Loop),
%%更新状态
Pool1 = sets:add_element(Acceptor, sets:del_element(Pid, Pool)),
State#mochiweb_socket_server{acceptor_pool=Pool1};
false ->
State#mochiweb_socket_server{active_sockets=ActiveSockets – 1}
end.
[/erlang]
此时如果当子进程异常退出,那么要怎么办呢,这里也就是如何处理EXIT消息.这里主要是区分正常退出和异常退出,异常退出的话,需要打印日志,而正常的话,什么都不许要打印。剩余的操作都一致,那就是调用recycle_acceptor来看是否需要更新状态。
[erlang]
handle_info({‘EXIT’, Pid, normal}, State) ->
{noreply, recycle_acceptor(Pid, State)};
handle_info({‘EXIT’, Pid, Reason},
State=#mochiweb_socket_server{acceptor_pool=Pool}) ->
case sets:is_element(Pid, Pool) of
true ->
%% If there was an unexpected error accepting, log and sleep.
error_logger:error_report({?MODULE, ?LINE,
{acceptor_error, Reason}}),
timer:sleep(100);
false ->
ok
end,
{noreply, recycle_acceptor(Pid, State)};
[/erlang]
这次就分析到这里,后续的会分析mochiweb的处理请求部分。