在nginx的模块中,分为3种类型,分别是handler,filter和upstream,其中upstream可以看做一种特殊的handler,它主要用来实现和后端另外的服务器(php/jboss等)进行通信,由于在nginx中全部都是使用非阻塞,并且是一个流式的处理,所以upstream的实现很复杂,接下来我会通过几篇blog来详细的分析下nginx中upstream的设计和实现。
upstream顾名思义,真正产生内容的地方在”上游”而不是nginx,也就是说nginx是位于client和后端的upstream之间的桥梁,在这种情况下,一个upstream需要做的事情主要有2个,第一个是当client发送http请求过来之后,需要创建一个到后端upstream的请求。第二个是当后端发送数据过来之后,需要将后端upstream的数据再次发送给client.接下来会看到,我们编写一个upstream模块,最主要也是这两个hook方法。
首先来看如果我们要写一个upstream模块的话,大体的步骤是什么,我们以memcached模块为例子,我们会看到如果我们自己编写upstream模块的话,只需要编写upstream需要的一些hook函数,然后挂载到upstream上就可以了。
首先来看它的初始化部分。这个函数是命令memcached_pass的handle,它主要做了两件事情,第一件是保存目的upstream(memcached_pass 的参数).第二个是设置core module的handler(这个handler会在update_location中设置给content_handle),也就是说一个upstream其实也就是一个处于content phase的handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| static char * ngx_http_memcached_pass(ngx_conf_t \*cf, ngx_command_t \*cmd, void *conf) { ……………………………….. value = cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t)); u.url = value[1]; u.no_resolve = 1; //根据url,取得对应的upstream mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0); if (mlcf->upstream.upstream == NULL) { return NGX_CONF_ERROR; }
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module); //设置handler clcf->handler = ngx_http_memcached_handler; …………………………………………….
|
然后来看ngx_http_memcached_handler,它主要是初始化upstream的相关回调,然后调用ngx_http_upstream_init设置对应的读写回调等等其他的操作。这里我们要主要看它设置的几个回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| static ngx_int_t ngx_http_memcached_handler(ngx_http_request_t *r) { ngx_int_t rc; ngx_http_upstream_t *u; ngx_http_memcached_ctx_t *ctx; ngx_http_memcached_loc_conf_t *mlcf;
if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_HEAD))) { return NGX_HTTP_NOT_ALLOWED; } //首先discard request body rc = ngx_http_discard_request_body(r);
if (rc != NGX_OK) { return rc; } //设置content type。 if (ngx_http_set_content_type(r) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } //创建一个upstream if (ngx_http_upstream_create(r) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; }
u = r->upstream; //设置schema ngx_str_set(&u->schema, "memcached://"); u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module;
mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module); //设置config,可以看到它就是在memcached_pass中add的upstream u->conf = &mlcf->upstream; //开始设置回调,接下来会解释这几个回调的含义 u->create_request = ngx_http_memcached_create_request; u->reinit_request = ngx_http_memcached_reinit_request; u->process_header = ngx_http_memcached_process_header; u->abort_request = ngx_http_memcached_abort_request; u->finalize_request = ngx_http_memcached_finalize_request; //创建上下文 ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t)); if (ctx == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; }
ctx->rest = NGX_HTTP_MEMCACHED_END; ctx->request = r;
ngx_http_set_ctx(r, ctx, ngx_http_memcached_module); //设置另外的回调,这几个回调主要是针对非buffering的情况 u->input_filter_init = ngx_http_memcached_filter_init; u->input_filter = ngx_http_memcached_filter; u->input_filter_ctx = ctx;
r->main->count++; //进入upstream的处理,接下来就会详细分析这个函数 ngx_http_upstream_init(r);
return NGX_DONE; }
|
ok,接下来来看上面设置到的几个回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| //这个回调是创建到后端upstream的request时候被调用. ngx_int_t (\*create_request)(ngx_http_request_t \*r); //这个是重新初始化到后端upstream的请求,主要是重新初始化context ngx_int_t (\*reinit_request)(ngx_http_request_t \*r); //这个是当后端upstream发送数据到nginx,然后nginx解析这个数据的时候被调用 ngx_int_t (\*process_header)(ngx_http_request_t \*r); //这个回调暂时还没有nginx使用 void (\*abort_request)(ngx_http_request_t \*r); //request(和后端upstream)结束,需要释放资源 void (\*finalize_request)(ngx_http_request_t \*r, ngx_int_t rc); //用于upstream中的重定向 ngx_int_t (\*rewrite_redirect)(ngx_http_request_t \*r, ngx_table_elt_t *h, size_t prefix);
|
其实除了上面的几个回调,还有几个很重要的回调,那几个我们在后面会详细分析。
接下来我们就跳出memcached模块,进入upstream 模块的处理分析了,首先来看下面upstream初始化基本的流程图,下面这张图只到挂载完upstream的读写回调函数。
从上面的memcached的最后一部分,我们看到最终调用 ngx_http_upstream_init(r)进入upstream的处理,我们就从这个函数开始来分析upstream的初始化实现。
这个函数首先删除设置的定时器,然后如果是边缘触发的话,则挂载write的事件,这是因为走upstream,如果接下来我们connect不成功(NGX_EINPROGRESS),则不会进入ngx_http_finalize_request以挂载写hook(前面request的blog有介绍这部分),所以这里我们需要先挂载写事件.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| void ngx_http_upstream_init(ngx_http_request_t *r) { ngx_connection_t *c;
c = r->connection;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http init upstream, client timer: %d", c->read->timer_set); //删除定时器 if (c->read->timer_set) { ngx_del_timer(c->read); } //挂载写事件 if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
if (!c->write->active) { if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) == NGX_ERROR) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } } //进入upstream的初始化 ngx_http_upstream_init_request(r); }
|
接下来就来详细分析ngx_http_upstream_init_reques,这个函数比较长,我们一段段来看。
下面这段主要是创建将要发送给后端upstream的请求,这里可以看到调用的是create_request这个回调函数,这个函数是我们编写upstream函数时,挂载的hook之一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| u->store = (u->conf->store || u->conf->store_lengths);
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; }
if (r->request_body) { u->request_bufs = r->request_body->bufs; } //调用create_request来创建请求 if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
|
接下来这段是初始化upstream的一些属性.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| u->peer.local = u->conf->local;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer;
|
然后接下来就是初始化upstream_states数组,这个数组主要是保存了当前的upstream的一些状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| //初始化upstream states if (r->upstream_states == NULL) {
r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
} else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } //开始挂载清理回调函数 cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler;
|
然后就是这个函数最核心的处理部分,那就是根据upstream的类型来进行不同的操作,这里的upstream就是我们通过XXX_pass传递进来的值,这里的upstream有可能下面几种情况。
1 XXX_pass中不包含变量。
2 XXX_pass传递的值包含了一个变量($开始).这种情况也就是说upstream的url是动态变化的,因此需要每次都解析一遍.
而第二种情况又分为2种,一种是在进入upstream之前,也就是 upstream模块的handler之中已经被resolve的地址(请看ngx_http_XXX_eval函数),一种是没有被resolve,此时就需要upstream模块来进行resolve。
接下来的代码就是处理这部分的东西。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| if (u->resolved == NULL) { //这里的话,在XXX_pass命令被解析的时候,upstream已经被解析完毕了. uscf = u->conf->upstream;
} else { //如果地址已经被resolve过了,此时创建round robin peer if (u->resolved->sockaddr) {
if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //然后开始连接后端的upstream ngx_http_upstream_connect(r, u);
return; } //否则需要resolve host到ip地址 host = &u->resolved->host; …………………………………………………..
temp.name = *host;
ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ……………………………………..
ctx->name = *host; ctx->type = NGX_RESOLVE_A; //设置handler,最终在下面的ngx_resolve_name中会调用这个handler。 ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout;
u->resolved->ctx = ctx; //resolve hostname,然后会调用上面的handler if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
return; }
|
然后我们来看ngx_http_upstream_resolve_handler,它做得事情和上面的u->resolved->sockaddr存在的分支类似,也是创建peer然后连接对端.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| static void ngx_http_upstream_resolve_handler(ngx_resolver_ctx_t *ctx) { ………………………………….. if (ngx_http_upstream_create_round_robin_peer(r, ur) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; }
ngx_resolve_name_done(ctx); ur->ctx = NULL;
ngx_http_upstream_connect(r, u); }
|
这部分就先分析到这里,接下来的一篇我会详细分析nginx connect后端upstream以及读写handler的初始化。