第11章 通用块层和scsi层
在内核中通用块层和scsi层的位置,上接文件系统的VFS层,下接硬盘驱动。通用块层的作用就是处理I/O的合并或者排序。而scsi层的作用主要是管理scsi设备、处理的设备的上线和离线、为设备加载合适的驱动等。scsi层同时是通用块层的下一层,I/O处理的时候也需要经过scsi层。通用块层和scsi层的一部分共同执行I/O的处理过程。
scsi层在内核中担当特殊的角色。即使某些不是scsi设备的磁盘(比如ATA格式的硬盘),在内核中也是通过scsi层来管理的。本章关注重点是I/O的处理过程,主要涉及通用块层和scsi层的一部分,scsi层对设备的管理功能不在本章讨论范围之内。
11.1 块设备队列
第9章分析了块设备的队列和队列处理函数。在Linux内核,读写操作是以一个个请求的方式出入块设备的队列。一个请求可以代表一个I/O,也可以代表多个I/O。如果一个I/O和其他I/O发生了合并,这两个I/O在一个请求里面。所有需要执行的请求,都要链接到块设备队列的链表。第9章还分析了块设备队列的电梯对象,电梯提供了块设备排序的算法和排序结构,本章将继续讨论。
11.1.1 scsi块设备队列处理函数
scsi作为一个广泛使用的框架,提供了一系列的块设备队列处理函数。这是在scsi_alloc_queue函数中实现的,如代码清单11-1所示。
代码清单11-1 scsi_alloc_queue函数
struct request_queue *scsi_alloc_queue(struct scsi_device *sdev)
{
struct Scsi_Host *shost = sdev->host;
struct request_queue *q;
q = blk_init_queue(scsi_request_fn, NULL);
if (!q)
return NULL;
blk_queue_prep_rq(q, scsi_prep_fn);
blk_queue_max_hw_segments(q, shost->sg_tablesize);
blk_queue_max_phys_segments(q, SCSI_MAX_PHYS_SEGMENTS);
blk_queue_max_sectors(q, shost->max_sectors);
blk_queue_bounce_limit(q, scsi_calculate_bounce_limit(shost));
blk_queue_segment_boundary(q, shost->dma_boundary);
blk_queue_issue_flush_fn(q, scsi_issue_flush_fn);
blk_queue_softirq_done(q, scsi_softirq_done);
if (!shost->use_clustering)
clear_bit(QUEUE_FLAG_CLUSTER, &q->queue_flags);
return q;
}scsi_alloc_queue函数为scsi块设备创建队列结构时调用。默认为scsi设备提供了出队列函数scsi_request_fn和软中断完成函数scsi_softirq_done。
11.1.2 电梯算法和对象
电梯算法在内核中已经被抽象为一个对象,只要实现一些基本的电梯函数,就可以提供一个电梯算法。电梯结构的定义如代码清单11-2所示。
代码清单11-2 elevator_noop算法
static struct elevator_type elevator_noop = {
.ops = {
.elevator_merge_req_fn = noop_merged_request,
.elevator_dispatch_fn = noop_dispatch,
.elevator_add_req_fn = noop_add_request,
.elevator_queue_empty_fn = noop_queue_empty,
.elevator_former_req_fn = noop_former_req,
.elevator_latter_req_fn = noop_latter_req,
.elevator_init_fn = noop_init_queue,
.elevator_exit_fn = noop_exit_queue,
},
.elevator_name = "noop",
.elevator_owner = THIS_MODULE,
};电梯结构elevator_type最重要的执行函数是elevator_add_req_fn和elevator_dispatch_fn,分别用来向电梯加入一个请求和从电梯获得一个请求。而elevator_merge_req_fn则实现I/O的合并。在下文将看到这些函数的使用方式。
电梯算法一般需要维护一个队列,这个队列是为了对请求排序或者执行I/O合并。
11.2 硬盘HBA抽象层
HBA(Host Bus Adapter,主机总线适配器)通常用来连接计算机内部总线和存储系统。用来接入硬盘的设备,如果是一个PCI设备,它既是一个PCI设备,同时支持SCSI硬盘或者ATA硬盘,它就是一个HBA设备。
HBA设备的驱动,既是PCI驱动,同时又要管理和控制硬盘,所以它也可算作是硬盘驱动。scsi层最终要把scsi命令发送到硬盘的驱动层。硬盘驱动层是内核软件的最后一层,当硬盘驱动把命令发送到硬件后,就脱离了软件控制的范围。
为便于管理硬盘驱动,内核抽象出一个scsi_host_template对象,所有的硬盘驱动都要以这种方式提供。将scsi_host_template结构的定义简化后,如代码清单11-3所示。
代码清单11-3 scsi_host_template函数
struct scsi_host_template {
const char *name;
int (* queuecommand)(struct scsi_cmnd *, void (*done)(struct scsi_cmnd *));
int (* eh_abort_handler)(struct scsi_cmnd *);
int (* eh_device_reset_handler)(struct scsi_cmnd *);
......
int (* eh_host_reset_handler)(struct scsi_cmnd *);
};对scsi_host_template结构的重要成员做如下解释。
- name:HBA卡的名字。
- queuecommand:I/O函数。通过这个函数将scsi命令发送到HBA卡,完成一次I/O。
- eh_abort_handler:撤销一个scsi命令。
- eh_device_reset_handler:reset某个硬盘设备。这个硬盘设备将离线,然后重新上线。
- eh_host_reset_handler:reset整个适配器芯片。执行这个调用,整个适配器芯片被重启,所有的硬盘离线,然后重新被扫描一次。
在scsi_host_template结构提供的调用函数中,异常处理占了很大一部分。对一个I/O来说,执行结果可以分为几种情况。
- I/O命令执行完成,而且I/O执行成功。
- I/O命令执行完成,但是I/O未成功,返回有错误。
- I/O超时未返回。
对于I/O返回有错误的情况,内核根据错误类型,选择再次执行该I/O命令或者交给scsi错误处理任务处理。对I/O超时未返回,则必须由scsi错误处理任务控制。
scsi错误处理任务通常首先abort(取消)出错的命令,如果不能奏效,尝试reset硬盘设备;如果仍然不能奏效,尝试reset总线;如果仍不能奏效,尝试reset整个芯片。由于各个厂家的HBA芯片实现各有不同,笔者发现很多情况下,这个错误处理流程会导致错误。最常见的错误就是CPU占有率100%和abort处理时异常,导致硬盘离线。所以实现用户定制的错误处理逻辑可能是一个正确的选择。
11.3 I/O的顺序控制
I/O的顺序是通用块层中一个比较重要的概念。很多应用要求I/O必须按照指定的顺序执行,为此内核使用了barrier I/O的概念来实现I/O的顺序。指的是barrier I/O之前的I/O必须执行完毕,然后再执行barrier I/O,而barrier I/O之后的I/O必须在barrier I/O执行完毕才能执行,就像I/O队列中插入了一个栅栏。
假设有1,2,3,4,5五个I/O,其中4是barrier I/O。这意味着,必须1,2,3执行完毕再执行4,在4执行完毕之前不能执行任何I/O。
需要指出的是,按照顺序下发1,2,3,4,5并不能保证I/O的完成顺序。这是因为硬盘本身有缓存(cache)和队列,并不是按照下发的顺序来执行I/O。
如何实现I/O顺序?
这就必须利用同步cache命令。即发现4是一个barrier I/O后,插一个同步cache命令,再下发4,然后再插一个同步cache命令。在4执行完毕之前,不能再发送新的I/O命令。有人可能有疑问,1,2,3的执行顺序能否保证?答案是不能,1,2,3既然没设置barrier标志,意味可以乱序执行。
复杂一点的情况是,如果硬件支持FUA标志,也就是说这个I/O会跳过硬盘的buffer,后面的一次同步cache命令就可以省略。更复杂的情况,如果硬件支持TAG队列功能,在执行保证顺序操作的过程中,仍然可以下发新的I/O。在后面的代码中,可以见到对I/O的这种处理方法。
11.4 I/O调度算法
一个I/O调度算法,关键就是实现elevator结构需要的几个函数,然后注册调度算法。用户可以通过proc文件系统选择I/O调度算法,内核将根据设定的I/O调度算法对I/O执行调度。
11.4.1 noop调度算法
noop调度算法是最简单的调度算法,如它的名字所言,基本什么都没做,等同于一个先进先出的调度算法,本身没对I/O进行真正的排序。I/O调度算法的入队列函数决定了I/O是如何插入电梯队列的,因此首先分析入队列函数。noop调度算法的入队列函数是noop_add_request,如代码清单11-4所示。
代码清单11-4 noop_add_request函数
static void noop_add_request(request_queue_t *q, struct request *rq)
{
struct noop_data *nd = q->elevator->elevator_data;
list_add_tail(&rq->queuelist, &nd->queue);
}noop调度算法的入队列函数极其简单,就是把请求加入noop的链表,没有执行任何排序工作。
I/O调度算法的出队列函数决定了I/O是如何被挑选,然后由硬盘驱动执行。noop调度算法的出队列函数是noop_dispatch,如代码清单11-5所示。
代码清单11-5 noop_dispatch函数
static int noop_dispatch(request_queue_t *q, int force)
{
struct noop_data *nd = q->elevator->elevator_data;
/*判断队列非空*/
if (!list_empty(&nd->queue)) {
struct request *rq;
/*取出I/O请求*/
rq = list_entry(nd->queue.next, struct request, queuelist);
list_del_init(&rq->queuelist);
/*I/O请求送到块设备的队列*/
elv_dispatch_sort(q, rq);
return 1;
}
return 0;
}noop出队列函数从队列头取出最先插入队列的一个I/O,然后将I/O送到块设备的队列。I/O从块设备队列到驱动执行的过程,是由scsi层控制的。
11.4.2 deadline调度算法
和noop调度算法相比,deadline调度算法更复杂,它内部启用了一个红黑树结构来对I/O进行排序,保证I/O出队列时,已经按照扇区地址排好顺序了。deadline调度算法代码在block目录的deadline-iosched文件中。在分析算法前,首先分析deadline调度算法定义的内部数据结构,如代码清单11-6所示。
代码清单11-6 deadline_data结构
struct deadline_data {
struct rb_root sort_list[2];
struct list_head fifo_list[2];
struct deadline_rq *next_drq[2];
struct hlist_head *hash; /* request hash */
unsigned int batching; /* number of sequential dispatches */
sector_t last_sector; /* head position */
unsigned int starved; /* times reads have starved */
int fifo_expire[2];
int fifo_batch;
int writes_starved;
int front_merges;
mempool_t *drq_pool;
}从deadline的数据结构可以发现,有两个队列,一个是红黑树sort_list,另一个是链表fifo_list。
一个I/O进入deadline队列的时候,要被插入红黑树队列和fifo两个队列,红黑树是按照扇区地址排序的队列,而fifo则按照I/O的先后顺序排序。每个队列都是两成员的数组,这是为了分别保存读请求和写请求,即读写请求分别保存在不同的队列中。
deadline调度算法源文件只有几百行,重点分析插入队列的过程和出队列的过程,就可以基本明白deadline算法的设计思路。首先从入队列函数开始分析,deadline算法的入队列函数是deadline_add_request,如代码清单11-7所示。
代码清单11-7 deadline_add_request函数
static void
deadline_add_request(struct request_queue *q, struct request *rq)
{
struct deadline_data *dd = q->elevator->elevator_data;
struct deadline_rq *drq = RQ_DATA(rq);
const int data_dir = rq_data_dir(drq->request);
/*deadline请求加入红黑树队列*/
deadline_add_drq_rb(dd, drq);
/*设置deadline请求的超时时间,然后加入到fifo队列*/
drq->expires = jiffies + dd->fifo_expire[data_dir];
list_add_tail(&drq->fifo, &dd->fifo_list[data_dir]);
/*如果请求可以合并,则还要加入hash链表*/
if (rq_mergeable(rq))
deadline_add_drq_hash(dd, drq);
}deadline_add_request函数首先把请求加入红黑树队列。deadline算法的红黑树根据扇区地址的顺序排序,因此插入过程是以I/O请求的扇区地址作为key。其次要设置请求的超时时间,然后加入先进先出的fifo队列。设置超时时间的目的是防止I/O在队列中时间过长,影响业务的使用。
deadline调度算法I/O出队列的函数是deadline_dispatch_requests,如代码清单11-8所示。
代码清单11-8 deadline_dispatch_requests函数
static int deadline_dispatch_requests(request_queue_t *q, int force)
{
struct deadline_data *dd = q->elevator->elevator_data;
const int reads = !list_empty(&dd->fifo_list[READ]);
const int writes = !list_empty(&dd->fifo_list[WRITE]);
struct deadline_rq *drq;
int data_dir;
if (dd->next_drq[WRITE])
drq = dd->next_drq[WRITE];
else
drq = dd->next_drq[READ];
if (drq) {
/* we have a "next request" */
/*I/O批处理*/
if (dd->last_sector != drq->request->sector)
/* end the batch on a non sequential request */
dd->batching += dd->fifo_batch;
if (dd->batching < dd->fifo_batch)
/* we are still entitled to batch */
goto dispatch_request;
}deadline算法首先根据一系列设定的条件选择要处理的I/O。
第一部分判断是否连续的批模式。如果当前I/O和前面I/O的扇区地址是连续的而非随机的,满足批模式的条件,则直接进入dispatch_request优先处理。deadline还设置了一个fifo_batch数值,批模式优先处理的I/O个数不能超过这个数值。
if (reads) {
BUG_ON(RB_EMPTY_ROOT(&dd->sort_list[READ]));
/*存在读写请求,而写请求已经超过设定的饿死时间的情况下,优先处理写*/
if (writes && (dd->starved++ >= dd->writes_starved))
goto dispatch_writes;
/*如果不存在写超过饿死时间,则处理读请求*/
data_dir = READ;
goto dispatch_find_request;
}
if (writes)dispatch_find_request:
if (deadline_check_fifo(dd, data_dir)) {
/*如果有读请求超时,则优先处理*/
/* An expired request exists - satisfy it */
dd->batching = 0;
drq = list_entry_fifo(dd->fifo_list[data_dir].next);
} else if (dd->next_drq[data_dir]) {
/*从请求方向相同(都是读或者都是写)的队列挑选下一个请求*/
drq = dd->next_drq[data_dir];
} else {
/*从红黑树队列选一个请求*/
dd->batching = 0;
drq = deadline_find_first_drq(dd, data_dir);
}
dispatch_request:
dd->batching++;
deadline_move_request(dd, drq);
return 1;deadline算法第三部分要挑选一个出队列的I/O。首先的条件是检查读请求是否超时。读请求入队列的时候设置了一个超时时间,超过这个时间必须马上处理。如果没超时的读请求,则从同方向的请求队列(都是读或者都是写)选择下一个请求。
最后,如果以上条件都不满足,说明同一个方向已经没有下一个请求,或者一个电梯已经完成(完成从低到高的一个循环),需要重新开始一轮循环,因此从红黑树队列中重新挑选一个新的I/O。挑选I/O的函数是deadline_find_first_drq,如代码清单11-9所示。
代码清单11-9 deadline_find_first_drq函数
static struct deadline_rq *
deadline_find_first_drq(struct deadline_data *dd, int data_dir)
{
struct rb_node *n = dd->sort_list[data_dir].rb_node;
for (;;) {
if (n->rb_left == NULL)
return rb_entry_drq(n);
n = n->rb_left;
}
}deadline算法重新挑选一个I/O请求的条件,就是根据I/O的方向(读或者写)从相应的红黑树队列中取出最左边的一个I/O。因为硬盘顺序是按照从低到高的扇区地址排序的,选出最左边的I/O,其实就是选择扇区最小的I/O,后续I/O的扇区地址都大于这个I/O,从而开启一轮新的循环。
11.5 I/O的处理过程
11.5.1 I/O插入队列的过程分析
根据前文第10章的分析,文件系统提交I/O都要通过 submit_bio 来提交一个I/O。
1. submit_bio 函数
submit_bio 函数是文件系统VFS层和通用块层的衔接点,本节从这个函数开始分析,如代码清单11-10所示。
/* 代码清单11-10 submit_bio函数 */
void submit_bio(int rw, struct bio *bio)
{
/*计算扇区数目*/
int count = bio_sectors(bio);
bio->bi_rw |= rw;
if (rw & WRITE)
count_vm_events(PGPGOUT, count);
else
count_vm_events(PGPGIN, count);
/*dump io,用来调试*/
if (unlikely(block_dump)) {
....../*省略部分代码*/
generic_make_request(bio);
}2. generic_make_request 函数
submit_bio 函数调用 generic_make_request 向通用块层提交一个请求,输入的参数是一个bio结构,generic_make_request 函数要把bio转换为底层处理的请求结构,分两部分介绍。
/* 代码清单11-11 generic_make_request(ll_rw_blk.c) */
void generic_make_request(struct bio *bio)
{
request_queue_t *q;
sector_t maxsector;
int ret, nr_sectors = bio_sectors(bio);
dev_t old_dev;
/*在执行较长时间的任务之前,执行一次调度*/
might_sleep();
/* Test device or partition size, when known. */
/*检查是否超过了磁盘的扇区限制*/
maxsector = bio->bi_bdev->bd_inode->i_size >> 9;
if (maxsector) {
sector_t sector = bio->bi_sector;
if (maxsector < nr_sectors || maxsector - nr_sectors < s
handle_bad_sector(bio);
goto end_io;
}
}generic_make_request 函数第一部分检查最大扇区限制。执行I/O的起始扇区地址加I/O大小的结果不应该超过块设备的物理扇区地址,否则就要结束本次I/O,返回错误。
generic_make_request 函数第二部分检查I/O的大小不应该超过块设备的最大可处理扇区。后者是块设备本身的特性,它限制了块设备一次I/O所能处理的最大扇区数目。
maxsector = -1;
old_dev = 0;
do {
char b[BDEVNAME_SIZE];
/*获得块设备的队列*/
q = bdev_get_queue(bio->bi_bdev);
if (!q) {
end_io:
bio_endio(bio, bio->bi_size, -EIO);
break;
}
/* I/O超过了设备的物理最大允许扇区*/
if (unlikely(bio_sectors(bio) > q->max_hw_sectors)) {
goto end_io;
}
if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))
goto end_io;
/*如果磁盘有分区的处理*/
blk_partition_remap(bio);
....../*省略I/O追踪的代码*/
maxsector = bio->bi_sector;
old_dev = bio->bi_bdev->bd_dev;
ret = q->make_request_fn(q, bio);
} while (ret);
}NOTE
如果I/O所在的块设备是个分区块设备,必须找到分区的结构
hd_struct,I/O的起始扇区地址要加上分区的起始地址,才是真正的物理地址。也要将分区块设备替换为块设备所在物理硬盘的主块设备。
3. __make_request 函数
最后,调用队列提供的 make_request_fn 函数。前文分析过,scsi提供的队列 make_request_fn 就是 __make_request 函数,如代码清单11-12所示。
/* 代码清单11-12 __make_request(ll_rw_blk.c) */
static int __make_request(request_queue_t *q, struct bio *bio)
{
....../*省略部分代码*/
sector = bio->bi_sector;
nr_sectors = bio_sectors(bio);
cur_nr_sectors = bio_cur_sectors(bio);
prio = bio_prio(bio);
rw = bio_data_dir(bio);
sync = bio_sync(bio);__make_request 函数是通用块层的主处理流程。它的作用就是判断I/O能否合并,如果可以合并,则不申请新请求,而是合入前面的请求;如果不能合并,就申请新请求结构,并且插入电梯的队列,分五部分介绍。
第一部分从 bio 结构获得I/O的起始扇区地址和以扇区度量的I/O大小。变量 sync 标志当前I/O是否需要同步处理。带有 sync 标志的I/O和普通I/O的处理方式有所不同。
WARNING
这里的
sync标志和打开文件时候设置的O_SYNC标志不是一回事,两者的处理逻辑也不相同。
__make_request 函数第二部分首先检查是否需要bounce。
blk_queue_bounce(q, &bio);
spin_lock_prefetch(q->queue_lock);
/*检查是否barrier I/O,后面要处理*/
barrier = bio_barrier(bio);
if (unlikely(barrier) && (q->next_ordered == QUEUE_ORDERED_N
err = -EOPNOTSUPP;
goto end_io;
}
spin_lock_irq(q->queue_lock);
/*如果是barrier请求,或者电梯空,不再判断是否需要合并I/O*/
if (unlikely(barrier) || elv_queue_empty(q))
goto get_rq;TIP
需要bounce的原因是:有些老设备支持的DMA区间不能覆盖内存空间,而bio结构成员
bio_vec里面提供的内存可能不在设备能访问的内存范围之内,如果这种情况发生,就要另外申请低位内存,以低位内存作为DMA内存,等设备DMA操作完成后,再把数据复制到bio_vec提供的内存里。
其次检查I/O是否barrier I/O。对于barrier I/O,就直接进入 get_rq 分支,不再判断是否可以合并,因为barrier I/O的性质决定了它不能和之前的I/O进行合并。
__make_request 函数第三部分处理后向合并。
el_ret = elv_merge(q, &req, bio);
switch (el_ret) {
/*后向合并*/
case ELEVATOR_BACK_MERGE:
BUG_ON(!rq_mergeable(req));
if (!q->back_merge_fn(q, req, bio))
break;
blk_add_trace_bio(q, bio, BLK_TA_BACKMERGE);
/*合并后,要改bio的尾部为新的这个I/O,同时调整扇区数*/
req->biotail->bi_next = bio;
req->biotail = bio;
req->nr_sectors = req->hard_nr_sectors += nr_sectors;
req->ioprio = ioprio_best(req->ioprio, prio);
drive_stat_acct(req, nr_sectors, 0);
/*调用电梯提供的合并函数*/
if (!attempt_back_merge(q, req))
elv_merged_request(q, req);
goto out;后向合并指的是当前I/O可以合并到某个请求的尾部,这种情况要把请求的尾部bio改为新的bio,请求的扇区数要加上新bio的扇区数。然后调用电梯提供的合并函数执行I/O合并操作。
__make_request 函数第四部分处理前向合并。
/*前向合并*/
case ELEVATOR_FRONT_MERGE:
BUG_ON(!rq_mergeable(req));
if (!q->front_merge_fn(q, req, bio))
break;
blk_add_trace_bio(q, bio, BLK_TA_FRONTMERGE);
bio->bi_next = req->bio;
req->bio = bio;
req->buffer = bio_data(bio);
req->current_nr_sectors = cur_nr_sectors;
req->hard_cur_sectors = cur_nr_sectors;
req->sector = req->hard_sector = sector;
req->nr_sectors = req->hard_nr_sectors += nr_sectors;
req->ioprio = ioprio_best(req->ioprio, prio);
drive_stat_acct(req, nr_sectors, 0);
if (!attempt_front_merge(q, req))
elv_merged_request(q, req);
goto out;前向合并的处理方式和后向合并很相似,区别是前向合并要把新bio置于请求结构中bio链表的头部。然后调用电梯的合并函数执行I/O合并。
__make_request 函数第五部分申请请求。
get_rq:
req = get_request_wait(q, rw, bio);
init_request_from_bio(req, bio);
spin_lock_irq(q->queue_lock);
/*如果块设备队列和电梯队列都空的,需要阻塞块设备队列*/
if (elv_queue_empty(q))
blk_plug_device(q);
/*把请求加入请求队列*/
add_request(q, req);
out:
/*如果是同步I/O,立即解除队列阻塞,把I/O请求下发*/
if (sync)
__generic_unplug_device(q);
spin_unlock_irq(q->queue_lock);
return 0;如果前面部分的前向合并和后向合并都不能执行,就必须申请一个新的请求。__make_request 函数调用 get_request_wait 申请一个新的请求,而 init_request_from_bio 函数则根据bio的类型初始化请求和设置请求标志。通常的请求标志如下列所示。
- REQ_FAILFAST:一般I/O失败后要重试几次。
REQ_FAILFAST标志不重试,立即返回。 - REQ_HARDBARRIER 和 REQ_SOFTBARRIER:标志一个barrier请求。
- REQ_RW_SYNC:同步标志。有同步标志,则立即对电梯队列执行unplug操作。
NOTE
创建新请求后,要对队列执行plug和unplug操作。plug就是阻塞,一个阻塞的队列是不能下发I/O的,要下发I/O,必须执行unplug。plug队列的同时要启动一个定时器(默认3毫秒),在时间到达后unplug队列,开始下发I/O。plug队列和定时器的设置,说明I/O不是马上下发,需要等待后续的I/O。对带有
sync标志的I/O是个特例,要立即unplug。 plug和unplug针对的是块设备队列,操作对象并不一定是同一个I/O。plug时阻塞的I/O,不一定在unplug时被下发,可能下发别的I/O。
4. elv_merge 函数
在 __make_request 函数里面,判断两个I/O能否合并,使用了 elv_merge 函数,需要分析一下它的实现,函数代码如代码清单11-13所示。
/* 代码清单11-13 elv_merge函数 */
int elv_merge(request_queue_t *q, struct request **req, struct
{
elevator_t *e = q->elevator;
int ret;
/*判断块设备队列能否合并?*/
if (q->last_merge) {
ret = elv_try_merge(q->last_merge, bio);
if (ret != ELEVATOR_NO_MERGE) {
*req = q->last_merge;
return ret;
}
}
/*否则在电梯队列里面查找,看能否合并*/
if (e->ops->elevator_merge_fn)
return e->ops->elevator_merge_fn(q, req, bio);
return ELEVATOR_NO_MERGE;
}
static inline int elv_try_merge(struct request *__rq, struct b
{
int ret = ELEVATOR_NO_MERGE;
if (elv_rq_merge_ok(__rq, bio)) {
if (__rq->sector + __rq->nr_sectors == bio->bi_sector)
ret = ELEVATOR_BACK_MERGE;
else if (__rq->sector - bio_sectors(bio) == bio->bi_sect)
ret = ELEVATOR_FRONT_MERGE;
}
return ret;
}elv_merge 函数通过两步来判断I/O能否合并。
第一步判断能否和块设备队列的最后一个请求合并。判断是通过请求的扇区地址决定的,如果请求的最后扇区位置是10000,而新的I/O从10000开始,那就是后向合并。如果新的I/O从9995开始,而I/O长度是五个扇区,那就是前向合并。
第二步调用电梯队列提供的函数判断能否合并。I/O调度算法一般都要提供自己的合并函数。比如deadline调度算法提供的合并函数就要在电梯队列中寻找能和当前I/O合并的请求。
5. __elv_add_request 函数
对于不能合并的I/O,需要把一个请求插入到队列。这通过函数 add_request 实现。它又是通过调用 __elv_add_request 函数实现插入的功能,因此直接分析 __elv_add_request 函数,它的代码如代码清单11-14所示。
/* 代码清单11-14 __elv_add_request函数 */
void __elv_add_request(request_queue_t *q, struct request *rq,
int where, int plug)
{
if (q->ordcolor)
rq->flags |= REQ_ORDERED_COLOR;
if (rq->flags & (REQ_SOFTBARRIER | REQ_HARDBARRIER)) {
if (blk_barrier_rq(rq))
q->ordcolor ^= 1;
if (where == ELEVATOR_INSERT_SORT)
where = ELEVATOR_INSERT_BACK;
/*如果是文件系统来的请求,则更新end_sector和边界请求*/
if (blk_fs_request(rq)) {
q->end_sector = rq_end_sector(rq);
q->boundary_rq = rq;
}
} else if (!(rq->flags & REQ_ELVPRIV) && where == ELEVATOR_I
where = ELEVATOR_INSERT_BACK;
/*阻塞块设备队列*/
if (plug)
blk_plug_device(q);
elv_insert(q, rq, where);
}I/O插入电梯队列时,要判断I/O是否barrier请求。barrier I/O要求保证顺序,因此barrier I/O之前的I/O请求必须完成,所以对于barrier请求,把默认插入方式改为从后方加入。
6. elv_insert 函数
函数最后调用了 elv_insert,这个函数要根据插入的位置执行,它的代码如代码清单11-15所示。
/* 代码清单11-15 elv_insert函数 */
void elv_insert(request_queue_t *q, struct request *rq, int wh
{
struct list_head *pos;
unsigned ordseq;
int unplug_it = 1;
blk_add_trace_rq(q, rq, BLK_TA_INSERT);
rq->q = q;
switch (where) {
case ELEVATOR_INSERT_FRONT:
/*对前向插入,标志加一个SOFTBARRIER,请求直接加入块设备队列的头*/
rq->flags |= REQ_SOFTBARRIER;
list_add(&rq->queuelist, &q->queue_head);
break;elv_insert 函数第一部分是处理前向插入。前向插入意味着新请求位置在所有队列I/O的最前面。这种情况请求不需要加入电梯队列,因为它不需要在电梯中等待,而是直接插入到块设备队列的最前面。
elv_insert 函数第二部分是处理后向插入。后向插入意味着新请求位置在所有队列I/O的最后面。
case ELEVATOR_INSERT_BACK:
/*后向插入,要排空队列中的I/O.然后把请求加入块设备队列的链表尾*/
rq->flags |= REQ_SOFTBARRIER;
elv_drain_elevator(q);
list_add_tail(&rq->queuelist, &q->queue```c
/*移走阻塞标志,调用request_fn开始I/O*/
blk_remove_plug(q);
q->request_fn(q);
break;现有电梯队列中的I/O怎么办?必须全部下发,所以首先要调用 elv_drain_elevator 函数排空电梯队列的所有I/O,排空之后所有的I/O进入块设备队列,然后把请求加到块设备队列的尾部,实现后向插入的要求.最后unplug队列,调用队列的 request_fn 函数从块设备队列挑选I/O执行.
elv_insert 函数第三部分是处理按顺序插入和重插.
case ELEVATOR_INSERT_SORT:
BUG_ON(!blk_fs_request(rq));
rq->flags |= REQ_SORTED;
q->nr_sorted++;
/*更新last_merge,即最后一个可merge的请求*/
if (q->last_merge == NULL && rq_mergeable(rq))
q->last_merge = rq;
q->elevator->ops->elevator_add_req_fn(q, rq);
break;
case ELEVATOR_INSERT_REQUEUE:
/*重插队列*/
rq->flags |= REQ_SOFTBARRIER;
/*如果队列无保证顺序操作,则插入队列头*/
if (q->ordseq == 0) {
list_add(&rq->queuelist, &q->queue_head);
break;
}
/*根据请求的性质,判断重插的位置*/
ordseq = blk_ordered_req_seq(rq);
list_for_each(pos, &q->queue_head) {
struct request *pos_rq = list_entry_rq(pos);
if (ordseq <= blk_ordered_req_seq(pos_rq))
break;
}
list_add_tail(&rq->queuelist, pos);
unplug_it = 0;
break;按顺序插入的情况比较简单,调用电梯队列提供的插入函数 elevator_add_req_fn 即可.
重插比较复杂,必须考虑I/O的顺序关系.重插入队列,是指I/O虽然返回,但是没有成功完成I/O,需要重新插入电梯队列的情况.如果此时队列中不必保证顺序,把I/O简单地插入块设备队列头部即可.否则,就必须考虑I/O的位置.如果需要保证顺序,说明有barrier I/O需要处理,回顾前面分析的背景知识,barrier I/O之前的I/O必须先完成,然后再完成barrier I/O,barrier I/O之后的I/O必须等barrier I/O完成之后再执行.ordseq 变量目的就是检查I/O的顺序,判断是barrier I/O之前的I/O,还是之后的I/O,或者是barrier I/O自身.根据这个顺序关系,逐个遍历整个队列,把I/O重新插入到正确的位置.
elv_insert 函数第四部分检查队列的限制.
/*如果请求达到限制,则unplug队列*/
if (unplug_it && blk_queue_plugged(q)) {
int nrq = q->rq.count[READ] + q->rq.count[WRITE]
- q->in_flight;
if (nrq >= q->unplug_thresh)
__generic_unplug_device(q);
}为了避免过多I/O请求在队列中堆积,队列设置了一个数值,当请求个数超过这个限制,就unplug队列,开始挑选I/O执行.
11.5.2 I/O 出队列的过程分析
什么条件下 I/O 从队列出来真正下发到硬盘?总结内核中的处理过程,列出以下几个条件.
- 第一个 I/O 启动了 3 毫秒的定时器,时间到了,会执行 unplug 函数,开始下发 I/O.
- 请求数目超过设定的限制(默认是 4),执行 unplug 函数,开始下发.
- 带有 sync 标志的 I/O,立即执行 unplug 函数,开始下发.
- barrier I/O,需要先清空电梯队列,然后执行 unplug 函数,开始下发.
- 当硬盘执行完毕一个 I/O,也要 unplug 队列,检查是否有 I/O 可以执行.
1. unplug 函数
I/O 的下发是通过 unplug 函数实现的,因此首先分析这个函数,它的代码如代码清单 11-16 所示.
代码清单 11-16 __generic_unplug_device
void __generic_unplug_device(request_queue_t *q)
{
if (unlikely(blk_queue_stopped(q)))
return;
if (!blk_remove_plug(q))
return;
q->request_fn(q);
}unplug 函数首先要检查队列是否设置了 stop 标志,设置这个标志将停止队列,不能进行 unplug 操作.其次清除队列的 plug 标志,然后调用队列的 request_fn 函数.
2. scsi_request_fn 函数
对于 scsi 设备而言,这个函数就是 scsi_request_fn,如代码清单 11-17 所示.
代码清单 11-17 scsi_request_fn 函数
static void scsi_request_fn(struct request_queue *q)
{
……/*省略部分代码*/
while (!blk_queue_plugged(q)) {
int rtn;
req = elv_next_request(q);
/*检查队列是否ready,这个队列是块设备本身的设备队列,非上文的块设备*/
if (!req || !scsi_dev_queue_ready(q, sdev))
break;
/*设备状态是否online*/
if (unlikely(!scsi_device_online(sdev))) {
scsi_kill_request(req, q);
continue;
}scsi_request_fn 函数主体是个循环,它要一直执行直到队列为空或者 HBA 卡不能够再接收 I/O 为止.函数第一部分调用 elv_next_request 从块设备队列获得一个请求.然后检查块设备队列是否 ready 以及 scsi 设备状态是否 online.
scsi_request_fn 函数第二部分仍然检查各种异常情况,参考代码中添加的注释.如果无异常情况,把 I/O 请求从电梯队列中删除.
if (!(blk_queue_tagged(q) && !blk_queue_start_tag(q, req)))
blkdev_dequeue_request(req);
sdev->device_busy++;
spin_unlock(q->queue_lock);
cmd = req->special;
/*检查命令是否为空*/
if (unlikely(cmd == NULL)) {
BUG();
}
spin_lock(shost->host_lock);
/*HBA卡的host队列是否ready*/
if (!scsi_host_queue_ready(q, shost, sdev))
goto not_ready;
if (sdev->single_lun) {
/*检查target的用户是否就是sdev本身*/
if (scsi_target(sdev)->starget_sdev_user &&
scsi_target(sdev)->starget_sdev_user != sdev)
goto not_ready;
scsi_target(sdev)->starget_sdev_user = sdev;
}
shost->host_busy++;
spin_unlock_irq(shost->host_lock);scsi_request_fn 函数第三部分,首先初始化保存 scsi 错误返回值的数据 buffer,然后调用 scsi_dispatch_cmd 将 I/O 下发到驱动.
scsi_init_cmd_errh(cmd);
rtn = scsi_dispatch_cmd(cmd);
spin_lock_irq(q->queue_lock);
if(rtn) {
/*出错,尝试重新plug队列*/
if(sdev->device_busy == 0)
blk_plug_device(q);
break;
}
}
goto out;scsi_request_fn 函数第四部分是两个处理错误分支.
not_ready:
spin_unlock_irq(shost->host_lock);
spin_lock_irq(q->queue_lock);
blk_requeue_request(q, req);
sdev->device_busy--;
if(sdev->device_busy == 0)
blk_plug_device(q);
out:
spin_unlock_irq(q->queue_lock);
put_device(&sdev->sdev_gendev);
spin_lock_irq(q->queue_lock);
}not_ready 分支说明设备还没有准备好,因此把 I/O 重新插入电梯的队列.而 out 分支则只减少引用计数,不对 I/O 进行操作.这是因为进入 out 分支有两种情况,要么队列已经空,I/O 全部下放到驱动,要么 I/O 还没有真正从电梯队列解除,这两种情况都不需要将 I/O 重新插入电梯队列.
3. elv_next_request 函数
在 scsi_request_fn 函数中,获得一个 I/O 请求使用的是 elv_next_request 函数.这个函数从块设备队列获得一个 I/O,如代码清单 11-18 所示.
代码清单 11-18 elv_next_request 函数
struct request *elv_next_request(request_queue_t *q)
{
struct request *rq;
int ret;
while ((rq = __elv_next_request(q)) != NULL) {
if (!(rq->flags & REQ_STARTED)) {
elevator_t *e = q->elevator;
if (blk_sorted_rq(rq) && e->ops->elevator_activate
e->ops->elevator_activate_req_fn(q, rq);
rq->flags |= REQ_STARTED;
blk_add_trace_rq(q, rq, BLK_TA_ISSUE);
}
/*设置结束扇区*/
if (!q->boundary_rq || q->boundary_rq == rq) {
q->end_sector = rq_end_sector(rq);
q->boundary_rq = NULL;
}elv_next_request 函数第一部分调用 __elv_next_request 从队列中获得一个 I/O 请求.如果该 I/O 未设置 REQ_STARTED 标志,说明设备驱动第一次见到这个请求,这种情况需要通知 I/O 调度算法.然后为该 I/O 设置 REQ_STARTED 标志.
如果队列的 boundary_rq 等于该 I/O,该 I/O 此时要离开队列,因此设置队列的 boundary_rq 为空并设置队列最终扇区地址为该 I/O 的最终扇区地址.如果队列的 boundary_rq 为空,执行同样的设置.
elv_next_request 函数第二部分调用队列的 prep_rq_fn 函数来预处理 I/O.根据预处理结果执行不同的处理.
if ((rq->flags & REQ_DONTPREP) || !q->prep_rq_fn)
break;
ret = q->prep_rq_fn(q, rq);
if (ret == BLKPREP_OK) {
/*ok,返回请求,准备下发*/
break;
} else if (ret == BLKPREP_DEFER) {
/*有错,保留请求在队列*/
rq = NULL;
break;
} else if (ret == BLKPREP_KILL) {
/*出错误了,结束这个请求*/
int nr_bytes = rq->hard_nr_sectors << 9;
if (!nr_bytes)
nr_bytes = rq->data_len;
blkdev_dequeue_request(rq);
rq->flags |= REQ_QUIET;
end_that_request_chunk(rq, 0, nr_bytes);
end_that_request_last(rq, 0);
} else {如果结果正确,则返回该 I/O,如果结果为 BLKPREP_DEFER,意味着部分正确,I/O 放在队列前面,但是不返回该 I/O,等待下次挑选 I/O 的动作.第一部分设置 REQ_STARTED 标志将阻止其他 I/O 越过该 I/O.如果结果为 BLKPREP_KILL,说明该 I/O 存在严重错误,需要从队列中删除并结束该 I/O.
真正从队列获取请求的函数是 __elv_next_request,它的代码如代码清单 11-19 所示.
代码清单 11-19 __elv_next_request
static inline struct request *__elv_next_request(request_queue_t *q)
{
struct request *rq;
while (1) {
while (!list_empty(&q->queue_head)) {
rq = list_entry_rq(q->queue_head.next);
if (blk_do_ordered(q, &rq))
return rq;
}
if (!q->elevator->ops->elevator_dispatch_fn(q, 0))
return NULL;
}
}如果块设备队列不空,__elv_next_request 函数直接从块设备队列获得一个 I/O 请求,如果队列为空,则调用 elevator_dispatch_fn 函数从电梯队列获得一个请求,然后加入块设备队列的尾部.因为函数主体是一个循环,下一轮循环时,就可以从块设备队列获得请求了.
4. blk_do_ordered 函数
从队列获取一个请求后,要调用 blk_do_ordered 函数检查 I/O 的顺序,它的功能是为了处理 barrier I/O.如代码清单 11-20 所示.
代码清单 11-20 blk_do_ordered 函数
int blk_do_ordered(request_queue_t *q, struct request **rqp)
{
struct request *rq = *rqp;
/*判断是否为barrier I/O*/
int is_barrier = blk_fs_request(rq) && blk_barrier_rq(rq);
if (!q->ordseq) {
/*非barrier请求,返回*/
if (!is_barrier)
return 1;
if (q->next_ordered != QUEUE_ORDERED_NONE) {
*rqp = start_ordered(q, rq);
return 1;
} else {
/*如果队列切换到ORDERED_NONE,清I/O,错误标志设置为功能不支持*/
blkdev_dequeue_request(rq);
end_that_request_first(rq, -EOPNOTSUPP, rq->hard_nr_sectors);
end_that_request_last(rq, -EOPNOTSUPP);
*rqp = NULL;
return 0;
}
}blk_do_ordered 函数第一部分检查之前在队列中是否设置了需要保证顺序的 I/O,如果没有且新挑选的 I/O 不是 barrier I/O,不需要保证顺序的处理,直接返回.如果该 I/O 是个 barrier I/O,调用 start_ordered 执行保证顺序的操作.如果队列切换为 ORDERED_NONE,说明队列不支持保证顺序的功能,直接清除 I/O 并返回.
blk_do_ordered 函数第二部分处理特殊情况,此时队列中已经存在需要保证顺序的 I/O.如果队列设置了 QUEUE_ORDERED_TAG 标志,说明硬件支持 TAG 队列,硬件可以对 I/O 执行顺序进行控制,只阻塞下一个 barrier I/O 就可以,不是 barrier I/O 的其他 I/O 可以放行,下发到驱动执行.
if (!blk_fs_request(rq) && rq != &q->pre_flush_rq &&
rq != &q->post_flush_rq)
return 1;
if (q->ordered & QUEUE_ORDERED_TAG) {
/* Ordered by tag. Blocking the next barrier is enough. */
if (is_barrier && rq != &q->bar_rq)
*rqp = NULL;
} else {
/* Ordered by draining. Wait for turn. */
WARN_ON(blk_ordered_req_seq(rq) < blk_ordered_cur_seq(q));
if (blk_ordered_req_seq(rq) > blk_ordered_cur_seq(q))
*rqp = NULL;
}
return 1;
}如果队列未设置 QUEUE_ORDERED_TAG 标志,需调用 blk_ordered_req_seq 检查 I/O 的顺序关系,判断 I/O 是否可以执行.如果 I/O 的顺序大于当前队列的处理顺序,说明 I/O 应该在后面处理,因此不应该被处理.I/O 顺序是 barrier I/O 必须要考虑的重要方面,当 I/O 没有成功完成,需要重新插入电梯队列的时候,也需要考虑 I/O 顺序,必须把 I/O 插入到正确的位置.
5. start_ordered 函数
start_ordered 函数根据要求设置同步 cache 命令,保证同步 cache 命令之前的 I/O 必须完成,它的代码如代码清单 11-21 所示.
代码清单 11-21 start_ordered 函数
static inline struct request *start_ordered(request_queue_t *q, struct request *rq)
{
q->bi_size = 0;
q->orderr = 0;
q->ordered = q->next_ordered;
/*标记队列的保序操作开始*/
q->ordseq |= QUEUE_ORDSEQ_STARTED;
blkdev_dequeue_request(rq);
q->orig_bar_rq = rq;
rq = &q->bar_rq;
rq_init(q, rq);
rq->flags = bio_data_dir(q->orig_bar_rq->bio);
/*设备是否支持FUA?*/
rq->flags |= q->ordered & QUEUE_ORDERED_FUA ? REQ_FUA : 0;
rq->elevator_private = NULL;
rq->rl = NULL;
init_request_from_bio(rq, q->orig_bar_rq->bio);
rq->end_io = bar_end_io;
}start_ordered 函数第一部分设置 I/O 请求的必要参数.由于这是一个 barrier I/O,所以将 I/O 保存在队列的 orig_bar_rq 成员后,使用了队列的 bar_rq 成员作为真正下发 I/O 的数据结构,并根据原来的 bio 结构初始化 bar_rq 成员的参数,对于 barrier I/O,它的 I/O 完成函数被设置为特殊提供的 bar_end_io 函数.
start_ordered 函数第二部分设置同步 cache 的命令.
/*是否需要后刷,如果需要,则增加一个同步cache的命令,插入块设备队列头*/
if (q->ordered & QUEUE_ORDERED_POSTFLUSH)
queue_flush(q, QUEUE_ORDERED_POSTFLUSH);
else
q->ordseq |= QUEUE_ORDSEQ_POSTFLUSH;
/*barrier I/O插入队列的头部*/
elv_insert(q, rq, ELEVATOR_INSERT_FRONT);
/*是否需要前刷?如果需要,增加一个同步cache命令,插入块设备队列头*/
if (q->ordered & QUEUE_ORDERED_PREFLUSH) {
queue_flush(q, QUEUE_ORDERED_PREFLUSH);
rq = &q->pre_flush_rq;
} else
q->ordseq |= QUEUE_ORDSEQ_PREFLUSH;
if ((q->ordered & QUEUE_ORDERED_TAG) || q->in_flight == 0)
q->ordseq |= QUEUE_ORDSEQ_DRAIN;
else
rq = NULL;
return rq;
}如果队列带有 QUEUE_ORDERED_POSTFLUSH 标志,这种情况在队列头部插入一个同步 cache 命令.然后插入 barrier I/O 自身,最后如果队列带有 QUEUE_ORDERED_PREFLUSH 标志,说明要把之前下发的 I/O 执行完毕,还要插入一个同步 cache 命令,这个命令的目的是强制之前的 I/O 执行完毕.因为三次插入都是插入队列的头部,所以先插入 POSTFLUSH 同步 cache 命令,再插入 barrier I/O 自身,最后插入 PREFLUSH 同步 cache 命令.执行时,实际是 PREFLUSH 同步 cache 命令最先执行.
6. scsi_dispatch_cmd 函数
从电梯取得 I/O 请求后,要初始化请求的参数,然后检查队列和设备是否准备好.如果一切正常,调用 scsi_dispatch_cmd 发送 I/O 请求,如代码清单 11-22 所示.
代码清单 11-22 scsi_dispatch_cmd 函数
cc
int scsi_dispatch_cmd(struct scsi_cmnd *cmd)
{
struct Scsi_Host *host = cmd→device→host;
unsigned long flags = 0;
unsigned long timeout;
int rtn = 0;
/* 检查设备是否可用 */
if (unlikely(cmd->device->sdev_state == SDEV_DEL)) {
/*如果设备不可用,则终结这个I/O命令*/
cmd->result = DID_NO_CONNECT << 16;
atomic_inc(&cmd->device->iorequest_cnt);
__scsi_done(cmd);
/* return 0 (because the command has been processed) */
goto out;
}
/*检查设备是否阻塞,如果阻塞,要重新把命令插入电梯队列*/
if (unlikely(cmd->device->sdev_state == SDEV_BLOCK)) {
scsi_queue_insert(cmd, SCSI_MLQUEUE_DEVICE_BUSY);
SCSI_LOG_MLQUEUE(3, printk("queuecommand : device block\n"));
goto out;
}
if (cmd->device->scsi_level <= SCSI_2 &&
cmd->device->scsi_level != SCSI_UNKNOWN) {
cmd->cmnd[1] = (cmd->cmnd[1] & 0x1f) | (cmd->device->lun << 5);
}
/*检查设备的reset时钟,避免设备未准备好的情况*/
timeout = host->last_reset + MIN_RESET_DELAY;
if (host->resetting && time_before(jiffies, timeout)) {
int ticks_remaining = timeout - jiffies;
while (--ticks_remaining >= 0)
mdelay(1 + 999 / HZ);
host->resetting = 0;
}
/*设置该scsi命令的超时时间*/
scsi_add_timer(cmd, cmd->timeout_per_command, scsi_times_out);
scsi_log_send(cmd);
atomic_inc(&cmd->device->iorequest_cnt);
/*检查命令是否超过hba支持的最大命令长度,超过结束命令*/
if (CDB_SIZE(cmd) > cmd->device->host->max_cmd_len) {
cmd->result = (DID_ABORT << 16);
scsi_done(cmd);
goto out;
}
spin_lock_irqsave(host->host_lock, flags);
scsi_cmd_get_serial(host, cmd);
if (unlikely(host->shost_state == SHOST_DEL)) {
cmd->result = (DID_NO_CONNECT << 16);
scsi_done(cmd);
} else {
/*调用HBA提供的命令函数*/
rtn = host->hostt->queuecommand(cmd, scsi_done);
}
spin_unlock_irqrestore(host->host_lock, flags);
if (rtn) {
/*命令执行中出错,则删掉命令的计时器,然后把命令再次插入队列*/
if (scsi_delete_timer(cmd)) {
atomic_inc(&cmd->device->iodone_cnt);
scsi_queue_insert(cmd,
(rtn == SCSI_MLQUEUE_DEVICE_BUSY) ?
rtn : SCSI_MLQUEUE_HOST_BUSY);
}
}
/* ... 省略后续部分 */
}
`scsi_dispatch_cmd` 函数提供了很多注释,主要是处理很多设备不可用的情况。如果设备可用,则调用 hba 卡提供的 `queuecommand` 把 scsi 命令提交给硬盘驱动,同时提供了一个回调函数 `scsi_done`。
异常处理有多种情况,如果设备不可用,要停止 I/O,返回错误。如果是执行 I/O 中错误,而且符合重试条件,要把 I/O 重新插入电梯队列,再次执行 I/O。
### 11.5.3 I/O 返回路径
`scsi_dispatch_cmd` 提交 scsi 命令时,要提供一个回调函数 `scsi_done`。这个回调是在硬盘驱动的中断中调用的。scsi 命令从中断中返回,意味着这个命令已经执行完毕,有可能是成功执行了 scsi 命令,也有可能出现错误。不管成功还是错误,内核都需要处理这两种情况。
`scsi_done` 是 I/O 返回路径上的第一个函数,因此就从这个函数开始分析过程,它的代码如代码清单 11-23 所示。
**代码清单 11-23 `scsi_done` 函数**
```c
static void scsi_done(struct scsi_cmnd *cmd)
{
if (!scsi_delete_timer(cmd))
return;
__scsi_done(cmd);
}
scsi_done 函数首先要停止 scsi 命令的计时器,然后调用 __scsi_done。__scsi_done 函数也比较简单,如代码清单 11-24 所示。
代码清单 11-24 __scsi_done 函数
void __scsi_done(struct scsi_cmnd *cmd)
{
struct request *rq = cmd->request;
cmd->serial_number = 0;
/*将命令的完成次数加1*/
atomic_inc(&cmd->device->iodone_cnt);
/*如果result不为0,错误计数加1*/
if (cmd->result)
atomic_inc(&cmd->device->ioerr_cnt);
rq->completion_data = cmd;
blk_complete_request(rq);
}中断返回函数的参数是一个 scsi 命令,__scsi_done 函数首先要从 scsi 命令结构中获得当初下发 I/O 时使用的请求对象,然后调用 blk_complete_request 函数来处理一个 I/O 的返回过程。
blk_complete_request 是中断上下文的最后一个函数,它要启动一个软中断继续处理 I/O,如代码清单 11-25 所示。
代码清单 11-25 blk_complete_request 函数
void blk_complete_request(struct request *req)
{
struct list_head *cpu_list;
unsigned long flags;
BUG_ON(!req->q->softirq_done_fn);
local_irq_save(flags);
cpu_list = &__get_cpu_var(blk_cpu_done);
/*将请求加入每cpu变量链表blk_cpu_done*/
list_add_tail(&req->donelist, cpu_list);
/*启动一个软中断*/
raise_softirq_irqoff(BLOCK_SOFTIRQ);
local_irq_restore(flags);
}截至目前,代码一直在硬盘驱动的中断上下文中执行,而中断上下文具有高优先级,会阻塞硬盘自身的中断,因此 blk_complete_request 函数启动块设备的软中断来处理中断的下半部。将中断处理代码划分为两部分,是内核的常见方式。网络设备的中断处理代码中,同样启动软中断来处理中断的下半部。
为了传递参数到软中断上下文,内核定义了一个 CPU 变量 blk_cpu_done,这是一个链表,返回 I/O 的请求结构要加入该链表。块设备软中断的处理函数是 blk_done_softirq。该函数要从 CPU 变量 blk_cpu_done 获得已经完成的 I/O 请求,然后调用队列的软中断处理函数 softirq_done_fn 处理。前文已经分析过,这个软中断处理函数是 scsi_softirq_done。它的代码如代码清单 11-26 所示。
代码清单 11-26 scsi_softirq_done 函数
static void scsi_softirq_done(struct request *rq)
{
struct scsi_cmnd *cmd = rq->completion_data;
unsigned long wait_for = (cmd->allowed + 1) * cmd->timeout_per;
int disposition;
/*初始化错误I/O链表*/
INIT_LIST_HEAD(&cmd->eh_entry);
/*判断I/O是否成功完成*/
disposition = scsi_decide_disposition(cmd);
if (disposition != SUCCESS &&
time_before(cmd->jiffies_at_alloc + wait_for, jiffies)) {
/*已经超时了,则设置命令完成,不再重复执行命令*/
disposition = SUCCESS;
}
scsi_log_completion(cmd, disposition);
switch (disposition) {
case SUCCESS:
scsi_finish_command(cmd);
break;
case NEEDS_RETRY:
scsi_retry_command(cmd);
break;
case ADD_TO_MLQUEUE:
scsi_queue_insert(cmd, SCSI_MLQUEUE_DEVICE_BUSY);
break;
default:
if (!scsi_eh_scmd_add(cmd, 0))
scsi_finish_command(cmd);
}
}软中断处理函数 scsi_softirq_done 根据命令执行的结果,分为四种情况来处理:
- 结束命令,交给上层处理(这种情况并不一定命令成功完成了,如果命令超时了,也要结束命令,交给上层处理);
- 重试命令;
- 命令重新插入电梯队列,重新排队;
- 由 scsi 错误处理线程来处理。
scsi_decide_disposition 函数值得仔细研究,这个函数总结了所有的 scsi 错误类型,对每种错误类型给出了处理措施。
如果 I/O 正常完成,调用 scsi_finish_command 函数返回上层,这个函数比较简单,它最终又调用了命令本身的 done 函数。done 函数是在初始化 scsi 命令时调用 sd_init_command 函数设置为 sd_rw_intr,同时还设置了 scsi 命令的最大重试次数和超时时间值。
1. sd_rw_intr 函数
sd_rw_intr 函数要对 scsi 命令的各种错误进行处理,如代码清单 11-27 所示。
代码清单 11-27 sd_rw_intr 函数
static void sd_rw_intr(struct scsi_cmnd * SCpnt)
{
int result = SCpnt->result;
unsigned int xfer_size = SCpnt->request_bufflen;
unsigned int good_bytes = result ? 0 : xfer_size;
u64 start_lba = SCpnt->request->sector;
u64 bad_lba;
struct scsi_sense_hdr sshdr;
int sense_valid = 0;
int sense_deferred = 0;
int info_valid;
if (result) {
/*设置scsi命令的sense_key和asc值,ascq值 */
sense_valid = scsi_command_normalize_sense(SCpnt, &sshdr);
if (sense_valid)
sense_deferred = scsi_sense_is_deferred(&sshdr);
}
if (driver_byte(result) != DRIVER_SENSE &&
(!sense_valid || sense_deferred))
goto out;
/*第一部分,如果scsi命令的返回结果result不为0,说明命令执行中产生了错误.scsi协议定义了可能的错误类型,通过三个参数共同说明错误类型,这三个参数是sense key、asc和acsq.参数的详细定义读者需要参考scsi协议文档,本书只对代码中处理的部分错误进行解释.*/
/*sd_rw_intr函数第二部分主要是检查scsi命令的sense key*/
switch (sshdr.sense_key) {
case HARDWARE_ERROR: /*硬件错误*/
case MEDIUM_ERROR: /*坏道*/
if (!blk_fs_request(SCpnt->request))
goto out;
info_valid = scsi_get_sense_info_fld(SCpnt->sense_buffer,
SCSI_SENSE_BUFFERSIZE,
&bad_lba);
if (!info_valid)
goto out;
if (xfer_size <= SCpnt->device->sector_size)
goto out;
switch (SCpnt->device->sector_size) {
case 256:
start_lba <<= 1;
break;
case 512:
break;
case 1024:
start_lba >>= 1;
break;
case 2048:
start_lba >>= 2;
break;
case 4096:
start_lba >>= 3;
break;
default:
/* Print something here with limiting frequency. */
goto out;
break;
}
good_bytes = (bad_lba - start_lba) * SCpnt->device->sector_size;
break;
case RECOVERED_ERROR: /*可修复的错误*/
case NO_SENSE: /*no sense,成功执行*/
SCpnt->result = 0;
memset(SCpnt->sense_buffer, 0, SCSI_SENSE_BUFFERSIZE);
good_bytes = xfer_size;
break;
case ILLEGAL_REQUEST: /*不合法的请求*/
if (SCpnt->device->use_10_for_rw &&
(SCpnt->cmnd[0] == READ_10 ||
SCpnt->cmnd[0] == WRITE_10))
SCpnt->device->use_10_for_rw = 0;
if (SCpnt->device->use_10_for_ms &&
(SCpnt->cmnd[0] == MODE_SENSE_10 ||
SCpnt->cmnd[0] == MODE_SELECT_10))
SCpnt->device->use_10_for_ms = 0;
break;
default:
break;
}
out:
scsi_io_completion(SCpnt, good_bytes);
}如果 sense key 不够,还需要一些附加的信息,通过 sense_buffer 里面的数据 asc 和 ascq 表示。sense key 主要包括以下几个:
- ❑
HARDWARE_ERROR:硬件错误。 - ❑
MEDIUM_ERROR:坏道错误。 - ❑
RECOVERED_ERROR:可修复的错误。 - ❑
NO_SENSE:无 sense 数据。 - ❑
ILLEGAL_REQUEST:不合法的请求。
NOTE
RECOVERED_ERROR和NO_SENSE代表命令已经成功执行了,这两个 sense key 作用只是提示用户。
变量 good_bytes 的含义有两种:
- 如果命令成功完成,
good_bytes就是 scsi 命令设定的读写数值; - 如果命令部分完成(比如由于坏道错误只读了一部分数据),
good_bytes代表完成的字节数。
2. scsi_io_completion 函数
good_bytes 要作为输入参数调用 scsi_io_completion,由这个函数继续 I/O 的返回过程,如代码清单 11-28 所示。
代码清单 11-28 scsi_io_completion 函数
void scsi_io_completion(struct scsi_cmnd *cmd, unsigned int good_bytes)
{
int result = cmd->result;
int this_count = cmd->request_bufflen;
request_queue_t *q = cmd->device->request_queue;
struct request *req = cmd->request;
int clear_errors = 1;
struct scsi_sense_hdr sshdr;
int sense_valid = 0;
int sense_deferred = 0;
scsi_release_buffers(cmd);
if (result) {
sense_valid = scsi_command_normalize_sense(cmd, &sshdr);
if (sense_valid)
sense_deferred = scsi_sense_is_deferred(&sshdr);
}
/*scsi命令来自SG_IO,也就是说不是来自于文件系统*/
if (blk_pc_request(req)) { /* SG_IO ioctl from block level */
req->errors = result;
if (result) {
clear_errors = 0;
if (sense_valid && req->sense) {
/*
* SG_IO wants current and deferred errors
*/
int len = 8 + cmd->sense_buffer[7];
/*复制返回的sense buffer内容*/
if (len > SCSI_SENSE_BUFFERSIZE)
len = SCSI_SENSE_BUFFERSIZE;
memcpy(req->sense, cmd->sense_buffer, len);
req->sense_len = len;
}
} else
req->data_len = cmd->resid;
}scsi_io_completion 第一部分和 sd_rw_intr 函数一样,都根据 scsi 命令是否成功完成,将 sense key 和 asc、ascq 值复制出来。然后要检查 I/O 命令是否来自 SG_IO(SG_IO 指 scsi 命令来自于文件系统的 I/O control 调用,而不是来自文件系统本身)。这种情况要把返回命令的 sense buffer 整个复制出来。
scsi_io_completion 第二部分调用 scsi_end_request 函数返回上层处理。
if (clear_errors)
req->errors = 0;
/*命令成功完成返回,否则需要进一步处理*/
if (scsi_end_request(cmd, 1, good_bytes, result == 0) == NULL)
return;如果命令成功完成,整个 I/O 返回过程就执行完毕,如果命令没有成功完成,还需要第三部分处理错误。
命令是否成功完成,是由输入参数 good_bytes 决定的。
/* good_bytes = 0, or (inclusive) there were leftovers and
* result = 0, so scsi_end_request couldn't retry.
*/
if (sense_valid && !sense_deferred) {
switch (sshdr.sense_key) {
case UNIT_ATTENTION:
if (cmd->device->removable) {
/*如果设备被移走,则结束请求*/
cmd->device->changed = 1;
scsi_end_request(cmd, 0, this_count, 1);
return;
} else {
/*重新插入命令*/
scsi_requeue_command(q, cmd);
return;
}
break;
case ILLEGAL_REQUEST:
if ((cmd->device->use_10_for_rw &&
sshdr.asc == 0x20 && sshdr.ascq == 0x00) &&
(cmd->cmnd[0] == READ_10 ||
cmd->cmnd[0] == WRITE_10)) {
cmd->device->use_10_for_rw = 0;
/* This will cause a retry with a
* 6-byte command.
*/
scsi_requeue_command(q, cmd);
return;
} else {
scsi_end_request(cmd, 0, this_count, 1);
return;
}
break;
case NOT_READY:
if (sshdr.asc == 0x04) {
switch (sshdr.ascq) {
case 0x01: /* becoming ready */
case 0x04: /* format in progress */
case 0x05: /* rebuild in progress */
case 0x06: /* recalculation in progress */
case 0x07: /* operation in progress */
case 0x08: /* Long write in progress */
case 0x09: /* self test in progress */
scsi_requeue_command(q, cmd);
return;
default:
break;
}
}
if (!(req->flags & REQ_QUIET)) {
scmd_printk(KERN_INFO, cmd, "Device not ready: ");
scsi_print_sense_hdr("", &sshdr);
}
scsi_end_request(cmd, 0, this_count, 1);
return;
case VOLUME```c
case VOLUME_OVERFLOW:
if (!(req->flags & REQ_QUIET)) {
scmd_printk(KERN_INFO, cmd, "Volume overflow, CD");
__scsi_print_command(cmd->cmnd);
scsi_print_sense("", cmd);
}
/* See SSC3rXX or current. */
scsi_end_request(cmd, 0, this_count, 1);
return;
default:
break;
}
}
/*如果返回DID_RESET,则重新插入命令*/
if (host_byte(result) == DID_RESET) {
scsi_requeue_command(q, cmd);
return;
}
if (result) {
if (!(req->flags & REQ_QUIET)) {
}
}
scsi_end_request(cmd, 0, this_count, !result);
}scsi_io_completion 第三部分根据 sense key 和 asc、ascq 的值决定处理的方式.处理方式其实比较简单,一种是结束请求,另一种是把请求重新插入电梯队列.
值得注意的是,由于 scsi 硬件种类繁杂,错误定义混乱,scsi 返回信息很多时候不能准确反映设备的情况,这造成内核 I/O 处理的异常.比如有的 scsi 硬件总是返回 DID_RESET,结果 I/O 请求无限次重插,CPU 占有率达到 100%,阻塞了操作系统的运行.
3. scsi_end_request 函数
scsi_io_completion 函数分析完成后,还需要重点分析 scsi_end_request 函数,观察 I/O 的返回过程.它的代码如代码清单 11-29 所示.
代码清单 11-29 scsi_end_request 函数
static struct scsi_cmnd *scsi_end_request(struct scsi_cmnd *cmd,
int bytes, int requeue)
{
request_queue_t *q = cmd->device->request_queue;
struct request *req = cmd->request;
unsigned long flags;
/*根据uptodate和goodbytes结束请求*/
if (end_that_request_chunk(req, uptodate, bytes)) {
/*如果返回非0,说明还有块未完成*/
int leftover = (req->hard_nr_sectors << 9);
if (blk_pc_request(req))
leftover = req->data_len;
/* kill remainder if no retrys */
if (!uptodate && blk_noretry_request(req))
/*如果返回了fastfail,说明不需要重试,则直接结束*/
end_that_request_chunk(req, 0, leftover);
else {
/*重插入队列*/
if (requeue) {
scsi_requeue_command(q, cmd);
cmd = NULL;
}
return cmd;
}
}
……/*省略部分代码*/
scsi_next_command(cmd);
return NULL;
} scsi_end_request 函数调用 end_that_request_chunk 函数结束 I/O 的返回过程.如果返回结果不为 0,说明 I/O 没有成功完成,根据返回结果有两种处理方式.如果返回结果带有 REQ_FAILFAST 标志,指示快速返回,不需要重试,直接结束 I/O,否则就将 I/O 重新插入电梯队列,再次执行.
最后调用 scsi_next_command 从队列中挑选下一个 I/O 执行.
参数 good_bytes 说明了 I/O 的完成情况,是部分完成还是全部完成.scsi_end_request 要根据 good_bytes 确定如何结束 I/O 请求.
end_that_request_chunk 调用了 __end_that_request_first 来完成 bio 的返回.回顾第10章的内容,bio 设置的回调函数是 mpage_end_io_read.__end_that_request_first 函数调用这个回调函数,完成唤醒阻塞进程的任务,通知 I/O 完成返回.
11.6 本章小结
通用块层、I/O调度算法和scsi层,共同构建了文件系统之下的I/O处理流程.Linux内核采用了很多对象的思想来设计程序,这样块设备的执行流程和电梯算法都对象化了,用户可以自行设计自己的模块来替换内核提供的软件模块.
第12章 内核回写机制
回顾第10章关于文件写的部分,Linux的写操作只是写数据到page cache中,真正的写磁盘要由内核的pdflush线程完成的.
12.1 内核的触发条件
何时启动写磁盘操作由内核根据一些条件触发pdflush线程完成.内核的触发条件如下.
- 文件系统的写函数
generic_file_buffered_write要调用balance_dirty_pages_ratelimited.后者首先检查累加的dirty页(这是一个CPU变量)是否达到一个限制,超过限制则执行写操作. - 内核定时器触发.当内核启动的时候,要启动一个回写定时器(默认是5秒).当定时时间到达,触发pdflush线程执行写操作.
- 当系统申请内存失败,或者文件系统执行同步(sync)操作,或者内存管理模块试图释放更多内存的时候,都可能触发pdflush线程回写页面.
提示
CPU变量是内核的一种数据结构,每个CPU核都有变量的一个副本.
12.2 内核回写控制参数
内核通过4个参数控制数据回写的算法.这四个参数由proc文件系统提供,用户可以直接修改这些参数从而调整内核回写的行为.具体如表12-1所示.
表12-1 内核回写控制参数
(表内容原文未给出,此处保留占位)
12.3 定时器触发回写
系统初始化的start_kernel函数启动了一个定时器,这个定时器的作用就是推动内核回写数据.因此从start_kernel函数开始分析内核的回写机制.如代码清单12-1所示.
代码清单12-1 start_kernel函数
asmlinkage void __init start_kernel(void)
{
……/*省略无关代码*/
/* rootfs populating might need page-writeback */
page_writeback_init();
……/*省略无关代码*/
}12.3.1 启动定时器
start_kernel函数调用page_writeback_init函数启动定时器,如代码清单12-2所示.
代码清单12-2 page_writeback_init函数
void __init page_writeback_init(void)
{
long buffer_pages = nr_free_buffer_pages();
long correction;
total_pages = nr_free_pagecache_pages();
correction = (100 * 4 * buffer_pages) / total_pages;
if (correction < 100) {
/*脏页比例和背景脏页比例两个参数乘以内存比例*/
dirty_background_ratio *= correction;
dirty_background_ratio /= 100;
vm_dirty_ratio *= correction;
vm_dirty_ratio /= 100;
if (dirty_background_ratio <= 0)
dirty_background_ratio = 1;
if (vm_dirty_ratio <= 0)
vm_dirty_ratio = 1;
}
mod_timer(&wb_timer, jiffies + dirty_writeback_interval);
set_ratelimit();
register_cpu_notifier(&ratelimit_nb);
}page_writeback_init函数首先获得内存区内所有可分配内存的总数.这里的可分配内存包括ZONE_HIGH、ZONE_NORMAL、ZONE_DMA三个管理区.
然后检查当前系统的内存使用情况.buffer_pages是ZONE_DMA和ZONE_NORMAL管理区的可分配内存.如果buffer_pages少于所有可分配内存total_pages的1/4,此时脏页比例和背景脏页比例两个参数要调整,调整算法是比例参数乘以内存之比.通过放大比例参数达到尽快回写数据回收内存的目的.
注意
此处涉及内核内存管理和分配的知识.简单说,32位操作系统的内核将内存划分为三个管理区,分别是
ZONE_DMA、ZONE_NORMAL、ZONE_HIGH.
set_ratelimit函数设置内存页面限制.这个参数的上限是4兆内存,按4K的页面计算,限制在16个页面和1024个页面之间,具体根据CPU个数和可分配内存数决定.如果页面尺寸不是4K,就要按4兆内存除以实际页面尺寸计算真正的页面最大值.
page_writeback_init函数启动定时器wb_timer作为回写定时器.这个定时器如代码清单12-3所示.
代码清单12-3 wb_timer_fn函数
static DEFINE_TIMER(wb_timer, wb_timer_fn, 0, 0);
static void wb_timer_fn(unsigned long unused)
{
if (pdflush_operation(wb_kupdate, 0) < 0)
mod_timer(&wb_timer, jiffies + HZ); /* delay 1 second */
}
int pdflush_operation(void (*fn)(unsigned long), unsigned long arg0)
{
unsigned long flags;
int ret = 0;
BUG_ON(fn == NULL); /* Hard to diagnose if it's deferred */
spin_lock_irqsave(&pdflush_lock, flags);
if (list_empty(&pdflush_list)) {
spin_unlock_irqrestore(&pdflush_lock, flags);
ret = -1;
} else {
struct pdflush_work *pdf;
pdf = list_entry(pdflush_list.next, struct pdflush_work, list);
list_del_init(&pdf->list);
if (list_empty(&pdflush_list))
last_empty_jifs = jiffies;
pdf->fn = fn;
pdf->arg0 = arg0;
wake_up_process(pdf->who);
spin_unlock_irqrestore(&pdflush_lock, flags);
}
return ret;
}定时器wb_timer的执行函数是wb_timer_fn.后者调用pdflush_operation函数,从pdflush_list全局链表获得一个pdflush_work结构,设置pdflush_work结构的工作函数wb_kupdate,然后把pdflush_work结构从链表中删除,唤醒pdflush内核线程.
pdflush线程是内核启动的一个内核线程,它的执行函数体是pdflush.这个函数的作用是执行pdflush_work结构设置的工作函数(就是注册的wb_kupdate函数).
12.3.2 执行回写操作
wb_kupdate是内核回写的控制函数,我们从这个函数开始分析回写机制,如代码清单12-4所示.
代码清单12-4 wb_kupdate函数
static void wb_kupdate(unsigned long arg)
{
……/*省略部分代码*/
oldest_jif = jiffies - dirty_expire_interval;
start_jif = jiffies;
next_jif = start_jif + dirty_writeback_interval;
nr_to_write = global_page_state(NR_FILE_DIRTY) +
global_page_state(NR_UNSTABLE_NFS) +
(inodes_stat.nr_inodes - inodes_stat.nr_unused);
while (nr_to_write > 0) {
wbc.encountered_congestion = 0;
/*一次写页面的最大值为1024*/
wbc.nr_to_write = MAX_WRITEBACK_PAGES;
writeback_inodes(&wbc);
if (wbc.nr_to_write > 0) {
if (wbc.encountered_congestion)
blk_congestion_wait(WRITE, HZ/10);
else
break; /* All the old data is written */
}
nr_to_write -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
}
/*如果当前时间加1秒已超过下一轮的定时器时间,设置定时器为当前时间加1秒*/
if (time_before(next_jif, jiffies + HZ))
next_jif = jiffies + HZ;
/*更新定时器*/
if (dirty_writeback_interval)
mod_timer(&wb_timer, next_jif);
}wb_kupdate函数首先计算三个时间.
start_jif:当前时间.oldest_jif:脏页面允许存在的最长时间,早于这个时间的页面必须回写.这个值默认是当前时间减去30秒.next_jif:下一次执行回写的时间.默认是5秒,就是每5秒触发一次wb_kupdate.
其次检查系统内有多少可写的页面.如果大于0,则调用writeback_inodes执行回写.
执行一次回写操作后,如果还有页面未写并且设置了阻塞标志,则启动阻塞处理.阻塞处理是等1/10秒,然后继续执行下一次回写.如果未设置阻塞标志,则直接进行下一轮的回写.
12.3.3 检查需要回写的页面
writeback_inodes函数用于扫描系统的超级块,检查需要回写的页面,如代码清单12-5所示.
代码清单12-5 writeback_inodes函数
writeback_inodes(struct writeback_control *wbc)
{
struct super_block *sb;
might_sleep();
spin_lock(&sb_lock);
restart:
sb = sb_entry(super_blocks.prev);
for (; sb != sb_entry(&super_blocks); sb =
sb_entry(sb->s_list.prev)) {
if (!list_empty(&sb->s_dirty) || !list_empty(&sb->s_io))
/* we're making our own get_super here */
sb->s_count++;
spin_unlock(&sb_lock);
if (down_read_trylock(&sb->s_umount)) {
if (sb->s_root) {
spin_lock(&inode_lock);
sync_sb_inodes(sb, wbc);
spin_unlock(&inode_lock);
}
……/*省略部分代码*/writeback_inodes函数首先遍历系统的超级块对象,如果超级块的脏页面链表非空,则调用sync_sb_inodes回写超级块内的inode.
12.3.4 回写超级块内的inode
因为每个文件系统有一个超级块,而文件系统内所有的inode结构都链接到超级块对象的链表头,sync_sb_inodes函数要检查超级块对象内所有需要回写的inode,如代码清单12-6所示.
代码清单12-6 sync_sb_inodes函数
static void
sync_sb_inodes(struct super_block *sb, struct writeback_control *wbc)
{
const unsigned long start = jiffies; /* livelock avoidance */
/*将dirty链表合并到s_io链表*/
if (!wbc->for_kupdate || list_empty(&sb->s_io))
list_splice_init(&sb->s_dirty, &sb->s_io);
while (!list_empty(&sb->s_io)) {
struct inode *inode = list_entry(sb->s_io.prev, struct inode, i_list);
struct address_space *mapping = inode->i_mapping;
struct backing_dev_info *bdi = mapping->backing_dev_info;
long pages_skipped;
/*无回写能力的处理*/
if (!bdi_cap_writeback_dirty(bdi)) {
list_move(&inode->i_list, &sb->s_dirty);
if (sb == blockdev_superblock) {
continue;
}
break;
}
/*bdi指示写拥塞。设置拥塞标志*/
if (wbc->nonblocking && bdi_write_congested(bdi)) {
wbc->encountered_congestion = 1;
if (sb != blockdev_superblock)
break; /* Skip a congested fs */
list_move(&inode->i_list, &sb->s_dirty);
continue; /* Skip a congested blockdev */
}
if (wbc->bdi && bdi != wbc->bdi) {
if (sb != blockdev_superblock)
break; /* fs has the wrong queue */
list_move(&inode->i_list, &sb->s_dirty);
continue; /* blockdev has wrong queue */
}sync_sb_inodes函数第一部分遍历超级块链表的脏inode结构,检查是否可回写.
根据第9章块设备文件系统的分析,所有块设备的超级块是块设备文件系统提供的全局变量blockdev_superblock.
- 如果inode不具备回写能力,且超级块等于
blockdev_superblock,说明这个inode是一个块设备文件的inode结构,这种情况继续遍历块设备文件系统的其他inode,检查是否可写. - 如果超级块不等于
blockdev_superblock,说明这是一个文件系统,这种情况跳过整个超级块,检查下一个超级块里面的inode. - 如果写拥塞,和上面的处理类似.
- 如果文件系统拥塞,跳过整个超级块,检查下一个超级块.
- 如果块设备拥塞,只跳过该块设备,检查超级块链表的下一个块设备.
2. 检查时间参数
sync_sb_inodes函数第二部分首先检查时间参数.
- 如果inode结构置脏的时间在
sync_sb_inodes函数开始执行的时间之后,跳出循环. - 如果inode结构置脏的时间早于设定的回写时间(回写时间默认是当前时间之前30秒),同样跳出循环.
- 如果有另外一个pdflush线程也在回写队列,同样跳出循环.
/*inode的dirty时间在sync_sb_inodes调用之后*/
if (time_after(inode->dirtied_when, start))
break;
/* Was this inode dirtied too recently? */
if (wbc->older_than_this && time_after(inode->dirtied_when,
*wbc->older_than_this))
break;
/*是否已经有另一个pdflush在回写这个inode*/
if (current_is_pdflush() && !writeback_acquire(bdi))
break;
BUG_ON(inode->i_state & I_FREEING);
__iget(inode);
pages_skipped = wbc->pages_skipped;
__writeback_single_inode(inode, wbc);
……/*省略部分代码*/3. 检查inode状态
如果检查都通过,调用__writeback_single_inode检查inode的状态,如代码清单12-7所示.
代码清单12-7 __writeback_single_inode函数
static int
__writeback_single_inode(struct inode *inode, struct writeback_control *wbc)
{
wait_queue_head_t *wqh;
if (!atomic_read(&inode->i_count))
WARN_ON(!(inode->i_state & (I_WILL_FREE|I_FREEING)));
else
WARN_ON(inode->i_state & I_WILL_FREE);
if ((wbc->sync_mode != WB_SYNC_ALL) && (inode->i_state & I_LOCK)) {
list_move(&inode->i_list, &inode->i_sb->s_dirty);
return 0;
}
if (inode->i_state & I_LOCK) {
DEFINE_WAIT_BIT(wq, &inode->i_state, __I_LOCK);
wqh = bit_waitqueue(&inode->i_state, __I_LOCK);
do {
spin_unlock(&inode_lock);
__wait_on_bit(wqh, &wq, inode_wait, TASK_UNINTERRUPTIBLE);
spin_lock(&inode_lock);
} while (inode->i_state & I_LOCK);
}
return __sync_single_inode(inode, wbc);
}__writeback_single_inode函数执行两个条件判断.
- 如果inode状态已经是LOCK,且回写模式不是
WB_SYNC_ALL,把inode链接到超级块的s_dirty链表,然后退出. - 如果回写模式是
WB_SYNC_ALL,一直等待,直到inode解除LOCK状态,然后调用__sync_single_inode函数进行文件的回写.
4. 文件回写操作
__sync_single_inode函数如代码清单12-8所示.
代码清单12-8 __sync_single_inode函数
static int
__sync_single_inode(struct inode *inode, struct writeback_control *wbc)
{
unsigned dirty;
struct address_space *mapping = inode->i_mapping;
struct super_block *sb = inode->i_sb;
int wait = wbc->sync_mode == WB_SYNC_ALL;
int ret;
BUG_ON(inode->i_state & I_LOCK);
/* Set I_LOCK, reset I_DIRTY */
dirty = inode->i_state & I__sync_single_inode函数第一部分执行脏数据和inode本身的回写。为了表示inode数据的各种置脏条件,内核定义了三个参数。
- `I_DIRTY_SYNC`:表示只是inode访问时间发生了变化。
- `I_DIRTY_DATASYNC`:表示inode其他属性数据发生了变化,比如文件的长度。
- `I_DIRTY_PAGES`:文件的内容数据发生了变化。
`__sync_single_inode`函数首先调用`do_writepages`将所有置脏的数据页面进行回写(根据控制参数`wbc`设置的条件)。
如果inode设置了`I_DIRTY_SYNC`或者`I_DIRTY_DATASYNC`,说明不只是文件的内容,文件本身的属性也发生了改变,因此调用`write_inode`函数对inode本身进行回写。当`write_inode`返回时,已经完成了inode的回写或者是发生了错误。
如果控制参数`wbc`设置了`WB_SYNC_ALL`,说明是紧要数据,需要等所有的数据回写完成。因此调用`filemap_fdatawait`等待数据回写完成。
`__sync_single_inode`函数第二部分首先检查文件是否有数据未进行回写。
```c
spin_lock(&inode_lock);
inode->i_state &= ~I_LOCK;
if (!(inode->i_state & I_FREEING)) {
if (!(inode->i_state & I_DIRTY) &&
mapping_tagged(mapping, PAGECACHE_TAG_DIRTY)) {
if (wbc->for_kupdate) {
/*让inode得到更多的回写机会*/
inode->i_state |= I_DIRTY_PAGES;
list_move_tail(&inode->i_list, &sb->s_dirty);
} else {
/*修改inode的置脏时间,让其他inode有更多的回写机会*/
inode->i_state |= I_DIRTY_PAGES;
inode->dirtied_when = jiffies;
list_move(&inode->i_list, &sb->s_dirty);
}
} else if (inode->i_state & I_DIRTY) {
list_move(&inode->i_list, &sb->s_dirty);
} else if (atomic_read(&inode->i_count)) {
list_move(&inode->i_list, &inode_in_use);
} else {
list_move(&inode->i_list, &inode_unused);
}
}
/*唤醒阻塞的进程*/
wake_up_inode(inode);
return ret;
}这一步通过调用mapping_tagged检查文件的page cache状态。
- 如果仍有置脏的数据页面,要将inode状态设置
I_DIRTY_PAGES标志位。 - 如果没有其他的置脏的数据页面,但是inode本身被置脏,说明有别的任务修改了inode的属性,此时要将inode加入超级块的脏inode链表。
- 如果以上条件都不成立且inode引用计数不为0,说明文件没有置脏的数据页面,文件属性也没发生改变,将inode加入
inode_in_use链表,如果引用计数为0,则将inode加入inode_unused链表。
12.4 平衡写
平衡写在文件系统写过程的上下文执行。第10章文件系统读写分析已经观察到写过程调用了 balance_dirty_pages_ratelimited 函数。这个函数作用是检查脏页面是否到达限值,是否进行平衡写操作。具体如代码清单12-9所示。
代码清单12-9
balance_dirty_pages_ratelimited函数balance_dirty_pages_ratelimited(struct address_space *mapping) { balance_dirty_pages_ratelimited_nr(mapping, 1); } void balance_dirty_pages_ratelimited_nr(struct address_space *mapping, unsigned long nr_pages_dirtied) { static DEFINE_PER_CPU(unsigned long, ratelimits) = 0; unsigned long ratelimit; unsigned long *p; ratelimit = ratelimit_pages; if (dirty_exceeded) ratelimit = 8; /* * Check the rate limiting. Also, we do not want to throttle * tasks in balance_dirty_pages(). Period. */ preempt_disable(); p = &__get_cpu_var(ratelimits); *p += nr_pages_dirtied; if (unlikely(*p >= ratelimit)) { *p = 0; preempt_enable(); balance_dirty_pages(mapping); return; } preempt_enable(); }
12.4.1 检查直接回写的条件
balance_dirty_pages_ratelimited 函数调用了 balance_dirty_pages_ratelimited_nr 函数。后者首先定义了一个CPU变量 ratelimits,初始化为0。每次写页面的时候,这个变量累加脏页面的总量,当脏页面超过前面定义的内存页面限制值,则调用 balance_dirty_pages 开始执行回写操作。balance_dirty_pages 函数如代码清单12-10所示。
代码清单12-10
balance_dirty_pages函数static void balance_dirty_pages(struct address_space *mapping) { ....../*省略部分代码*/ for (;;) { struct writeback_control wbc = { .bdi = bdi, .sync_mode = WB_SYNC_NONE, .older_than_this = NULL, .nr_to_write = write_chunk, .range_cyclic = 1, }; get_dirty_limits(&background_thresh, &dirty_thresh, mapping); nr_reclaimable = global_page_state(NR_FILE_DIRTY) + global_page_state(NR_UNSTABLE_NFS); if (nr_reclaimable + global_page_state(NR_WRITEBACK) <= break; if (!dirty_exceeded) dirty_exceeded = 1; }
balance_dirty_pages 函数第一部分首先调用 get_dirty_limits 获得设置的脏页面回写数目。前面通过 page_writeback_init 函数已经设置了系统的脏页面比例。变量 background_thresh 和 dirty_thresh 分别代表背景写的脏页面数目和直接回写的脏页面数目。
12.4.2 回写系统脏页面的条件
然后计算系统内需要回写的脏页面数目总和。如果这些页面总数超过 dirty_thresh 的限制,则进入直接回写,否则退出循环,检查是否超过了 background_thresh 限制。
继续
balance_dirty_pages函数if (nr_reclaimable) { writeback_inodes(&wbc); get_dirty_limits(&background_thresh, &dirty_thresh, mapping); nr_reclaimable = global_page_state(NR_FILE_DIRTY) + global_page_state(NR_UNSTABLE_NFS); if (nr_reclaimable + global_page_state(NR_WRITEBACK) <= dirty_threshold) break; pages_written += write_chunk - wbc.nr_to_write; if (pages_written >= write_chunk) break; /* We've done our duty */ } blk_congestion_wait(WRITE, HZ/10); } if (nr_reclaimable + global_page_state(NR_WRITEBACK) <= dirty_thresh && dirty_exceeded) dirty_exceeded = 0; /*如果pdflush线程已经在写当前队列,退出*/ if (writeback_in_progress(bdi)) return; /* pdflush is already working this queue */ /*如果是笔记本模式,要等达到直接写的标准才开始背景写*/ if ((laptop_mode && pages_written) || (!laptop_mode && (nr_reclaimable > background_thresh))) pdflush_operation(background_writeout, 0); }
balance_dirty_pages 函数第二部分调用 writeback_inodes 回写系统的脏页面。回写完毕,要执行两个检查:一是检查脏页和回写页是否大于 dirty_thresh 脏页限制,二是检查已经回写的页面是否小于设置的回写页面数,如果两个条件都满足,则阻塞进程1/10秒,然后再次回写。
12.4.3 检查计算机模式
最后,balance_dirty_pages 函数要检查计算机的模式。
- ❑ 笔记本模式:这种模式回写的标准比较高,要等待达到标准较高的直接写条件,才进行背景写。
- ❑ 普通模式:检查脏页面是否超过背景写的限制数
background_thresh,如果超过,触发pdflush线程进行回写。
writeback_inodes 前文已经分析过,不再赘述。
12.5 本章小结
Linux内核的回写机制提供了一种缓写机制,真实的写并不是直接写入硬盘,而是在 page cache 中缓存,等待合适的时机才真正执行写入。这种机制存在的前提是适配硬盘的物理特性,在内核软件的设计中,乃至用户软件的设计中,考虑硬件特性并适配之,是系统设计的重要方面。
第13章 一个真实文件系统 ext2
通过前面章节的分析和学习,本章分析一个真正的文件系统。本章选的例子是 Linux 的 ext2。ext2 是 Linux 系统使用最广泛的文件系统,它和 ext3 构成 Linux 文件系统的基石。而 ext3 和 ext2 的结构基本相同,只是 ext3 多了日志功能。本章不再分析代码,主要介绍 ext2 的磁盘布局和超级块,以及 inode 结构,代码分析工作留给读者。原因是 Linux 内核庞大复杂无比,任何书都不可能完备的分析代码。通过本书内核基础层和应用层的分析,读者可以从架构层次掌握系统的框架和脉络,在这个基础上,可以比较流畅的分析阅读内核代码。
13.1 ext2 的硬盘布局
根据 Linux 文件系统的知识,文件和 dentry 是内存中的结构,并不真正存在于硬盘中。真正存在于硬盘的是超级块结构和 inode 结构。首先看 ext2 的硬盘布局,如图13-1所示。
图13-1 ext2硬盘布局(此处为图片占位,原文包含此图)
描述:硬盘的第一个扇区是引导区,占据1K字节。ext2 通过块组的方式组织硬盘。每个硬盘分区由若干个大小相同的块组组成。
硬盘的第一个扇区是引导区,占据1K字节,引导区是文件系统不能使用的,用来存储分区信息。ext2 是通过块组的方式来组织硬盘。每个硬盘分区都由若干个大小相同的块组组成。每个块组包括下列信息:
- ❑ 超级块(super block):每个块组的起始位置都有一个超级块,这些超级块的内容都是相同的。超级块包括文件系统的信息,比如每个块组的块数目、每个块组的 inode 数目等。
- ❑ 块组描述符表(GDT):块组描述符表由很多的块组描述符组成。ext2 的每个块组描述符为32字节。整个文件系统分区有多少个块组,就有多少个块组描述符。块组描述符描述了一个块组的信息,比如一个块组中 inode 位图的起始位置、inode 表的起始位置等。
- ❑ 块位图(block bitmap):每个比特代表块组中哪些块可用,哪些块已经被占有。块位图本身要占有一个块。如果块大小设置为1K字节,则一个块组的大小为1K×1K×8bit = 8M字节。
- ❑ inode 位图(inode bitmap):inode 位图也占用一个块。它的每一位代表一个 inode 是否空闲。
- ❑ inode 表(inode table):每个文件都有一个 inode,inode 保存了文件的描述信息、文件的类型、文件的大小、文件的创建访问时间等。一个 inode 占128字节。如果文件系统块大小为1K字节,一个块可容纳8个 inode。inode 表可以占用多个块。
- ❑ 数据块(data block):保存文件的内容。常规文件的数据保存在数据块中,如果是目录文件,那么该目录下所有的文件名和下级目录名都保存在数据块中。
13.2 ext2 文件系统目录树
文件系统的层次结构可以用目录树来表示。对 ext2 文件系统而言,根目录是一个固定的 inode。通过读根目录的内容,就可以获得根目录下的文件的 inode 信息、文件类型和文件名字。如果该文件是目录文件,重复这个过程,就可以获得二级目录的信息。
ext2 目录文件的结构如代码清单13-1所示。
代码清单13-1
ext2_dir_entry_2结构struct ext2_dir_entry_2 { __le32 inode; /* Inode number */ __le16 rec_len; /* Directory entry length */ __u8 name_len; /* Name length */ __u8 file_type; char name[EXT2_NAME_LEN]; /* File name */ };
根目录是一个固定的 inode,它的 inode 号是2。当文件系统初始化的时候,读到超级块的内容,就获得了文件块的大小、每个块组的块数目、每个块组的 inode 数目。从超级块之后的块组描述符表,可以获得所有的块组信息。
通过超级块提供的信息和块组描述符表信息,就可以获得根目录在 inode 表的位置,从而读到根目录的内容。从根目录的内容里面,进而获得根目录下文件的 inode 号和文件类型。
如果这是个目录文件,那么重复上面的过程,可以获得二级目录的目录结构。不断重复这个过程,最终获得整个文件系统的目录树。
13.3 ext2 文件内容管理
ext2 的 inode 信息可以存放15个块的地址,用户数据就存放在这些块中。但是15个块不一定能存放全面的用户数据,那么可以采用分层的方式存储。这15个块中,前12个块是直接数据块,直接存放用户数据。而第13个块,是一级索引块,这个块存放的又是块的地址,这些块地址指向的块才真正存放用户数据。而第14个块,是二级索引块,比一级索引块又多了一层,而第15个块,是三级索引块,比二级索引块又多了一层。通过这种方式,大大扩展了 ext2 文件系统的文件存放内容。
13.4 ext2 文件系统读写
对一个文件系统来说,最重要基础的部分无非是打开和创建文件以及文件的读写。
通过文件系统的目录树,可以获得文件所在目录的上级目录。读上级目录的内容,就可以获得文件的 inode 号和文件名字。根据文件的 inode 号,可以获得文件的 inode 信息。也就获得了文件的类型、创建修改时间、文件大小等信息,完成打开文件的过程。
文件的读写,首先要获得文件数据内容的物理扇区位置。在文件的 inode 信息里,保存了15个文件块的地址。这15个文件块通过分层方式,存储了文件的数据内容。知道了文件数据的块地址,就可以获得文件数据的物理扇区地址。根据第10章的分析,可以真正对文件执行读写操作。
13.5 本章小结
本章可以作为对内核学习的一个实战。读者应该结合 ext2 文件系统的代码,分析文件系统的读写和打开的过程,深度理解一个广泛使用的文件系统。内核本身在不断更新,想利用内核知识解决实际问题,必须具备快速阅读代码的能力,从代码中学习,从代码中实践。