这次主要来分析upstream中的发送数据给client, 以及当buf不足,将一部分写到temp file的部分,他们对应的函数分别是ngx_event_pipe_write_to_downstream和ngx_event_pipe_write_chain_to_temp_file.
首先来看这个函数的第一部分的代码,这部分代码主要是遍历p->in,然后计算能写多少buf到文件(temp file的size是有限制的).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 //out就是将要保存到file的数据 if (p->buf_to_file) { //cache打开的情况 fl.buf = p->buf_to_file; fl.next = p->in; out = &fl; } else { //得到数据 out = p->in; } //如果cache没有打开 if (!p->cacheable) { size = 0; cl = out; ll = NULL; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe offset: %O", p->temp_file->offset); //开始遍历out do { //计算大小 bsize = cl->buf->last – cl->buf->pos; …………………………………………….. //看是否超过限制限制 if ((size + bsize > p->temp_file_write_size) || (p->temp_file->offset + size + bsize > p->max_temp_file_size)) { break; } size += bsize; ll = &cl->next; cl = cl->next; } while (cl); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size); if (ll == NULL) { return NGX_BUSY; } //cl存在则说明只有一部分buf能够写入到temp file,此时p->in保存剩下的chain if (cl) { p->in = cl; *ll = NULL; } else { //否则说明所有的buf都写入到了temp file,此时p->in则设置为空 p->in = NULL; p->last_in = &p->in; } } else { //cache打开的情况,可以看到和上面类似. p->in = NULL; p->last_in = &p->in; }
然后是第二部分,也就是最后一部分,这部分就是写buf到temp file,然后将已经写到temp file的buf,挂载到free_raw_bufs,以供继续使用。这里有用到shadow buf,因为shadow buf的内存是已经分配好的,而当已经写入到temp file之后,这部分buf自然就可以重新使用了。
还有一个很重要的操作,那就是将已经写入到temp file的buf 挂载到p->out上.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 //写out到tempfile if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) { return NGX_ABORT; } //遍历free_raw_bufs,以便与接下来将使用过的buf挂载到后面。 for (last_free = &p->free_raw_bufs; *last_free != NULL; last_free = &(*last_free)->next) { /\* void \*/ } if (p->buf_to_file) { p->temp_file->offset = p->buf_to_file->last – p->buf_to_file->pos; p->buf_to_file = NULL; out = out->next; } //遍历out for (cl = out; cl; cl = next) { next = cl->next; cl->next = NULL; b = cl->buf; //可以看到重新设置buf为file。然后设置相关属性 b->file = &p->temp_file->file; b->file_pos = p->temp_file->offset; p->temp_file->offset += b->last – b->pos; b->file_last = p->temp_file->offset; b->in_file = 1; b->temp_file = 1; //这里就是将buf放入到p->out. if (p->out) { *p->last_out = cl; } else { p->out = cl; } //设置last_out p->last_out = &cl->next; //shadow存在,则将已经保存到file的buf挂载到free_raw_buf中。 if (b->last_shadow) { tl = ngx_alloc_chain_link(p->pool); if (tl == NULL) { return NGX_ABORT; } //可以看到使用它的shadow tl->buf = b->shadow; tl->next = NULL; //last_free就是free_raw_buf的尾部 *last_free = tl; last_free = &tl->next; //reset buf b->shadow->pos = b->shadow->start; b->shadow->last = b->shadow->start; //remove掉shadow。 ngx_event_pipe_remove_shadow_links(b->shadow); } }
然后来看ngx_event_pipe_write_to_downstream,也就是发送数据到client的部分,这个函数整体是一个大循环, 而在这个大循环内分为三部分,其中第一部分处理upstream已经发送完毕(比如断开连接,出错等)时的情况,第二部分是对发送前的buf进行一些处理(比如busy buf,p->in,p->out等),然后循环发送,第三部分就是调用发送接口(output_filter)发送数据,然后update chain.
然后时buf的recycle属性,这个属性主要目的是这样子的,由于在p->input_filter中,nginx制造了一个buf充当已经读取的buf的shadow(last_shadow = 1),而在以后,nginx操作的都是这个buf,这个buf就被设置为recycled,也就是可以循环利用。而且当设置了recycled,则也将会在发送数据的时候,立即将buf发送出去,而不会缓存(可以看ngx_http_write_filter中的判断).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 //判断upstream状态 if (p->upstream_eof || p->upstream_error || p->upstream_done) { /\* pass the p->out and p->in chains to the output filter \*/ //取消掉recycled设置, for (cl = p->busy; cl; cl = cl->next) { cl->buf->recycled = 0; } //首先发送p->out. if (p->out) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream flush out"); //取消掉recycled设置,因为已经不要回收这个buf了(后端不会有数据过来)。 for (cl = p->out; cl; cl = cl->next) { cl->buf->recycled = 0; } //调用filter函数 rc = p->output_filter(p->output_ctx, p->out); //如果发送失败,则设置downstream_error,并且回收chain if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } p->out = NULL; } //发送p->in if (p->in) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream flush in"); //取消recycled设置 for (cl = p->in; cl; cl = cl->next) { cl->buf->recycled = 0; } //发送数据 rc = p->output_filter(p->output_ctx, p->in); //同上 if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } p->in = NULL; } //cache相关设置 if (p->cacheable && p->buf_to_file) { file.buf = p->buf_to_file; file.next = NULL; if (ngx_write_chain_to_temp_file(p->temp_file, &file) == NGX_ERROR) { return NGX_ABORT; } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream done"); /\* TODO: free unused bufs \*/ p->downstream_done = 1; break; }
它的步骤是这样子的,首先会计算busy chain的大小,因为我们有busy chain的限制(有busy buf的命令).然后计算p->in/p->out,最后得到一个最大的chain,然后发送。这里要注意,我们发送的每个buf大小是不会大于busy buf的大小的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 //判断是否需要退出循环,这里比较关键的就是downstream->write->ready,当发送返回again,就会从这里退出 if (downstream->data != p->output_ctx || !downstream->write->ready || downstream->write->delayed) { break; } prev = NULL; bsize = 0; //首先计算busy的chain的大小 for (cl = p->busy; cl; cl = cl->next) { if (cl->buf->recycled) { //如果是相同的chain,则跳过计算 if (prev == cl->buf->start) { continue; } //计算大小 bsize += cl->buf->end – cl->buf->start; prev = cl->buf->start; } } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write busy: %uz", bsize); out = NULL; //如果bsize大于我们设置的busy buf size,则直接发送数据 if (bsize >= (size_t) p->busy_size) { flush = 1; goto flush; } flush = 0; ll = NULL; prev_last_shadow = 1; //然后开始处理p->out和p->in for ( ;; ) { if (p->out) { cl = p->out; //计算将要发送的buf是否大于我们设置的busy_size,而cl->buf->last – cl->buf->pos是肯定小于等于busy_size. if (cl->buf->recycled && bsize + cl->buf->last – cl->buf->pos > p->busy_size) { //此时当前的cl就不能发送,所以设置flush,然后立即发送 flush = 1; break; } p->out = p->out->next; //将shadow buf 放到free_raw_bufs中,以便后面使用 ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf); } else if (!p->cacheable && p->in) { cl = p->in; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write buf ls:%d %p %z", cl->buf->last_shadow, cl->buf->pos, cl->buf->last – cl->buf->pos); //类似上面的,也是需要计算buf大小,这里可以看到我们只操作影子(shadow)buf, if (cl->buf->recycled && cl->buf->last_shadow && bsize + cl->buf->last – cl->buf->pos > p->busy_size) { if (!prev_last_shadow) { //设置out chain p->in = p->in->next; cl->next = NULL; if (out) { *ll = cl; } else { out = cl; } } flush = 1; break; } prev_last_shadow = cl->buf->last_shadow; p->in = p->in->next; } else { break; } //如果cl是recycled,则说明这个buf会被发送,因此bsize更新 if (cl->buf->recycled) { bsize += cl->buf->last – cl->buf->pos; } cl->next = NULL; //将对应的chain绑定到out上,接下来就会发送out。 if (out) { *ll = cl; } else { out = cl; } ll = &cl->next; }
然后是最后一部分,也就是发送chain,然后update chain的操作(类似chain output的操作)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 flush: …………………………………. //发送数据到client rc = p->output_filter(p->output_ctx, out); //错误的话,设置downstream_error if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } //update chain,将已经完全发送的chain保存到free,还没发送的保存到busy. ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag); //遍历free chain, for (cl = p->free; cl; cl = cl->next) { if (cl->buf->temp_file) { if (p->cacheable || !p->cyclic_temp_file) { continue; } /\* reset p->temp_offset if all bufs had been sent \*/ if (cl->buf->file_last == p->temp_file->offset) { p->temp_file->offset = 0; } } /\* TODO: free buf if p->free_bufs && upstream done \*/ /\* add the free shadow raw buf to p->free_raw_bufs \*/ //将shadow的buf放到p->free_raw_buf中. if (cl->buf->last_shadow) { //可以看到这里操作的是cl->buf->shadow,也就是我们在p->input_filter中,拷贝的那个原始buf。 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) { return NGX_ABORT; } cl->buf->last_shadow = 0; } cl->buf->shadow = NULL; }
上面的分析,还遗漏了一个函数,那就是ngx_event_pipe_drain_chains,这个函数被调用,说明client出错,此时则需要将对应的shadow buf放到free_raw_buf中(调用(ngx_event_pipe_add_free_buf).
并且,这里只有cache没有打开的时候,才会finalize 当前的request,这是因为当cache打开的时候,当前的request 是需要被cache,然后下次请求再次到达,就可以发送cache的了。所以此时需要接受完后端所有的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 static void ngx_http_upstream_process_request(ngx_http_request_t *r) { ngx_uint_t del; ngx_temp_file_t *tf; ngx_event_pipe_t *p; ngx_http_upstream_t *u; u = r->upstream; p = u->pipe; //当finalize upstream之后,connection将会被赋值为NULL if (u->peer.connection) { if (u->store) { del = p->upstream_error; tf = u->pipe->temp_file; if (p->upstream_eof || p->upstream_done) { if (u->headers_in.status_n == NGX_HTTP_OK && (u->headers_in.content_length_n == -1 || (u->headers_in.content_length_n == tf->offset))) { ngx_http_upstream_store(r, u); } else { del = 1; } } if (del && tf->file.fd != NGX_INVALID_FILE) { if (ngx_delete_file(tf->file.name.data) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", u->pipe->temp_file->file.name.data); } } } //如果upstream端发送完毕(断开等),则finalize request。 if (p->upstream_done || p->upstream_eof || p->upstream_error) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream exit: %p", p->out); ngx_http_upstream_finalize_request(r, u, 0); return; } } //如果downstream error设置,则进入下面的处理 if (p->downstream_error) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream downstream error"); //这里可以看到如果cache没有打开,则finalize 当前的request if (!u->cacheable && !u->store && u->peer.connection) { ngx_http_upstream_finalize_request(r, u, 0); } } }