前面我介绍过google对内核协议栈的patch,RPS,它主要是为了软中断的负载均衡,这次继续来介绍google 的对RPS的增强path RFS(receive flow steering),RPS是把软中断map到对应cpu,而这个时候还会有另外的性能影响,那就是如果应用程序所在的cpu和软中断处理的cpu不是同一个,此时对于cpu cache的影响会很大。 这里要注意,在kernel 的2.6.35中 这两个patch已经加入了。
ok,先来描述下它是怎么做的,其实这个补丁很简单,想对于rps来说就是添加了一个cpu的选择,也就是说我们需要根据应用程序的cpu来选择软中断需要被处理的cpu。这里做法是当调用recvmsg的时候,应用程序的cpu会被存储在一个hash table中,而索引是根据socket的rxhash进行计算的。而这个rxhash就是RPS中计算得出的那个skb的hash值.
可是这里会有一个问题,那就是当多个线程或者进程读取相同的socket的时候,此时就会导致cpu id不停的变化,从而导致大量的OOO的数据包(这是因为cpu id变化,导致下面软中断不停的切换到不同的cpu,此时就会导致大量的乱序的包).
而RFS是如何解决这个问题的呢,它做了两个表rps_sock_flow_table和rps_dev_flow_table,其中第一个rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu。
1 2 3 4 5 6 7 8 9 10 struct rps_sock_flow_table { unsigned int mask; //hash表 u16 ents[0]; };
可以看到它有两个域,其中第一个是掩码,用于来计算hash表的索引,而ents就是保存了对应socket的cpu。
然后是rps_dev_flow_table,这个是针对设备的,每个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同链接上的skb所在的cpu),这个hash表中每一个元素包含了一个cpu id,一个tail queue的计数器,这个值是一个很关键的值,它主要就是用来解决上面大量OOO的数据包的问题的,它保存了当前的dev flow table需要处理的数据包的尾部计数。接下来我们会详细分析这个东西。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 struct netdev_rx_queue { struct rps_map *rps_map; //每个设备的队列保存了一个rps_dev_flow_table struct rps_dev_flow_table *rps_flow_table; struct kobject kobj; struct netdev_rx_queue *first; atomic_t count; } ____cacheline_aligned_in_smp; struct rps_dev_flow_table { unsigned int mask; struct rcu_head rcu; struct work_struct free_work; //hash表 struct rps_dev_flow flows[0]; }; struct rps_dev_flow { u16 cpu; u16 fill; //tail计数。 unsigned int last_qtail; };
首先我们知道,大量的OOO的数据包的引起是因为多个进程同时请求相同的socket,而此时会导致这个socket对应的cpu id不停的切换,然后软中断如果不做处理,只是简单的调度软中断到不同的cpu,就会导致顺序的数据包被分发到不同的cpu,由于是smp,因此会导致大量的OOO的数据包,而在RFS中是这样解决这个问题的,在soft_net中添加了2个域,input_queue_head和input_queue_tail,然后在设备队列中添加了rps_flow_table,而rps_flow_table中的元素rps_dev_flow包含有一个last_qtail,RFS就通过这3个域来控制乱序的数据包。
这里为什么需要3个值呢,这是因为每个cpu上的队列的个数input_queue_tail是一直增加的,而设备每一个队列中的flow table对应的skb则是有可能会被调度到另外的cpu,而dev flow table的last_qtail表示当前的flow table所需要处理的数据包队列(backlog queue)的尾部队列计数,也就是说当input_queue_head大于等于它的时候说明当前的flow table可以切换了,否则的话不能切换到进程期待的cpu。
不过这里还要注意就是最好能够绑定进程到指定的cpu(配合rps和rfs的参数设置),这样的话,rfs和rps的效率会更好,所以我认为像erlang这种在rfs和rps下性能应该提高非常大的.
下面就是softnet_data 的结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 struct softnet_data { struct Qdisc *output_queue; struct Qdisc **output_queue_tailp; struct list_head poll_list; struct sk_buff *completion_queue; struct sk_buff_head process_queue; /\* stats \*/ unsigned int processed; unsigned int time_squeeze; unsigned int cpu_collision; unsigned int received_rps; #ifdef CONFIG_RPS struct softnet_data *rps_ipi_list; /\* Elements below can be accessed between CPUs for RPS \*/ struct call_single_data csd ____cacheline_aligned_in_smp; struct softnet_data *rps_ipi_next; unsigned int cpu; //最关键的两个域 unsigned int input_queue_head; unsigned int input_queue_tail; #endif unsigned dropped; struct sk_buff_head input_pkt_queue; struct napi_struct backlog; };
接下来我们来看代码,来看内核是如何实现的,先来看inet_recvmsg,也就是调用rcvmsg时,内核会调用的函数,这个函数比较简单,就是多加了一行代码sock_rps_record_flow,这个函数主要是将本socket和cpu设置到rps_sock_flow_table这个hash表中。
首先要提一下,这里这两个flow table的初始化都是放在sys中初始化的,不过sys部分相关的代码我就不分析了,因为具体的逻辑和原理都是在协议栈部分实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 int inet_recvmsg(struct kiocb \*iocb, struct socket \*sock, struct msghdr *msg, size_t size, int flags) { struct sock *sk = sock->sk; int addr_len = 0; int err; //设置hash表 sock_rps_record_flow(sk); err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT, flags & ~MSG_DONTWAIT, &addr_len); if (err >= 0) msg->msg_namelen = addr_len; return err; }
然后就是rps_record_sock_flow,这个函数主要是得到全局的rps_sock_flow_table,然后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去当作hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。而且下面的软中断上下文也比较容易存取这个hash表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 struct rps_sock_flow_table *rps_sock_flow_table __read_mostly; static inline void sock_rps_record_flow(const struct sock *sk) { #ifdef CONFIG_RPS struct rps_sock_flow_table *sock_flow_table; rcu_read_lock(); sock_flow_table = rcu_dereference(rps_sock_flow_table); //设置hash表 rps_record_sock_flow(sock_flow_table, sk->sk_rxhash); rcu_read_unlock(); #endif }
其实所有的事情都是rps_record_sock_flow中做的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static inline void rps_record_sock_flow(struct rps_sock_flow_table *table, u32 hash) { if (table && hash) { //获取索引。 unsigned int cpu, index = hash & table->mask; /\* We only give a hint, preemption can change cpu under us \*/ //获取cpu cpu = raw_smp_processor_id(); //保存对应的cpu,如果等于当前cpu,则说明已经设置过了。 if (table->ents[index] != cpu) //否则设置cpu table->ents[index] = cpu; } }
上面是进程上下文做的事情,也就是设置对应的进程所期待的cpu,它用的是rps_sock_flow_table,而接下来就是软中断上下文了,rfs这个patch主要的工作都是在软中断上下文做的。不过看这里的代码之前最好能够了解下RPS补丁,因为RFS就是对rps做了一点小改动。
主要是两个函数,第一个是enqueue_to_backlog,这个函数我们知道是用来将skb挂在到对应cpu的input queue上的,这里我们就关注他的一个函数就是input_queue_tail_incr_save,他就是更新设备的input_queue_tail以及softnet_data的input_queue_tail。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 if (skb_queue_len(&sd->input_pkt_queue)) { enqueue: __skb_queue_tail(&sd->input_pkt_queue, skb); //这个函数更新对应设备的rps_dev_flow_table中的input_queue_tail以及dev flow table的last_qtail input_queue_tail_incr_save(sd, qtail); rps_unlock(sd); local_irq_restore(flags); return NET_RX_SUCCESS; }
第二个是get_rps_cpu,这个函数我们知道就是得到软中断应该运行的cpu,这里我们就看RFS添加的部分,这里它是这样计算的,首先会得到两个flow table,一个是sock_flow_table,另一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序期望的cpu(next_cpu),然后比较这两个值,如果 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.
而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,否则的话为了避免乱序就还是继续使用tcpu.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 got_hash: flow_table = rcu_dereference(rxqueue->rps_flow_table); sock_flow_table = rcu_dereference(rps_sock_flow_table); if (flow_table && sock_flow_table) { u16 next_cpu; struct rps_dev_flow *rflow; //得到flow table rflow = &flow_table->flows[skb->rxhash & flow_table->mask]; tcpu = rflow->cpu; /得到next_cpu next_cpu = sock_flow_table->ents[skb->rxhash & sock_flow_table->mask]; //条件 if (unlikely(tcpu != next_cpu) && (tcpu == RPS_NO_CPU || !cpu_online(tcpu) || ((int)(per_cpu(softnet_data, tcpu).input_queue_head – rflow->last_qtail)) >= 0)) { //设置tcpu tcpu = rflow->cpu = next_cpu; if (tcpu != RPS_NO_CPU) //更新last_qtail rflow->last_qtail = per_cpu(softnet_data, tcpu).input_queue_head; } if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) { *rflowp = rflow; //设置返回cpu,以供软中断重新调度 cpu = tcpu; goto done; } } ………………………………
最后我们来分析下第一次数据包到达协议栈而应用程序还没有调用rcvmsg读取数据包,此时会发生什么问题,当第一次进来时tcpu是RPS_NO_CPU,并且next_cpu也是RPS_NO_CPU,此时会导致跳过rfs处理,而是直接使用rps的处理,也就是上面代码的紧接着的部分,下面这段代码前面rps时已经分析过了,这里就不分析了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 map = rcu_dereference(rxqueue->rps_map); if (map) { tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32]; if (cpu_online(tcpu)) { cpu = tcpu; goto done; } }