这次主要来看nginx如何处理一个http的流程,也就是接收请求,解析,然后接收完毕,然后开始发送数据,这一系列是如何流转起来的,通过上2篇,我们知道了nginx初始化完毕之后会休眠在epoll(或者kqueue等等).
下面就是nginx的事件处理流程图.
然后我们就来看ngx_event_accept函数,这个函数被调用是当listen 句柄有可读事件之后才被调用,它会accept到一个新的句柄,然后设置新的句柄的回调,然后再次返回。而这个新的句柄的回调将会进入nginx的整个http的处理机。
这个函数也是比较长的,我们来一段段的看。
首先下面这段就是accpet 句柄 ,这里我要得瑟一下,accept4 这个系统调用,是我给nginx发的patch,然后被igor吸收加到nginx的0.9 develop里面的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 socklen = NGX_SOCKADDRLEN; //开始accept句柄 #if (NGX_HAVE_ACCEPT4) s = accept4(lc->fd, (struct sockaddr *) sa, &socklen, SOCK_NONBLOCK); #else s = accept(lc->fd, (struct sockaddr *) sa, &socklen); #endif
接下来就是从连接池取得连接,然后创建连接里面包含的数据结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 c = ngx_get_connection(s, ev->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif //创建内存池 c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } //分配客户端地址 c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, sa, socklen); //分配log log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; }
接下来就是初始化从来连接池取出来的连接。主要是发送,接受回调,以及客户端地址等等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 *log = ls->log; //设置读取的回调,这里依赖于操作系统。以后会详细介绍nginx的event框架 c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; //设置client的ip地址 c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->unexpected_eof = 1; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /\* Solaris’s sendfilev() supports AF_NCA, AF_INET, and AF_INET6 \*/ c->sendfile = 0; #endif } #endif //准备设置读写的结构 rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) { /\* rtsig, aio, iocp \*/ rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; #if (NGX_HAVE_KQUEUE) rev->available = 1; #endif } //设置log rev->log = log; wev->log = log;
最后一部分就是最关键的一部分,到达这里说明连接已经初始化完毕,句柄已经取到,按照一般的想法,这个时候就需要将连接加入到事件驱动器中,并且需要设置新的句柄的一些回调处理函数。可是nginx并没有这么做,它只是先设置回调函数,可是并不加事件到epoll中。
这里可以看到是调用ls->handler(c),而这个handler 前两篇blog 我们知道是被初始化为ngx_http_init_connection,也就是最终会调用这个函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 //如果不是epoll的话,就调用add_conn if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; //调用回调 ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available–; }
接下来就是ngx_http_init_connection函数了,这个函数主要是设置当前句柄的读handler,如果数据可读,则直接调用request handler,如果数据不可读,则设置定时器(超时定时器),并将这个句柄挂载到事件处理器上。
这里有一个需要注意的地方,那就是如果使用了ngx_use_accept_mutex锁的话,那么就不能够立即处理request,因为处理request是一个非常耗时的操作,而现在在锁里面,所以此时之需要将这个读事件挂载到ngx_posted_events队列,等退出锁之后再进行处理。
而一般来说默认都会使用mutex锁,因此此时就将rev加到post_events队列中,然后直接返回,那么可能就要问了,什么时候会把事件挂载到epoll中呢,这个我们接下来会分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 ngx_http_init_connection(ngx_connection_t *c) { ngx_event_t *rev; …………………………………………….. c->log_error = NGX_ERROR_INFO; rev = c->read; //设置读handler. rev->handler = ngx_http_init_request; c->write->handler = ngx_http_empty_handler; #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_reading, 1); #endif //如果接收准备好了,则直接调用ngx_http_init_request,如果 if (rev->ready) { /\* the deferred accept(), rtsig, aio, iocp \*/ //如果使用了mutex锁,则post 这个event,然后返回。 if (ngx_use_accept_mutex) { ngx_post_event(rev, &ngx_posted_events); return; } ngx_http_init_request(rev); return; } //添加定时器 ngx_add_timer(rev, c->listening->post_accept_timeout); //将事件挂载到事件处理器 if (ngx_handle_read_event(rev, 0) != NGX_OK) { #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_reading, -1); #endif ngx_http_close_connection(c); return; } }
ok,然后我们来看如果使用了ngx_use_accept_mutex的话,是在那里进入ngx_http_init_request的处理的。
我们回到ngx_process_events_and_timers函数,当我们从ngx_http_init_connection返回时,我们来看nginx接下来会做什么事情。
到达下面的逻辑,此时mutex锁已经被让出,所以此时我们可以处理整个request。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 //如果post_event链表存在,则会进入链表的处理 if (ngx_posted_events) { if (ngx_threaded) { ngx_wakeup_worker_thread(cycle); } else { //处理post event。 ngx_event_process_posted(cycle, &ngx_posted_events); } }
然后就是ngx_event_process_poste函数,这个函数很简单,就是遍历event队列,然后调用event的handler函数,而我们还记得上面在ngx_http_init_connection中设置的handler就是ngx_http_init_request,这样子,我们就再一次进入request的处理,可是此时依旧没有挂载读事件到epoll。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_thread_volatile ngx_event_t **posted) { ngx_event_t *ev; for ( ;; ) { ev = (ngx_event_t \*) \*posted; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); if (ev == NULL) { return; } ngx_delete_posted_event(ev); //调用handler ev->handler(ev); } }
然后就是ngx_http_init_request,进入这个函数,说明客户端有请求过来了,此时我们就需要进入http的协议解析部分了,因此在这个函数主要就是初始化request结构,初始化完毕后进入解析处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static void ngx_http_init_request(ngx_event_t *rev) { …………………………………….. //设置handler回调 rev->handler = ngx_http_process_request_line; r->read_event_handler = ngx_http_block_reading; ……………………………………………… //进入解析request 部分。 rev->handler(rev); }
最后来看ngx_http_process_request_line,这个函数就是对request line进行解析,而解析部分在我以前的blog已经分析过了,这里就不详细分析了。这里要注意一个地方。 我们知道nginx 使用的是epoll的ET模式,而et模式的话,就需要能够判断这次读取的数据是否读完,这里nginx是这样判断的,那就是根据协议来判断,也就是协议驱动,由协议来判断是否有读取完毕。
这里要注意,如果我们使用了mutex锁,那么现在进入这个函数之后,我们会通过ngx_http_read_request_header中来挂载读事件到epoll。
来看代码片段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 if (rev->ready) { n = c->recv(c, r->header_in->last, r->header_in->end – r->header_in->last); } else { //第一次进来设置n为again n = NGX_AGAIN; } if (n == NGX_AGAIN) { if (!rev->timer_set) { cscf = ngx_http_get_module_srv_conf(r, ngx_http_core_module); ngx_add_timer(rev, cscf->client_header_timeout); } //然后挂载读事件. if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_close_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return NGX_ERROR; }
接下来就是ngx_http_process_request_line函数,可以看到它会调用ngx_http_read_request_header.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 for ( ;; ) { if (rc == NGX_AGAIN) { //读取request. n = ngx_http_read_request_header(r); if (n == NGX_AGAIN || n == NGX_ERROR) { return; } } //然后开始parse rc = ngx_http_parse_request_line(r, r->header_in); ………………………………. }
然后当parse结束后通过rc来判断解析的结果,如果是NGX_OK则说明header解析完毕,如果是NGX_AGAIN,则说明header只解析了一部分。我们这里主要来看NGX_OK的情况,就是当request line完全解析完毕时,nginx做什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (rc == NGX_OK) { …………………………………. c->log->action = "reading client request headers"; //设置并调用ngx_http_process_request_headers执行后续操作(解析header) rev->handler = ngx_http_process_request_headers; ngx_http_process_request_headers(rev); }
ngx_http_process_request_headers这个函数主要是解析http的request header,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 static void ngx_http_process_request_headers(ngx_event_t *rev) { ……………………………………………….. rc = NGX_AGAIN; for ( ;; ) { if (rc == NGX_AGAIN) { …………………………………… n = ngx_http_read_request_header(r); if (n == NGX_AGAIN || n == NGX_ERROR) { return; } } rc = ngx_http_parse_header_line(r, r->header_in, cscf->underscores_in_headers); …………………………………………… if (rc == NGX_OK) { //这个判断里面会设置request中的预制header(cookie, id_modify_since等等) ……………………………… } if (rc == NGX_HTTP_PARSE_HEADER_DONE) { //到达这里说明header解析完毕. r->request_length += r->header_in->pos – r->header_in->start; r->http_state = NGX_HTTP_PROCESS_REQUEST_STATE; //这里主要是进行一些头的校验 rc = ngx_http_process_request_header(r); if (rc != NGX_OK) { return; } //然后进入nginx的http处理(进入phase处理) ngx_http_process_request(r); return; } ……………………. }
ngx_http_process_request就不分析了,这个函数在以前分析sub request以及nginx的handler处理的时候都有介绍过。它会调用nginx的handler phase处理链表。
上面的处理结束之后,可以看到nginx并没有挂载可写事件,这是因为可写事件发生是只要内核的写缓冲区大于可写的最小水位就可以写,而http是短连接,因此不可写的情况是很少的,所以nginx会默认第一次就是可写的,于是nginx这里是只有当我们需要写的数据没有写完的情况才会挂载写事件.来看相关代码。
首先在connect结构中有一个buffered的结构,nginx就用这个域来控制是否需要加入可写事件。这个值的设置在ngx_http_write_filter中,它的设置就不详细分析了,我们之需要知道如果当数据没有发送完,这个就会被设置.
来看ngx_http_finalize_request关于write handler的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 //如果buffered有设置,则会挂载写事件. if (r->buffered || c->buffered || r->postponed || r->blocked) { if (ngx_http_set_write_handler(r) != NGX_OK) { ngx_http_terminate_request(r, 0); } return; }