第11章 通用块层和scsi层

在内核中通用块层scsi层的位置,上接文件系统的VFS层,下接硬盘驱动。通用块层的作用就是处理I/O的合并或者排序。而scsi层的作用主要是管理scsi设备、处理的设备的上线和离线、为设备加载合适的驱动等。scsi层同时是通用块层的下一层,I/O处理的时候也需要经过scsi层。通用块层和scsi层的一部分共同执行I/O的处理过程。

scsi层在内核中担当特殊的角色。即使某些不是scsi设备的磁盘(比如ATA格式的硬盘),在内核中也是通过scsi层来管理的。本章关注重点是I/O的处理过程,主要涉及通用块层和scsi层的一部分,scsi层对设备的管理功能不在本章讨论范围之内。

11.1 块设备队列

第9章分析了块设备的队列和队列处理函数。在Linux内核,读写操作是以一个个请求的方式出入块设备的队列。一个请求可以代表一个I/O,也可以代表多个I/O。如果一个I/O和其他I/O发生了合并,这两个I/O在一个请求里面。所有需要执行的请求,都要链接到块设备队列的链表。第9章还分析了块设备队列的电梯对象,电梯提供了块设备排序的算法和排序结构,本章将继续讨论。

11.1.1 scsi块设备队列处理函数

scsi作为一个广泛使用的框架,提供了一系列的块设备队列处理函数。这是在scsi_alloc_queue函数中实现的,如代码清单11-1所示。

代码清单11-1 scsi_alloc_queue函数

struct request_queue *scsi_alloc_queue(struct scsi_device *sdev)
{
  struct Scsi_Host *shost = sdev->host;
  struct request_queue *q;
  q = blk_init_queue(scsi_request_fn, NULL);
  if (!q)
      return NULL;
  blk_queue_prep_rq(q, scsi_prep_fn);
  blk_queue_max_hw_segments(q, shost->sg_tablesize);
  blk_queue_max_phys_segments(q, SCSI_MAX_PHYS_SEGMENTS);
  blk_queue_max_sectors(q, shost->max_sectors);
  blk_queue_bounce_limit(q, scsi_calculate_bounce_limit(shost));
  blk_queue_segment_boundary(q, shost->dma_boundary);
  blk_queue_issue_flush_fn(q, scsi_issue_flush_fn);
  blk_queue_softirq_done(q, scsi_softirq_done);
  if (!shost->use_clustering)
      clear_bit(QUEUE_FLAG_CLUSTER, &q->queue_flags);
  return q;
}

scsi_alloc_queue函数为scsi块设备创建队列结构时调用。默认为scsi设备提供了出队列函数scsi_request_fn和软中断完成函数scsi_softirq_done

11.1.2 电梯算法和对象

电梯算法在内核中已经被抽象为一个对象,只要实现一些基本的电梯函数,就可以提供一个电梯算法。电梯结构的定义如代码清单11-2所示。

代码清单11-2 elevator_noop算法

static struct elevator_type elevator_noop = {
  .ops = {
      .elevator_merge_req_fn                = noop_merged_request,
      .elevator_dispatch_fn                = noop_dispatch,
      .elevator_add_req_fn                = noop_add_request,
      .elevator_queue_empty_fn                = noop_queue_empty,
      .elevator_former_req_fn                = noop_former_req,
      .elevator_latter_req_fn                = noop_latter_req,
      .elevator_init_fn                        = noop_init_queue,
      .elevator_exit_fn                        = noop_exit_queue,
  },
  .elevator_name = "noop",
  .elevator_owner = THIS_MODULE,
};

电梯结构elevator_type最重要的执行函数是elevator_add_req_fnelevator_dispatch_fn,分别用来向电梯加入一个请求和从电梯获得一个请求。而elevator_merge_req_fn则实现I/O的合并。在下文将看到这些函数的使用方式。

电梯算法一般需要维护一个队列,这个队列是为了对请求排序或者执行I/O合并。

11.2 硬盘HBA抽象层

HBA(Host Bus Adapter,主机总线适配器)通常用来连接计算机内部总线和存储系统。用来接入硬盘的设备,如果是一个PCI设备,它既是一个PCI设备,同时支持SCSI硬盘或者ATA硬盘,它就是一个HBA设备。

HBA设备的驱动,既是PCI驱动,同时又要管理和控制硬盘,所以它也可算作是硬盘驱动。scsi层最终要把scsi命令发送到硬盘的驱动层。硬盘驱动层是内核软件的最后一层,当硬盘驱动把命令发送到硬件后,就脱离了软件控制的范围。

为便于管理硬盘驱动,内核抽象出一个scsi_host_template对象,所有的硬盘驱动都要以这种方式提供。将scsi_host_template结构的定义简化后,如代码清单11-3所示。

代码清单11-3 scsi_host_template函数

struct scsi_host_template {
  const char *name;
  int (* queuecommand)(struct scsi_cmnd *, void (*done)(struct scsi_cmnd *));
  int (* eh_abort_handler)(struct scsi_cmnd *);
  int (* eh_device_reset_handler)(struct scsi_cmnd *);
  ......
  int (* eh_host_reset_handler)(struct scsi_cmnd *);
};

scsi_host_template结构的重要成员做如下解释。

  • name:HBA卡的名字。
  • queuecommand:I/O函数。通过这个函数将scsi命令发送到HBA卡,完成一次I/O。
  • eh_abort_handler:撤销一个scsi命令。
  • eh_device_reset_handler:reset某个硬盘设备。这个硬盘设备将离线,然后重新上线。
  • eh_host_reset_handler:reset整个适配器芯片。执行这个调用,整个适配器芯片被重启,所有的硬盘离线,然后重新被扫描一次。

scsi_host_template结构提供的调用函数中,异常处理占了很大一部分。对一个I/O来说,执行结果可以分为几种情况。

  • I/O命令执行完成,而且I/O执行成功。
  • I/O命令执行完成,但是I/O未成功,返回有错误。
  • I/O超时未返回。

对于I/O返回有错误的情况,内核根据错误类型,选择再次执行该I/O命令或者交给scsi错误处理任务处理。对I/O超时未返回,则必须由scsi错误处理任务控制。

scsi错误处理任务通常首先abort(取消)出错的命令,如果不能奏效,尝试reset硬盘设备;如果仍然不能奏效,尝试reset总线;如果仍不能奏效,尝试reset整个芯片。由于各个厂家的HBA芯片实现各有不同,笔者发现很多情况下,这个错误处理流程会导致错误。最常见的错误就是CPU占有率100%和abort处理时异常,导致硬盘离线。所以实现用户定制的错误处理逻辑可能是一个正确的选择。

11.3 I/O的顺序控制

I/O的顺序是通用块层中一个比较重要的概念。很多应用要求I/O必须按照指定的顺序执行,为此内核使用了barrier I/O的概念来实现I/O的顺序。指的是barrier I/O之前的I/O必须执行完毕,然后再执行barrier I/O,而barrier I/O之后的I/O必须在barrier I/O执行完毕才能执行,就像I/O队列中插入了一个栅栏。

假设有1,2,3,4,5五个I/O,其中4是barrier I/O。这意味着,必须1,2,3执行完毕再执行4,在4执行完毕之前不能执行任何I/O。

需要指出的是,按照顺序下发1,2,3,4,5并不能保证I/O的完成顺序。这是因为硬盘本身有缓存(cache)和队列,并不是按照下发的顺序来执行I/O。

如何实现I/O顺序?

这就必须利用同步cache命令。即发现4是一个barrier I/O后,插一个同步cache命令,再下发4,然后再插一个同步cache命令。在4执行完毕之前,不能再发送新的I/O命令。有人可能有疑问,1,2,3的执行顺序能否保证?答案是不能,1,2,3既然没设置barrier标志,意味可以乱序执行。

复杂一点的情况是,如果硬件支持FUA标志,也就是说这个I/O会跳过硬盘的buffer,后面的一次同步cache命令就可以省略。更复杂的情况,如果硬件支持TAG队列功能,在执行保证顺序操作的过程中,仍然可以下发新的I/O。在后面的代码中,可以见到对I/O的这种处理方法。

11.4 I/O调度算法

一个I/O调度算法,关键就是实现elevator结构需要的几个函数,然后注册调度算法。用户可以通过proc文件系统选择I/O调度算法,内核将根据设定的I/O调度算法对I/O执行调度。

11.4.1 noop调度算法

noop调度算法是最简单的调度算法,如它的名字所言,基本什么都没做,等同于一个先进先出的调度算法,本身没对I/O进行真正的排序。I/O调度算法的入队列函数决定了I/O是如何插入电梯队列的,因此首先分析入队列函数。noop调度算法的入队列函数是noop_add_request,如代码清单11-4所示。

代码清单11-4 noop_add_request函数

static void noop_add_request(request_queue_t *q, struct request *rq)
{
  struct noop_data *nd = q->elevator->elevator_data;
  list_add_tail(&rq->queuelist, &nd->queue);
}

noop调度算法的入队列函数极其简单,就是把请求加入noop的链表,没有执行任何排序工作。

I/O调度算法的出队列函数决定了I/O是如何被挑选,然后由硬盘驱动执行。noop调度算法的出队列函数是noop_dispatch,如代码清单11-5所示。

代码清单11-5 noop_dispatch函数

static int noop_dispatch(request_queue_t *q, int force)
{
  struct noop_data *nd = q->elevator->elevator_data;
 /*判断队列非空*/
  if (!list_empty(&nd->queue)) {
      struct request *rq;
      /*取出I/O请求*/
      rq = list_entry(nd->queue.next, struct request, queuelist);
      list_del_init(&rq->queuelist);
      /*I/O请求送到块设备的队列*/
      elv_dispatch_sort(q, rq);
      return 1;
  }
  return 0;
}

noop出队列函数从队列头取出最先插入队列的一个I/O,然后将I/O送到块设备的队列。I/O从块设备队列到驱动执行的过程,是由scsi层控制的。

11.4.2 deadline调度算法

和noop调度算法相比,deadline调度算法更复杂,它内部启用了一个红黑树结构来对I/O进行排序,保证I/O出队列时,已经按照扇区地址排好顺序了。deadline调度算法代码在block目录的deadline-iosched文件中。在分析算法前,首先分析deadline调度算法定义的内部数据结构,如代码清单11-6所示。

代码清单11-6 deadline_data结构

struct deadline_data {
  struct rb_root sort_list[2];        
  struct list_head fifo_list[2];
        
  struct deadline_rq *next_drq[2];
  struct hlist_head *hash;        /* request hash */
  unsigned int batching;                /* number of sequential dispatches */
  sector_t last_sector;                /* head position */
  unsigned int starved;                /* times reads have starved */
  int fifo_expire[2];
  int fifo_batch;
  int writes_starved;
  int front_merges;
  mempool_t *drq_pool;
}

从deadline的数据结构可以发现,有两个队列,一个是红黑树sort_list,另一个是链表fifo_list

一个I/O进入deadline队列的时候,要被插入红黑树队列和fifo两个队列,红黑树是按照扇区地址排序的队列,而fifo则按照I/O的先后顺序排序。每个队列都是两成员的数组,这是为了分别保存读请求和写请求,即读写请求分别保存在不同的队列中。

deadline调度算法源文件只有几百行,重点分析插入队列的过程和出队列的过程,就可以基本明白deadline算法的设计思路。首先从入队列函数开始分析,deadline算法的入队列函数是deadline_add_request,如代码清单11-7所示。

代码清单11-7 deadline_add_request函数

static void
deadline_add_request(struct request_queue *q, struct request *rq)
{
  struct deadline_data *dd = q->elevator->elevator_data;
  struct deadline_rq *drq = RQ_DATA(rq);
  const int data_dir = rq_data_dir(drq->request);
/*deadline请求加入红黑树队列*/
  deadline_add_drq_rb(dd, drq);
  /*设置deadline请求的超时时间,然后加入到fifo队列*/
drq->expires = jiffies + dd->fifo_expire[data_dir];
  list_add_tail(&drq->fifo, &dd->fifo_list[data_dir]);
 /*如果请求可以合并,则还要加入hash链表*/
  if (rq_mergeable(rq))
      deadline_add_drq_hash(dd, drq);
}

deadline_add_request函数首先把请求加入红黑树队列。deadline算法的红黑树根据扇区地址的顺序排序,因此插入过程是以I/O请求的扇区地址作为key。其次要设置请求的超时时间,然后加入先进先出的fifo队列。设置超时时间的目的是防止I/O在队列中时间过长,影响业务的使用。

deadline调度算法I/O出队列的函数是deadline_dispatch_requests,如代码清单11-8所示。

代码清单11-8 deadline_dispatch_requests函数

static int deadline_dispatch_requests(request_queue_t *q, int force)
{
  struct deadline_data *dd = q->elevator->elevator_data;
  const int reads = !list_empty(&dd->fifo_list[READ]);
  const int writes = !list_empty(&dd->fifo_list[WRITE]);
  struct deadline_rq *drq;
  int data_dir;
  if (dd->next_drq[WRITE])
      drq = dd->next_drq[WRITE];
  else
      drq = dd->next_drq[READ];
  if (drq) {
      /* we have a "next request" */
      /*I/O批处理*/
      if (dd->last_sector != drq->request->sector)
          /* end the batch on a non sequential request */
          dd->batching += dd->fifo_batch;
      
      if (dd->batching < dd->fifo_batch)
          /* we are still entitled to batch */
          goto dispatch_request;
  }

deadline算法首先根据一系列设定的条件选择要处理的I/O。

第一部分判断是否连续的批模式。如果当前I/O和前面I/O的扇区地址是连续的而非随机的,满足批模式的条件,则直接进入dispatch_request优先处理。deadline还设置了一个fifo_batch数值,批模式优先处理的I/O个数不能超过这个数值。

if (reads) {
    BUG_ON(RB_EMPTY_ROOT(&dd->sort_list[READ]));
    /*存在读写请求,而写请求已经超过设定的饿死时间的情况下,优先处理写*/
    if (writes && (dd->starved++ >= dd->writes_starved))
        goto dispatch_writes;
    /*如果不存在写超过饿死时间,则处理读请求*/
    data_dir = READ;
    goto dispatch_find_request;
}
if (writes)dispatch_find_request:
  if (deadline_check_fifo(dd, data_dir)) {
      /*如果有读请求超时,则优先处理*/
      /* An expired request exists - satisfy it */
      dd->batching = 0;
      drq = list_entry_fifo(dd->fifo_list[data_dir].next);
      
  } else if (dd->next_drq[data_dir]) {
      /*从请求方向相同(都是读或者都是写)的队列挑选下一个请求*/
      drq = dd->next_drq[data_dir];
  } else {
      /*从红黑树队列选一个请求*/
      dd->batching = 0;
      drq = deadline_find_first_drq(dd, data_dir);
  }
dispatch_request:
  dd->batching++;
  deadline_move_request(dd, drq);
  return 1;

deadline算法第三部分要挑选一个出队列的I/O。首先的条件是检查读请求是否超时。读请求入队列的时候设置了一个超时时间,超过这个时间必须马上处理。如果没超时的读请求,则从同方向的请求队列(都是读或者都是写)选择下一个请求。

最后,如果以上条件都不满足,说明同一个方向已经没有下一个请求,或者一个电梯已经完成(完成从低到高的一个循环),需要重新开始一轮循环,因此从红黑树队列中重新挑选一个新的I/O。挑选I/O的函数是deadline_find_first_drq,如代码清单11-9所示。

代码清单11-9 deadline_find_first_drq函数

static struct deadline_rq *
deadline_find_first_drq(struct deadline_data *dd, int data_dir)
{
  struct rb_node *n = dd->sort_list[data_dir].rb_node;
  for (;;) {
      if (n->rb_left == NULL)
          return rb_entry_drq(n);
      n = n->rb_left;
  }
}

deadline算法重新挑选一个I/O请求的条件,就是根据I/O的方向(读或者写)从相应的红黑树队列中取出最左边的一个I/O。因为硬盘顺序是按照从低到高的扇区地址排序的,选出最左边的I/O,其实就是选择扇区最小的I/O,后续I/O的扇区地址都大于这个I/O,从而开启一轮新的循环。

11.5 I/O的处理过程

11.5.1 I/O插入队列的过程分析

根据前文第10章的分析,文件系统提交I/O都要通过 submit_bio 来提交一个I/O。

1. submit_bio 函数

submit_bio 函数是文件系统VFS层和通用块层的衔接点,本节从这个函数开始分析,如代码清单11-10所示。

/* 代码清单11-10 submit_bio函数 */
void submit_bio(int rw, struct bio *bio)
{        
  /*计算扇区数目*/
  int count = bio_sectors(bio);
  bio->bi_rw |= rw;
  if (rw & WRITE)
      count_vm_events(PGPGOUT, count);
  else
      count_vm_events(PGPGIN, count);
  /*dump io,用来调试*/
  if (unlikely(block_dump)) {
  ....../*省略部分代码*/
  generic_make_request(bio);
}

2. generic_make_request 函数

submit_bio 函数调用 generic_make_request 向通用块层提交一个请求,输入的参数是一个bio结构,generic_make_request 函数要把bio转换为底层处理的请求结构,分两部分介绍。

/* 代码清单11-11 generic_make_request(ll_rw_blk.c) */
void generic_make_request(struct bio *bio)
{
  request_queue_t *q;
  sector_t maxsector;
  int ret, nr_sectors = bio_sectors(bio);
  dev_t old_dev;
  /*在执行较长时间的任务之前,执行一次调度*/
  might_sleep();
  /* Test device or partition size, when known. */
  /*检查是否超过了磁盘的扇区限制*/
  maxsector = bio->bi_bdev->bd_inode->i_size >> 9;
  if (maxsector) {
      sector_t sector = bio->bi_sector;
      if (maxsector < nr_sectors || maxsector - nr_sectors < s
          handle_bad_sector(bio);
          goto end_io;
      }
  }

generic_make_request 函数第一部分检查最大扇区限制。执行I/O的起始扇区地址加I/O大小的结果不应该超过块设备的物理扇区地址,否则就要结束本次I/O,返回错误。

generic_make_request 函数第二部分检查I/O的大小不应该超过块设备的最大可处理扇区。后者是块设备本身的特性,它限制了块设备一次I/O所能处理的最大扇区数目。

  maxsector = -1;
  old_dev = 0;
  do {
      char b[BDEVNAME_SIZE];
      /*获得块设备的队列*/
      q = bdev_get_queue(bio->bi_bdev);
      if (!q) {
end_io:
          bio_endio(bio, bio->bi_size, -EIO);
          break;
      }
      /* I/O超过了设备的物理最大允许扇区*/
      if (unlikely(bio_sectors(bio) > q->max_hw_sectors)) {
          goto end_io;
      }
      if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))
          goto end_io;
      /*如果磁盘有分区的处理*/ 
      blk_partition_remap(bio);
      ....../*省略I/O追踪的代码*/
      maxsector = bio->bi_sector;
      old_dev = bio->bi_bdev->bd_dev;
      ret = q->make_request_fn(q, bio);
  } while (ret);
}

NOTE

如果I/O所在的块设备是个分区块设备,必须找到分区的结构 hd_struct,I/O的起始扇区地址要加上分区的起始地址,才是真正的物理地址。也要将分区块设备替换为块设备所在物理硬盘的主块设备。

3. __make_request 函数

最后,调用队列提供的 make_request_fn 函数。前文分析过,scsi提供的队列 make_request_fn 就是 __make_request 函数,如代码清单11-12所示。

/* 代码清单11-12 __make_request(ll_rw_blk.c) */
static int __make_request(request_queue_t *q, struct bio *bio)
{
  ....../*省略部分代码*/
  sector = bio->bi_sector;
  nr_sectors = bio_sectors(bio);
  cur_nr_sectors = bio_cur_sectors(bio);
  prio = bio_prio(bio);
  rw = bio_data_dir(bio);
  sync = bio_sync(bio);

__make_request 函数是通用块层的主处理流程。它的作用就是判断I/O能否合并,如果可以合并,则不申请新请求,而是合入前面的请求;如果不能合并,就申请新请求结构,并且插入电梯的队列,分五部分介绍。

第一部分从 bio 结构获得I/O的起始扇区地址和以扇区度量的I/O大小。变量 sync 标志当前I/O是否需要同步处理。带有 sync 标志的I/O和普通I/O的处理方式有所不同。

WARNING

这里的 sync 标志和打开文件时候设置的 O_SYNC 标志不是一回事,两者的处理逻辑也不相同。

__make_request 函数第二部分首先检查是否需要bounce。

  blk_queue_bounce(q, &bio);
  spin_lock_prefetch(q->queue_lock);
  /*检查是否barrier I/O,后面要处理*/
  barrier = bio_barrier(bio);
  if (unlikely(barrier) && (q->next_ordered == QUEUE_ORDERED_N
      err = -EOPNOTSUPP;
      goto end_io;
  }
  spin_lock_irq(q->queue_lock);
  /*如果是barrier请求,或者电梯空,不再判断是否需要合并I/O*/
  if (unlikely(barrier) || elv_queue_empty(q))
      goto get_rq;

TIP

需要bounce的原因是:有些老设备支持的DMA区间不能覆盖内存空间,而bio结构成员 bio_vec 里面提供的内存可能不在设备能访问的内存范围之内,如果这种情况发生,就要另外申请低位内存,以低位内存作为DMA内存,等设备DMA操作完成后,再把数据复制到 bio_vec 提供的内存里。

其次检查I/O是否barrier I/O。对于barrier I/O,就直接进入 get_rq 分支,不再判断是否可以合并,因为barrier I/O的性质决定了它不能和之前的I/O进行合并。

__make_request 函数第三部分处理后向合并。

el_ret = elv_merge(q, &req, bio);
switch (el_ret) {
    /*后向合并*/
    case ELEVATOR_BACK_MERGE:
        BUG_ON(!rq_mergeable(req));
        if (!q->back_merge_fn(q, req, bio))
            break;
        blk_add_trace_bio(q, bio, BLK_TA_BACKMERGE);
        /*合并后,要改bio的尾部为新的这个I/O,同时调整扇区数*/
        req->biotail->bi_next = bio;
        req->biotail = bio;
        req->nr_sectors = req->hard_nr_sectors += nr_sectors;
        req->ioprio = ioprio_best(req->ioprio, prio);
        drive_stat_acct(req, nr_sectors, 0);
        /*调用电梯提供的合并函数*/
        if (!attempt_back_merge(q, req))
            elv_merged_request(q, req);
        goto out;

后向合并指的是当前I/O可以合并到某个请求的尾部,这种情况要把请求的尾部bio改为新的bio,请求的扇区数要加上新bio的扇区数。然后调用电梯提供的合并函数执行I/O合并操作。

__make_request 函数第四部分处理前向合并。

/*前向合并*/
case ELEVATOR_FRONT_MERGE:
  BUG_ON(!rq_mergeable(req));
  if (!q->front_merge_fn(q, req, bio))
      break;
  blk_add_trace_bio(q, bio, BLK_TA_FRONTMERGE);
  bio->bi_next = req->bio;
  req->bio = bio;
  req->buffer = bio_data(bio);
  req->current_nr_sectors = cur_nr_sectors;
  req->hard_cur_sectors = cur_nr_sectors;
  req->sector = req->hard_sector = sector;
  req->nr_sectors = req->hard_nr_sectors += nr_sectors;
  req->ioprio = ioprio_best(req->ioprio, prio);
  drive_stat_acct(req, nr_sectors, 0);
  if (!attempt_front_merge(q, req))
      elv_merged_request(q, req);
  goto out;

前向合并的处理方式和后向合并很相似,区别是前向合并要把新bio置于请求结构中bio链表的头部。然后调用电梯的合并函数执行I/O合并。

__make_request 函数第五部分申请请求。

get_rq:
  req = get_request_wait(q, rw, bio);
  init_request_from_bio(req, bio);
  spin_lock_irq(q->queue_lock);
 /*如果块设备队列和电梯队列都空的,需要阻塞块设备队列*/
  if (elv_queue_empty(q))
      blk_plug_device(q);
      /*把请求加入请求队列*/
      add_request(q, req);
      out:
  /*如果是同步I/O,立即解除队列阻塞,把I/O请求下发*/
  if (sync)
      __generic_unplug_device(q);
      spin_unlock_irq(q->queue_lock);
  return 0;

如果前面部分的前向合并和后向合并都不能执行,就必须申请一个新的请求。__make_request 函数调用 get_request_wait 申请一个新的请求,而 init_request_from_bio 函数则根据bio的类型初始化请求和设置请求标志。通常的请求标志如下列所示。

  • REQ_FAILFAST:一般I/O失败后要重试几次。REQ_FAILFAST 标志不重试,立即返回。
  • REQ_HARDBARRIERREQ_SOFTBARRIER:标志一个barrier请求。
  • REQ_RW_SYNC:同步标志。有同步标志,则立即对电梯队列执行unplug操作。

NOTE

创建新请求后,要对队列执行plug和unplug操作。plug就是阻塞,一个阻塞的队列是不能下发I/O的,要下发I/O,必须执行unplug。plug队列的同时要启动一个定时器(默认3毫秒),在时间到达后unplug队列,开始下发I/O。plug队列和定时器的设置,说明I/O不是马上下发,需要等待后续的I/O。对带有 sync 标志的I/O是个特例,要立即unplug。 plug和unplug针对的是块设备队列,操作对象并不一定是同一个I/O。plug时阻塞的I/O,不一定在unplug时被下发,可能下发别的I/O。

4. elv_merge 函数

__make_request 函数里面,判断两个I/O能否合并,使用了 elv_merge 函数,需要分析一下它的实现,函数代码如代码清单11-13所示。

/* 代码清单11-13 elv_merge函数 */
int elv_merge(request_queue_t *q, struct request **req, struct
{
  elevator_t *e = q->elevator;
  int ret;
  /*判断块设备队列能否合并?*/
  if (q->last_merge) {
      ret = elv_try_merge(q->last_merge, bio);
      if (ret != ELEVATOR_NO_MERGE) {
          *req = q->last_merge;
          return ret;
      }
  }
  /*否则在电梯队列里面查找,看能否合并*/ 
  if (e->ops->elevator_merge_fn)
      return e->ops->elevator_merge_fn(q, req, bio);
  return ELEVATOR_NO_MERGE;
} 
 
static inline int elv_try_merge(struct request *__rq, struct b
{
  int ret = ELEVATOR_NO_MERGE;
  if (elv_rq_merge_ok(__rq, bio)) {
      if (__rq->sector + __rq->nr_sectors == bio->bi_sector)
          ret = ELEVATOR_BACK_MERGE;
      else if (__rq->sector - bio_sectors(bio) == bio->bi_sect)
          ret = ELEVATOR_FRONT_MERGE;
  }
  return ret;
}

elv_merge 函数通过两步来判断I/O能否合并。

第一步判断能否和块设备队列的最后一个请求合并。判断是通过请求的扇区地址决定的,如果请求的最后扇区位置是10000,而新的I/O从10000开始,那就是后向合并。如果新的I/O从9995开始,而I/O长度是五个扇区,那就是前向合并。

第二步调用电梯队列提供的函数判断能否合并。I/O调度算法一般都要提供自己的合并函数。比如deadline调度算法提供的合并函数就要在电梯队列中寻找能和当前I/O合并的请求。

5. __elv_add_request 函数

对于不能合并的I/O,需要把一个请求插入到队列。这通过函数 add_request 实现。它又是通过调用 __elv_add_request 函数实现插入的功能,因此直接分析 __elv_add_request 函数,它的代码如代码清单11-14所示。

/* 代码清单11-14 __elv_add_request函数 */
void __elv_add_request(request_queue_t *q, struct request *rq,
                    int where, int plug)
{
  if (q->ordcolor)
      rq->flags |= REQ_ORDERED_COLOR;
  if (rq->flags & (REQ_SOFTBARRIER | REQ_HARDBARRIER)) {
      if (blk_barrier_rq(rq))
          q->ordcolor ^= 1;
      if (where == ELEVATOR_INSERT_SORT)
          where = ELEVATOR_INSERT_BACK;
      /*如果是文件系统来的请求,则更新end_sector和边界请求*/ 
      if (blk_fs_request(rq)) {
          q->end_sector = rq_end_sector(rq);
          q->boundary_rq = rq;
      }
  } else if (!(rq->flags & REQ_ELVPRIV) && where == ELEVATOR_I
      where = ELEVATOR_INSERT_BACK;
  /*阻塞块设备队列*/
  if (plug)
      blk_plug_device(q);
  elv_insert(q, rq, where);
}

I/O插入电梯队列时,要判断I/O是否barrier请求。barrier I/O要求保证顺序,因此barrier I/O之前的I/O请求必须完成,所以对于barrier请求,把默认插入方式改为从后方加入。

6. elv_insert 函数

函数最后调用了 elv_insert,这个函数要根据插入的位置执行,它的代码如代码清单11-15所示。

/* 代码清单11-15 elv_insert函数 */
void elv_insert(request_queue_t *q, struct request *rq, int wh
{
  struct list_head *pos;
  unsigned ordseq;
  int unplug_it = 1;
  blk_add_trace_rq(q, rq, BLK_TA_INSERT);
  rq->q = q;
  switch (where) {
  case ELEVATOR_INSERT_FRONT:
  /*对前向插入,标志加一个SOFTBARRIER,请求直接加入块设备队列的头*/
      rq->flags |= REQ_SOFTBARRIER;
      list_add(&rq->queuelist, &q->queue_head);
      break;

elv_insert 函数第一部分是处理前向插入。前向插入意味着新请求位置在所有队列I/O的最前面。这种情况请求不需要加入电梯队列,因为它不需要在电梯中等待,而是直接插入到块设备队列的最前面。

elv_insert 函数第二部分是处理后向插入。后向插入意味着新请求位置在所有队列I/O的最后面。

  case ELEVATOR_INSERT_BACK:
  /*后向插入,要排空队列中的I/O.然后把请求加入块设备队列的链表尾*/
      rq->flags |= REQ_SOFTBARRIER;
      elv_drain_elevator(q);
      list_add_tail(&rq->queuelist, &q->queue```c
      /*移走阻塞标志,调用request_fn开始I/O*/
      blk_remove_plug(q);
      q->request_fn(q);
      break;

现有电梯队列中的I/O怎么办?必须全部下发,所以首先要调用 elv_drain_elevator 函数排空电梯队列的所有I/O,排空之后所有的I/O进入块设备队列,然后把请求加到块设备队列的尾部,实现后向插入的要求.最后unplug队列,调用队列的 request_fn 函数从块设备队列挑选I/O执行.

elv_insert 函数第三部分是处理按顺序插入和重插.

case ELEVATOR_INSERT_SORT:
     BUG_ON(!blk_fs_request(rq));
     rq->flags |= REQ_SORTED;
     q->nr_sorted++;
     /*更新last_merge,即最后一个可merge的请求*/
     if (q->last_merge == NULL && rq_mergeable(rq))
          q->last_merge = rq;
     q->elevator->ops->elevator_add_req_fn(q, rq);
     break;
case ELEVATOR_INSERT_REQUEUE:
     /*重插队列*/ 
     rq->flags |= REQ_SOFTBARRIER;
     /*如果队列无保证顺序操作,则插入队列头*/ 
     if (q->ordseq == 0) {
          list_add(&rq->queuelist, &q->queue_head);
          break;
     }
     /*根据请求的性质,判断重插的位置*/ 
     ordseq = blk_ordered_req_seq(rq);
     list_for_each(pos, &q->queue_head) {
          struct request *pos_rq = list_entry_rq(pos);
          if (ordseq <= blk_ordered_req_seq(pos_rq))
               break;
     }
     list_add_tail(&rq->queuelist, pos);
     unplug_it = 0;
     break;

按顺序插入的情况比较简单,调用电梯队列提供的插入函数 elevator_add_req_fn 即可.

重插比较复杂,必须考虑I/O的顺序关系.重插入队列,是指I/O虽然返回,但是没有成功完成I/O,需要重新插入电梯队列的情况.如果此时队列中不必保证顺序,把I/O简单地插入块设备队列头部即可.否则,就必须考虑I/O的位置.如果需要保证顺序,说明有barrier I/O需要处理,回顾前面分析的背景知识,barrier I/O之前的I/O必须先完成,然后再完成barrier I/O,barrier I/O之后的I/O必须等barrier I/O完成之后再执行.ordseq 变量目的就是检查I/O的顺序,判断是barrier I/O之前的I/O,还是之后的I/O,或者是barrier I/O自身.根据这个顺序关系,逐个遍历整个队列,把I/O重新插入到正确的位置.

elv_insert 函数第四部分检查队列的限制.

/*如果请求达到限制,则unplug队列*/
  if (unplug_it && blk_queue_plugged(q)) {
      int nrq = q->rq.count[READ] + q->rq.count[WRITE]
           - q->in_flight;
      if (nrq >= q->unplug_thresh)
           __generic_unplug_device(q);
  }

为了避免过多I/O请求在队列中堆积,队列设置了一个数值,当请求个数超过这个限制,就unplug队列,开始挑选I/O执行.

11.5.2 I/O 出队列的过程分析

什么条件下 I/O 从队列出来真正下发到硬盘?总结内核中的处理过程,列出以下几个条件.

  • 第一个 I/O 启动了 3 毫秒的定时器,时间到了,会执行 unplug 函数,开始下发 I/O.
  • 请求数目超过设定的限制(默认是 4),执行 unplug 函数,开始下发.
  • 带有 sync 标志的 I/O,立即执行 unplug 函数,开始下发.
  • barrier I/O,需要先清空电梯队列,然后执行 unplug 函数,开始下发.
  • 当硬盘执行完毕一个 I/O,也要 unplug 队列,检查是否有 I/O 可以执行.

1. unplug 函数

I/O 的下发是通过 unplug 函数实现的,因此首先分析这个函数,它的代码如代码清单 11-16 所示.

代码清单 11-16 __generic_unplug_device

void __generic_unplug_device(request_queue_t *q)
{
    if (unlikely(blk_queue_stopped(q)))
        return;
    if (!blk_remove_plug(q))
        return;
    q->request_fn(q);
}

unplug 函数首先要检查队列是否设置了 stop 标志,设置这个标志将停止队列,不能进行 unplug 操作.其次清除队列的 plug 标志,然后调用队列的 request_fn 函数.

2. scsi_request_fn 函数

对于 scsi 设备而言,这个函数就是 scsi_request_fn,如代码清单 11-17 所示.

代码清单 11-17 scsi_request_fn 函数

static void scsi_request_fn(struct request_queue *q)
{
    ……/*省略部分代码*/
    while (!blk_queue_plugged(q)) {
        int rtn;
        req = elv_next_request(q);
        /*检查队列是否ready,这个队列是块设备本身的设备队列,非上文的块设备*/
        if (!req || !scsi_dev_queue_ready(q, sdev))
            break;
        /*设备状态是否online*/ 
        if (unlikely(!scsi_device_online(sdev))) {
            scsi_kill_request(req, q);
            continue;
        }

scsi_request_fn 函数主体是个循环,它要一直执行直到队列为空或者 HBA 卡不能够再接收 I/O 为止.函数第一部分调用 elv_next_request 从块设备队列获得一个请求.然后检查块设备队列是否 ready 以及 scsi 设备状态是否 online.

scsi_request_fn 函数第二部分仍然检查各种异常情况,参考代码中添加的注释.如果无异常情况,把 I/O 请求从电梯队列中删除.

        if (!(blk_queue_tagged(q) && !blk_queue_start_tag(q, req)))
            blkdev_dequeue_request(req);
        sdev->device_busy++;
        spin_unlock(q->queue_lock);
        cmd = req->special;
        /*检查命令是否为空*/
        if (unlikely(cmd == NULL)) {
            BUG();
        }
        spin_lock(shost->host_lock);
        /*HBA卡的host队列是否ready*/
        if (!scsi_host_queue_ready(q, shost, sdev))
            goto not_ready;
        if (sdev->single_lun) {
            /*检查target的用户是否就是sdev本身*/
            if (scsi_target(sdev)->starget_sdev_user &&
                scsi_target(sdev)->starget_sdev_user != sdev)
                goto not_ready;
            scsi_target(sdev)->starget_sdev_user = sdev;
        }
        shost->host_busy++;
        spin_unlock_irq(shost->host_lock);

scsi_request_fn 函数第三部分,首先初始化保存 scsi 错误返回值的数据 buffer,然后调用 scsi_dispatch_cmd 将 I/O 下发到驱动.

        scsi_init_cmd_errh(cmd);
        rtn = scsi_dispatch_cmd(cmd);
        spin_lock_irq(q->queue_lock);
        if(rtn) {
            /*出错,尝试重新plug队列*/
            if(sdev->device_busy == 0)
                blk_plug_device(q);
            break;
        }
    }
    goto out;

scsi_request_fn 函数第四部分是两个处理错误分支.

not_ready:
    spin_unlock_irq(shost->host_lock);
    spin_lock_irq(q->queue_lock);
    blk_requeue_request(q, req);
    sdev->device_busy--;
    if(sdev->device_busy == 0)
        blk_plug_device(q);
out:
    spin_unlock_irq(q->queue_lock);
    put_device(&sdev->sdev_gendev);
    spin_lock_irq(q->queue_lock);
}

not_ready 分支说明设备还没有准备好,因此把 I/O 重新插入电梯的队列.而 out 分支则只减少引用计数,不对 I/O 进行操作.这是因为进入 out 分支有两种情况,要么队列已经空,I/O 全部下放到驱动,要么 I/O 还没有真正从电梯队列解除,这两种情况都不需要将 I/O 重新插入电梯队列.

3. elv_next_request 函数

scsi_request_fn 函数中,获得一个 I/O 请求使用的是 elv_next_request 函数.这个函数从块设备队列获得一个 I/O,如代码清单 11-18 所示.

代码清单 11-18 elv_next_request 函数

struct request *elv_next_request(request_queue_t *q)
{
    struct request *rq;
    int ret;
 
    while ((rq = __elv_next_request(q)) != NULL) {
        if (!(rq->flags & REQ_STARTED)) {
            elevator_t *e = q->elevator;
            if (blk_sorted_rq(rq) && e->ops->elevator_activate
                e->ops->elevator_activate_req_fn(q, rq);
            rq->flags |= REQ_STARTED;
            blk_add_trace_rq(q, rq, BLK_TA_ISSUE);
        }
        /*设置结束扇区*/
        if (!q->boundary_rq || q->boundary_rq == rq) {
            q->end_sector = rq_end_sector(rq);
            q->boundary_rq = NULL;
        }

elv_next_request 函数第一部分调用 __elv_next_request 从队列中获得一个 I/O 请求.如果该 I/O 未设置 REQ_STARTED 标志,说明设备驱动第一次见到这个请求,这种情况需要通知 I/O 调度算法.然后为该 I/O 设置 REQ_STARTED 标志.

如果队列的 boundary_rq 等于该 I/O,该 I/O 此时要离开队列,因此设置队列的 boundary_rq 为空并设置队列最终扇区地址为该 I/O 的最终扇区地址.如果队列的 boundary_rq 为空,执行同样的设置.

elv_next_request 函数第二部分调用队列的 prep_rq_fn 函数来预处理 I/O.根据预处理结果执行不同的处理.

        if ((rq->flags & REQ_DONTPREP) || !q->prep_rq_fn)
            break;
        ret = q->prep_rq_fn(q, rq);
        if (ret == BLKPREP_OK) {
            /*ok,返回请求,准备下发*/
            break;
        } else if (ret == BLKPREP_DEFER) {
            /*有错,保留请求在队列*/
            rq = NULL;
            break;
        } else if (ret == BLKPREP_KILL) {
            /*出错误了,结束这个请求*/
            int nr_bytes = rq->hard_nr_sectors << 9;
            if (!nr_bytes)
                nr_bytes = rq->data_len;
            blkdev_dequeue_request(rq);
            rq->flags |= REQ_QUIET;
            end_that_request_chunk(rq, 0, nr_bytes);
            end_that_request_last(rq, 0);
        } else {

如果结果正确,则返回该 I/O,如果结果为 BLKPREP_DEFER,意味着部分正确,I/O 放在队列前面,但是不返回该 I/O,等待下次挑选 I/O 的动作.第一部分设置 REQ_STARTED 标志将阻止其他 I/O 越过该 I/O.如果结果为 BLKPREP_KILL,说明该 I/O 存在严重错误,需要从队列中删除并结束该 I/O.

真正从队列获取请求的函数是 __elv_next_request,它的代码如代码清单 11-19 所示.

代码清单 11-19 __elv_next_request

static inline struct request *__elv_next_request(request_queue_t *q)
{
    struct request *rq;
    while (1) {
        while (!list_empty(&q->queue_head)) {
            rq = list_entry_rq(q->queue_head.next);
            if (blk_do_ordered(q, &rq))
                return rq;
        }
        if (!q->elevator->ops->elevator_dispatch_fn(q, 0))
            return NULL;
    }
}

如果块设备队列不空,__elv_next_request 函数直接从块设备队列获得一个 I/O 请求,如果队列为空,则调用 elevator_dispatch_fn 函数从电梯队列获得一个请求,然后加入块设备队列的尾部.因为函数主体是一个循环,下一轮循环时,就可以从块设备队列获得请求了.

4. blk_do_ordered 函数

从队列获取一个请求后,要调用 blk_do_ordered 函数检查 I/O 的顺序,它的功能是为了处理 barrier I/O.如代码清单 11-20 所示.

代码清单 11-20 blk_do_ordered 函数

int blk_do_ordered(request_queue_t *q, struct request **rqp)
{
    struct request *rq = *rqp;
    /*判断是否为barrier I/O*/
    int is_barrier = blk_fs_request(rq) && blk_barrier_rq(rq);
    if (!q->ordseq) {  
        /*非barrier请求,返回*/
        if (!is_barrier)
            return 1;
        if (q->next_ordered != QUEUE_ORDERED_NONE) {
            *rqp = start_ordered(q, rq);
            return 1;
        } else {
            /*如果队列切换到ORDERED_NONE,清I/O,错误标志设置为功能不支持*/
            blkdev_dequeue_request(rq);
            end_that_request_first(rq, -EOPNOTSUPP, rq->hard_nr_sectors);
            end_that_request_last(rq, -EOPNOTSUPP);
            *rqp = NULL;
            return 0;
        }
    }

blk_do_ordered 函数第一部分检查之前在队列中是否设置了需要保证顺序的 I/O,如果没有且新挑选的 I/O 不是 barrier I/O,不需要保证顺序的处理,直接返回.如果该 I/O 是个 barrier I/O,调用 start_ordered 执行保证顺序的操作.如果队列切换为 ORDERED_NONE,说明队列不支持保证顺序的功能,直接清除 I/O 并返回.

blk_do_ordered 函数第二部分处理特殊情况,此时队列中已经存在需要保证顺序的 I/O.如果队列设置了 QUEUE_ORDERED_TAG 标志,说明硬件支持 TAG 队列,硬件可以对 I/O 执行顺序进行控制,只阻塞下一个 barrier I/O 就可以,不是 barrier I/O 的其他 I/O 可以放行,下发到驱动执行.

    if (!blk_fs_request(rq) && rq != &q->pre_flush_rq && 
                               rq != &q->post_flush_rq)
        return 1;
    if (q->ordered & QUEUE_ORDERED_TAG) {
        /* Ordered by tag.  Blocking the next barrier is enough. */
        if (is_barrier && rq != &q->bar_rq)
            *rqp = NULL;
    } else {
        /* Ordered by draining.  Wait for turn. */
        WARN_ON(blk_ordered_req_seq(rq) < blk_ordered_cur_seq(q));
        if (blk_ordered_req_seq(rq) > blk_ordered_cur_seq(q))
            *rqp = NULL;
    }
    return 1;
}

如果队列未设置 QUEUE_ORDERED_TAG 标志,需调用 blk_ordered_req_seq 检查 I/O 的顺序关系,判断 I/O 是否可以执行.如果 I/O 的顺序大于当前队列的处理顺序,说明 I/O 应该在后面处理,因此不应该被处理.I/O 顺序是 barrier I/O 必须要考虑的重要方面,当 I/O 没有成功完成,需要重新插入电梯队列的时候,也需要考虑 I/O 顺序,必须把 I/O 插入到正确的位置.

5. start_ordered 函数

start_ordered 函数根据要求设置同步 cache 命令,保证同步 cache 命令之前的 I/O 必须完成,它的代码如代码清单 11-21 所示.

代码清单 11-21 start_ordered 函数

static inline struct request *start_ordered(request_queue_t *q, struct request *rq)
{
    q->bi_size = 0;
    q->orderr = 0;
    q->ordered = q->next_ordered;
    /*标记队列的保序操作开始*/
    q->ordseq |= QUEUE_ORDSEQ_STARTED;
    blkdev_dequeue_request(rq);
    q->orig_bar_rq = rq;
    rq = &q->bar_rq;
    rq_init(q, rq);
    rq->flags = bio_data_dir(q->orig_bar_rq->bio);
    /*设备是否支持FUA?*/
    rq->flags |= q->ordered & QUEUE_ORDERED_FUA ? REQ_FUA : 0;
    rq->elevator_private = NULL;
    rq->rl = NULL;
    init_request_from_bio(rq, q->orig_bar_rq->bio);
    rq->end_io = bar_end_io;
}

start_ordered 函数第一部分设置 I/O 请求的必要参数.由于这是一个 barrier I/O,所以将 I/O 保存在队列的 orig_bar_rq 成员后,使用了队列的 bar_rq 成员作为真正下发 I/O 的数据结构,并根据原来的 bio 结构初始化 bar_rq 成员的参数,对于 barrier I/O,它的 I/O 完成函数被设置为特殊提供的 bar_end_io 函数.

start_ordered 函数第二部分设置同步 cache 的命令.

    /*是否需要后刷,如果需要,则增加一个同步cache的命令,插入块设备队列头*/
    if (q->ordered & QUEUE_ORDERED_POSTFLUSH)
        queue_flush(q, QUEUE_ORDERED_POSTFLUSH);
    else
        q->ordseq |= QUEUE_ORDSEQ_POSTFLUSH;
    /*barrier I/O插入队列的头部*/
    elv_insert(q, rq, ELEVATOR_INSERT_FRONT);
    /*是否需要前刷?如果需要,增加一个同步cache命令,插入块设备队列头*/
    if (q->ordered & QUEUE_ORDERED_PREFLUSH) {
        queue_flush(q, QUEUE_ORDERED_PREFLUSH);
        rq = &q->pre_flush_rq;
    } else
        q->ordseq |= QUEUE_ORDSEQ_PREFLUSH;
    if ((q->ordered & QUEUE_ORDERED_TAG) || q->in_flight == 0)
        q->ordseq |= QUEUE_ORDSEQ_DRAIN;
    else
        rq = NULL;
    return rq;
}

如果队列带有 QUEUE_ORDERED_POSTFLUSH 标志,这种情况在队列头部插入一个同步 cache 命令.然后插入 barrier I/O 自身,最后如果队列带有 QUEUE_ORDERED_PREFLUSH 标志,说明要把之前下发的 I/O 执行完毕,还要插入一个同步 cache 命令,这个命令的目的是强制之前的 I/O 执行完毕.因为三次插入都是插入队列的头部,所以先插入 POSTFLUSH 同步 cache 命令,再插入 barrier I/O 自身,最后插入 PREFLUSH 同步 cache 命令.执行时,实际是 PREFLUSH 同步 cache 命令最先执行.

6. scsi_dispatch_cmd 函数

从电梯取得 I/O 请求后,要初始化请求的参数,然后检查队列和设备是否准备好.如果一切正常,调用 scsi_dispatch_cmd 发送 I/O 请求,如代码清单 11-22 所示.

代码清单 11-22 scsi_dispatch_cmd 函数

cc int scsi_dispatch_cmd(struct scsi_cmnd *cmd) { struct Scsi_Host *host = cmddevicehost; unsigned long flags = 0; unsigned long timeout; int rtn = 0;

/* 检查设备是否可用 */
if (unlikely(cmd->device->sdev_state == SDEV_DEL)) {
    /*如果设备不可用,则终结这个I/O命令*/
    cmd->result = DID_NO_CONNECT << 16;
    atomic_inc(&cmd->device->iorequest_cnt);
    __scsi_done(cmd);
    /* return 0 (because the command has been processed) */
    goto out;
}
/*检查设备是否阻塞,如果阻塞,要重新把命令插入电梯队列*/
if (unlikely(cmd->device->sdev_state == SDEV_BLOCK)) {
    scsi_queue_insert(cmd, SCSI_MLQUEUE_DEVICE_BUSY);
    SCSI_LOG_MLQUEUE(3, printk("queuecommand : device block\n"));
    goto out;
}
if (cmd->device->scsi_level <= SCSI_2 &&
    cmd->device->scsi_level != SCSI_UNKNOWN) {
    cmd->cmnd[1] = (cmd->cmnd[1] & 0x1f) | (cmd->device->lun << 5);
}
/*检查设备的reset时钟,避免设备未准备好的情况*/
timeout = host->last_reset + MIN_RESET_DELAY;
if (host->resetting && time_before(jiffies, timeout)) {
    int ticks_remaining = timeout - jiffies;
    while (--ticks_remaining >= 0)
        mdelay(1 + 999 / HZ);
    host->resetting = 0;
}
/*设置该scsi命令的超时时间*/
scsi_add_timer(cmd, cmd->timeout_per_command, scsi_times_out);
scsi_log_send(cmd);
atomic_inc(&cmd->device->iorequest_cnt);
/*检查命令是否超过hba支持的最大命令长度,超过结束命令*/
if (CDB_SIZE(cmd) > cmd->device->host->max_cmd_len) {
    cmd->result = (DID_ABORT << 16);
    scsi_done(cmd);
    goto out;
}
spin_lock_irqsave(host->host_lock, flags);
scsi_cmd_get_serial(host, cmd);
if (unlikely(host->shost_state == SHOST_DEL)) {
    cmd->result = (DID_NO_CONNECT << 16);
    scsi_done(cmd);
} else {
    /*调用HBA提供的命令函数*/
    rtn = host->hostt->queuecommand(cmd, scsi_done);
}
spin_unlock_irqrestore(host->host_lock, flags);
if (rtn) {
    /*命令执行中出错,则删掉命令的计时器,然后把命令再次插入队列*/
    if (scsi_delete_timer(cmd)) {
        atomic_inc(&cmd->device->iodone_cnt);
        scsi_queue_insert(cmd,
            (rtn == SCSI_MLQUEUE_DEVICE_BUSY) ?
             rtn : SCSI_MLQUEUE_HOST_BUSY);
    }
}
/* ... 省略后续部分 */

}


`scsi_dispatch_cmd` 函数提供了很多注释,主要是处理很多设备不可用的情况。如果设备可用,则调用 hba 卡提供的 `queuecommand` 把 scsi 命令提交给硬盘驱动,同时提供了一个回调函数 `scsi_done`。

异常处理有多种情况,如果设备不可用,要停止 I/O,返回错误。如果是执行 I/O 中错误,而且符合重试条件,要把 I/O 重新插入电梯队列,再次执行 I/O。

### 11.5.3 I/O 返回路径

`scsi_dispatch_cmd` 提交 scsi 命令时,要提供一个回调函数 `scsi_done`。这个回调是在硬盘驱动的中断中调用的。scsi 命令从中断中返回,意味着这个命令已经执行完毕,有可能是成功执行了 scsi 命令,也有可能出现错误。不管成功还是错误,内核都需要处理这两种情况。

`scsi_done` 是 I/O 返回路径上的第一个函数,因此就从这个函数开始分析过程,它的代码如代码清单 11-23 所示。

**代码清单 11-23 `scsi_done` 函数**

```c
static void scsi_done(struct scsi_cmnd *cmd)
{
  if (!scsi_delete_timer(cmd))
      return;
  __scsi_done(cmd);
}

scsi_done 函数首先要停止 scsi 命令的计时器,然后调用 __scsi_done__scsi_done 函数也比较简单,如代码清单 11-24 所示。

代码清单 11-24 __scsi_done 函数

void __scsi_done(struct scsi_cmnd *cmd)
{        
  struct request *rq = cmd->request;
  cmd->serial_number = 0;
  /*将命令的完成次数加1*/
  atomic_inc(&cmd->device->iodone_cnt);
  /*如果result不为0,错误计数加1*/
  if (cmd->result)
      atomic_inc(&cmd->device->ioerr_cnt);
  rq->completion_data = cmd;
  blk_complete_request(rq);
}

中断返回函数的参数是一个 scsi 命令,__scsi_done 函数首先要从 scsi 命令结构中获得当初下发 I/O 时使用的请求对象,然后调用 blk_complete_request 函数来处理一个 I/O 的返回过程。

blk_complete_request 是中断上下文的最后一个函数,它要启动一个软中断继续处理 I/O,如代码清单 11-25 所示。

代码清单 11-25 blk_complete_request 函数

void blk_complete_request(struct request *req)
{
  struct list_head *cpu_list;
  unsigned long flags;
 
  BUG_ON(!req->q->softirq_done_fn);
 
  local_irq_save(flags);
  cpu_list = &__get_cpu_var(blk_cpu_done);
  /*将请求加入每cpu变量链表blk_cpu_done*/  
  list_add_tail(&req->donelist, cpu_list);
  /*启动一个软中断*/
  raise_softirq_irqoff(BLOCK_SOFTIRQ);
  local_irq_restore(flags);
}

截至目前,代码一直在硬盘驱动的中断上下文中执行,而中断上下文具有高优先级,会阻塞硬盘自身的中断,因此 blk_complete_request 函数启动块设备的软中断来处理中断的下半部。将中断处理代码划分为两部分,是内核的常见方式。网络设备的中断处理代码中,同样启动软中断来处理中断的下半部。

为了传递参数到软中断上下文,内核定义了一个 CPU 变量 blk_cpu_done,这是一个链表,返回 I/O 的请求结构要加入该链表。块设备软中断的处理函数是 blk_done_softirq。该函数要从 CPU 变量 blk_cpu_done 获得已经完成的 I/O 请求,然后调用队列的软中断处理函数 softirq_done_fn 处理。前文已经分析过,这个软中断处理函数是 scsi_softirq_done。它的代码如代码清单 11-26 所示。

代码清单 11-26 scsi_softirq_done 函数

static void scsi_softirq_done(struct request *rq)
{
  struct scsi_cmnd *cmd = rq->completion_data;
  unsigned long wait_for = (cmd->allowed + 1) * cmd->timeout_per;
  int disposition;
 
  /*初始化错误I/O链表*/
  INIT_LIST_HEAD(&cmd->eh_entry);
  /*判断I/O是否成功完成*/
  disposition = scsi_decide_disposition(cmd);
  if (disposition != SUCCESS &&
     time_before(cmd->jiffies_at_alloc + wait_for, jiffies)) {
     /*已经超时了,则设置命令完成,不再重复执行命令*/ 
     disposition = SUCCESS;
  }
  scsi_log_completion(cmd, disposition);
  switch (disposition) {
     case SUCCESS:
          scsi_finish_command(cmd);
          break;
     case NEEDS_RETRY:
          scsi_retry_command(cmd);
          break;
     case ADD_TO_MLQUEUE:
          scsi_queue_insert(cmd, SCSI_MLQUEUE_DEVICE_BUSY);
          break;
     default:
          if (!scsi_eh_scmd_add(cmd, 0))
               scsi_finish_command(cmd);
  }
}

软中断处理函数 scsi_softirq_done 根据命令执行的结果,分为四种情况来处理:

  1. 结束命令,交给上层处理(这种情况并不一定命令成功完成了,如果命令超时了,也要结束命令,交给上层处理);
  2. 重试命令;
  3. 命令重新插入电梯队列,重新排队;
  4. 由 scsi 错误处理线程来处理。

scsi_decide_disposition 函数值得仔细研究,这个函数总结了所有的 scsi 错误类型,对每种错误类型给出了处理措施。

如果 I/O 正常完成,调用 scsi_finish_command 函数返回上层,这个函数比较简单,它最终又调用了命令本身的 done 函数。done 函数是在初始化 scsi 命令时调用 sd_init_command 函数设置为 sd_rw_intr,同时还设置了 scsi 命令的最大重试次数和超时时间值。

1. sd_rw_intr 函数

sd_rw_intr 函数要对 scsi 命令的各种错误进行处理,如代码清单 11-27 所示。

代码清单 11-27 sd_rw_intr 函数

static void sd_rw_intr(struct scsi_cmnd * SCpnt)
{
  int result = SCpnt->result;
  unsigned int xfer_size = SCpnt->request_bufflen;
  unsigned int good_bytes = result ? 0 : xfer_size;
  u64 start_lba = SCpnt->request->sector;
  u64 bad_lba;
  struct scsi_sense_hdr sshdr;
  int sense_valid = 0;
  int sense_deferred = 0;
  int info_valid;
 
  if (result) {
      /*设置scsi命令的sense_key和asc值,ascq值 */
      sense_valid = scsi_command_normalize_sense(SCpnt, &sshdr);
      if (sense_valid)
          sense_deferred = scsi_sense_is_deferred(&sshdr);
  }
 
  if (driver_byte(result) != DRIVER_SENSE &&
     (!sense_valid || sense_deferred))
      goto out;
 
  /*第一部分,如果scsi命令的返回结果result不为0,说明命令执行中产生了错误.scsi协议定义了可能的错误类型,通过三个参数共同说明错误类型,这三个参数是sense key、asc和acsq.参数的详细定义读者需要参考scsi协议文档,本书只对代码中处理的部分错误进行解释.*/
 
  /*sd_rw_intr函数第二部分主要是检查scsi命令的sense key*/
  switch (sshdr.sense_key) {
  case HARDWARE_ERROR:     /*硬件错误*/
  case MEDIUM_ERROR:       /*坏道*/
      if (!blk_fs_request(SCpnt->request))
          goto out;
      info_valid = scsi_get_sense_info_fld(SCpnt->sense_buffer,
                                           SCSI_SENSE_BUFFERSIZE,
                                           &bad_lba);
      if (!info_valid)
          goto out;
      if (xfer_size <= SCpnt->device->sector_size)
          goto out;
      switch (SCpnt->device->sector_size) {
      case 256:
          start_lba <<= 1;
          break;
      case 512:
          break;
      case 1024:
          start_lba >>= 1;
          break;
      case 2048:
          start_lba >>= 2;
          break;
      case 4096:
          start_lba >>= 3;
          break;
      default:
          /* Print something here with limiting frequency. */
          goto out;
          break;
      }
      good_bytes = (bad_lba - start_lba) * SCpnt->device->sector_size;
      break;
  case RECOVERED_ERROR:              /*可修复的错误*/
  case NO_SENSE:                     /*no sense,成功执行*/
      SCpnt->result = 0;
      memset(SCpnt->sense_buffer, 0, SCSI_SENSE_BUFFERSIZE);
      good_bytes = xfer_size;
      break;
  case ILLEGAL_REQUEST:              /*不合法的请求*/
      if (SCpnt->device->use_10_for_rw &&
         (SCpnt->cmnd[0] == READ_10 ||
          SCpnt->cmnd[0] == WRITE_10))
          SCpnt->device->use_10_for_rw = 0;
      if (SCpnt->device->use_10_for_ms &&
         (SCpnt->cmnd[0] == MODE_SENSE_10 ||
          SCpnt->cmnd[0] == MODE_SELECT_10))
          SCpnt->device->use_10_for_ms = 0;
      break;
  default:
      break;
  }
 
 out:
  scsi_io_completion(SCpnt, good_bytes);
}

如果 sense key 不够,还需要一些附加的信息,通过 sense_buffer 里面的数据 ascascq 表示。sense key 主要包括以下几个:

  • HARDWARE_ERROR:硬件错误。
  • MEDIUM_ERROR:坏道错误。
  • RECOVERED_ERROR:可修复的错误。
  • NO_SENSE:无 sense 数据。
  • ILLEGAL_REQUEST:不合法的请求。

NOTE

RECOVERED_ERRORNO_SENSE 代表命令已经成功执行了,这两个 sense key 作用只是提示用户。

变量 good_bytes 的含义有两种:

  1. 如果命令成功完成,good_bytes 就是 scsi 命令设定的读写数值;
  2. 如果命令部分完成(比如由于坏道错误只读了一部分数据),good_bytes 代表完成的字节数。

2. scsi_io_completion 函数

good_bytes 要作为输入参数调用 scsi_io_completion,由这个函数继续 I/O 的返回过程,如代码清单 11-28 所示。

代码清单 11-28 scsi_io_completion 函数

void scsi_io_completion(struct scsi_cmnd *cmd, unsigned int good_bytes)
{
  int result = cmd->result;
  int this_count = cmd->request_bufflen;
  request_queue_t *q = cmd->device->request_queue;
  struct request *req = cmd->request;
  int clear_errors = 1;
  struct scsi_sense_hdr sshdr;
  int sense_valid = 0;
  int sense_deferred = 0;
 
  scsi_release_buffers(cmd);
 
  if (result) {
      sense_valid = scsi_command_normalize_sense(cmd, &sshdr);
      if (sense_valid)
          sense_deferred = scsi_sense_is_deferred(&sshdr);
  }
 
  /*scsi命令来自SG_IO,也就是说不是来自于文件系统*/ 
  if (blk_pc_request(req)) { /* SG_IO ioctl from block level */
      req->errors = result;
      if (result) {
          clear_errors = 0;
          if (sense_valid && req->sense) {
              /*
               * SG_IO wants current and deferred errors
               */
              int len = 8 + cmd->sense_buffer[7];
              /*复制返回的sense buffer内容*/  
              if (len > SCSI_SENSE_BUFFERSIZE)
                  len = SCSI_SENSE_BUFFERSIZE;
              memcpy(req->sense, cmd->sense_buffer, len);
              req->sense_len = len;
          }
      } else
          req->data_len = cmd->resid;
  }

scsi_io_completion 第一部分和 sd_rw_intr 函数一样,都根据 scsi 命令是否成功完成,将 sense key 和 asc、ascq 值复制出来。然后要检查 I/O 命令是否来自 SG_IOSG_IO 指 scsi 命令来自于文件系统的 I/O control 调用,而不是来自文件系统本身)。这种情况要把返回命令的 sense buffer 整个复制出来。

scsi_io_completion 第二部分调用 scsi_end_request 函数返回上层处理。

  if (clear_errors)
      req->errors = 0;
  /*命令成功完成返回,否则需要进一步处理*/
  if (scsi_end_request(cmd, 1, good_bytes, result == 0) == NULL)
      return;

如果命令成功完成,整个 I/O 返回过程就执行完毕,如果命令没有成功完成,还需要第三部分处理错误。

命令是否成功完成,是由输入参数 good_bytes 决定的。

  /* good_bytes = 0, or (inclusive) there were leftovers and
   * result = 0, so scsi_end_request couldn't retry.
   */
  if (sense_valid && !sense_deferred) {
      switch (sshdr.sense_key) {
      case UNIT_ATTENTION:
           if (cmd->device->removable) {
               /*如果设备被移走,则结束请求*/
               cmd->device->changed = 1;
               scsi_end_request(cmd, 0, this_count, 1);
               return;
          } else {
               /*重新插入命令*/
               scsi_requeue_command(q, cmd);
               return;
          }
          break;
      case ILLEGAL_REQUEST:
          if ((cmd->device->use_10_for_rw &&
             sshdr.asc == 0x20 && sshdr.ascq == 0x00) &&
             (cmd->cmnd[0] == READ_10 ||
              cmd->cmnd[0] == WRITE_10)) {
              cmd->device->use_10_for_rw = 0;
              /* This will cause a retry with a
               * 6-byte command.
               */
              scsi_requeue_command(q, cmd);
              return;
          } else {
              scsi_end_request(cmd, 0, this_count, 1);
              return;
          }
          break;
      case NOT_READY:
          if (sshdr.asc == 0x04) {
              switch (sshdr.ascq) {
              case 0x01: /* becoming ready */
              case 0x04: /* format in progress */
              case 0x05: /* rebuild in progress */
              case 0x06: /* recalculation in progress */
              case 0x07: /* operation in progress */
              case 0x08: /* Long write in progress */
              case 0x09: /* self test in progress */
                  scsi_requeue_command(q, cmd);
                  return;
              default:
                  break;
              }
          }
          if (!(req->flags & REQ_QUIET)) {
              scmd_printk(KERN_INFO, cmd, "Device not ready: ");
              scsi_print_sense_hdr("", &sshdr);
          }
          scsi_end_request(cmd, 0, this_count, 1);
          return;
      case VOLUME```c
      case VOLUME_OVERFLOW:
          if (!(req->flags & REQ_QUIET)) {
              scmd_printk(KERN_INFO, cmd, "Volume overflow, CD");
              __scsi_print_command(cmd->cmnd);
              scsi_print_sense("", cmd);
          }
          /* See SSC3rXX or current. */
          scsi_end_request(cmd, 0, this_count, 1);
          return;
      default:
          break;
      }
  }
  /*如果返回DID_RESET,则重新插入命令*/ 
  if (host_byte(result) == DID_RESET) {
      scsi_requeue_command(q, cmd);
      return;
  }
  if (result) {
      if (!(req->flags & REQ_QUIET)) {
      }
  }
  scsi_end_request(cmd, 0, this_count, !result);
}

scsi_io_completion 第三部分根据 sense key 和 asc、ascq 的值决定处理的方式.处理方式其实比较简单,一种是结束请求,另一种是把请求重新插入电梯队列.

值得注意的是,由于 scsi 硬件种类繁杂,错误定义混乱,scsi 返回信息很多时候不能准确反映设备的情况,这造成内核 I/O 处理的异常.比如有的 scsi 硬件总是返回 DID_RESET,结果 I/O 请求无限次重插,CPU 占有率达到 100%,阻塞了操作系统的运行.

3. scsi_end_request 函数

scsi_io_completion 函数分析完成后,还需要重点分析 scsi_end_request 函数,观察 I/O 的返回过程.它的代码如代码清单 11-29 所示.

代码清单 11-29 scsi_end_request 函数

static struct scsi_cmnd *scsi_end_request(struct scsi_cmnd *cmd,
                      int bytes, int requeue)
{
  request_queue_t *q = cmd->device->request_queue;
  struct request *req = cmd->request;
  unsigned long flags;
 
  /*根据uptodate和goodbytes结束请求*/
  if (end_that_request_chunk(req, uptodate, bytes)) {
       /*如果返回非0,说明还有块未完成*/
       int leftover = (req->hard_nr_sectors << 9);
       if (blk_pc_request(req))
            leftover = req->data_len;
       /* kill remainder if no retrys */
       if (!uptodate && blk_noretry_request(req))
           /*如果返回了fastfail,说明不需要重试,则直接结束*/
           end_that_request_chunk(req, 0, leftover);
       else { 
       /*重插入队列*/
       if (requeue) {
           scsi_requeue_command(q, cmd);
           cmd = NULL;
       }
       return cmd;
    }
  }
  ……/*省略部分代码*/
  scsi_next_command(cmd);
  return NULL;
} 

scsi_end_request 函数调用 end_that_request_chunk 函数结束 I/O 的返回过程.如果返回结果不为 0,说明 I/O 没有成功完成,根据返回结果有两种处理方式.如果返回结果带有 REQ_FAILFAST 标志,指示快速返回,不需要重试,直接结束 I/O,否则就将 I/O 重新插入电梯队列,再次执行.

最后调用 scsi_next_command 从队列中挑选下一个 I/O 执行.

参数 good_bytes 说明了 I/O 的完成情况,是部分完成还是全部完成.scsi_end_request 要根据 good_bytes 确定如何结束 I/O 请求.

end_that_request_chunk 调用了 __end_that_request_first 来完成 bio 的返回.回顾第10章的内容,bio 设置的回调函数是 mpage_end_io_read.__end_that_request_first 函数调用这个回调函数,完成唤醒阻塞进程的任务,通知 I/O 完成返回.

11.6 本章小结

通用块层、I/O调度算法和scsi层,共同构建了文件系统之下的I/O处理流程.Linux内核采用了很多对象的思想来设计程序,这样块设备的执行流程和电梯算法都对象化了,用户可以自行设计自己的模块来替换内核提供的软件模块.


第12章 内核回写机制

回顾第10章关于文件写的部分,Linux的写操作只是写数据到page cache中,真正的写磁盘要由内核的pdflush线程完成的.

12.1 内核的触发条件

何时启动写磁盘操作由内核根据一些条件触发pdflush线程完成.内核的触发条件如下.

  • 文件系统的写函数generic_file_buffered_write要调用balance_dirty_pages_ratelimited.后者首先检查累加的dirty页(这是一个CPU变量)是否达到一个限制,超过限制则执行写操作.
  • 内核定时器触发.当内核启动的时候,要启动一个回写定时器(默认是5秒).当定时时间到达,触发pdflush线程执行写操作.
  • 当系统申请内存失败,或者文件系统执行同步(sync)操作,或者内存管理模块试图释放更多内存的时候,都可能触发pdflush线程回写页面.

提示

CPU变量是内核的一种数据结构,每个CPU核都有变量的一个副本.

12.2 内核回写控制参数

内核通过4个参数控制数据回写的算法.这四个参数由proc文件系统提供,用户可以直接修改这些参数从而调整内核回写的行为.具体如表12-1所示.

表12-1 内核回写控制参数

(表内容原文未给出,此处保留占位)

12.3 定时器触发回写

系统初始化的start_kernel函数启动了一个定时器,这个定时器的作用就是推动内核回写数据.因此从start_kernel函数开始分析内核的回写机制.如代码清单12-1所示.

代码清单12-1 start_kernel函数

asmlinkage void __init start_kernel(void)
{
  ……/*省略无关代码*/
  /* rootfs populating might need page-writeback */
  page_writeback_init();
  ……/*省略无关代码*/
}

12.3.1 启动定时器

start_kernel函数调用page_writeback_init函数启动定时器,如代码清单12-2所示.

代码清单12-2 page_writeback_init函数

void __init page_writeback_init(void)
{
  long buffer_pages = nr_free_buffer_pages();
  long correction;
  total_pages = nr_free_pagecache_pages();
  correction = (100 * 4 * buffer_pages) / total_pages;
  if (correction < 100) {
       /*脏页比例和背景脏页比例两个参数乘以内存比例*/
       dirty_background_ratio *= correction;
       dirty_background_ratio /= 100;
       vm_dirty_ratio *= correction;
       vm_dirty_ratio /= 100;
       if (dirty_background_ratio <= 0)
           dirty_background_ratio = 1;
       if (vm_dirty_ratio <= 0)
           vm_dirty_ratio = 1;
  }
  mod_timer(&wb_timer, jiffies + dirty_writeback_interval);
  set_ratelimit();
  register_cpu_notifier(&ratelimit_nb);
}

page_writeback_init函数首先获得内存区内所有可分配内存的总数.这里的可分配内存包括ZONE_HIGHZONE_NORMALZONE_DMA三个管理区.

然后检查当前系统的内存使用情况.buffer_pagesZONE_DMAZONE_NORMAL管理区的可分配内存.如果buffer_pages少于所有可分配内存total_pages的1/4,此时脏页比例和背景脏页比例两个参数要调整,调整算法是比例参数乘以内存之比.通过放大比例参数达到尽快回写数据回收内存的目的.

注意

此处涉及内核内存管理和分配的知识.简单说,32位操作系统的内核将内存划分为三个管理区,分别是ZONE_DMAZONE_NORMALZONE_HIGH.

set_ratelimit函数设置内存页面限制.这个参数的上限是4兆内存,按4K的页面计算,限制在16个页面和1024个页面之间,具体根据CPU个数和可分配内存数决定.如果页面尺寸不是4K,就要按4兆内存除以实际页面尺寸计算真正的页面最大值.

page_writeback_init函数启动定时器wb_timer作为回写定时器.这个定时器如代码清单12-3所示.

代码清单12-3 wb_timer_fn函数

static DEFINE_TIMER(wb_timer, wb_timer_fn, 0, 0);
 
static void wb_timer_fn(unsigned long unused)
{
  if (pdflush_operation(wb_kupdate, 0) < 0)
      mod_timer(&wb_timer, jiffies + HZ); /* delay 1 second */
}
 
int pdflush_operation(void (*fn)(unsigned long), unsigned long arg0)
{
  unsigned long flags;
  int ret = 0;
  BUG_ON(fn == NULL);        /* Hard to diagnose if it's deferred */
  spin_lock_irqsave(&pdflush_lock, flags);
  if (list_empty(&pdflush_list)) {
      spin_unlock_irqrestore(&pdflush_lock, flags);
      ret = -1;
  } else {
      struct pdflush_work *pdf;
      pdf = list_entry(pdflush_list.next, struct pdflush_work, list);
      list_del_init(&pdf->list);
      if (list_empty(&pdflush_list))
          last_empty_jifs = jiffies;
      pdf->fn = fn;
      pdf->arg0 = arg0;
      wake_up_process(pdf->who);
      spin_unlock_irqrestore(&pdflush_lock, flags);
  }
  return ret;
}

定时器wb_timer的执行函数是wb_timer_fn.后者调用pdflush_operation函数,从pdflush_list全局链表获得一个pdflush_work结构,设置pdflush_work结构的工作函数wb_kupdate,然后把pdflush_work结构从链表中删除,唤醒pdflush内核线程.

pdflush线程是内核启动的一个内核线程,它的执行函数体是pdflush.这个函数的作用是执行pdflush_work结构设置的工作函数(就是注册的wb_kupdate函数).

12.3.2 执行回写操作

wb_kupdate是内核回写的控制函数,我们从这个函数开始分析回写机制,如代码清单12-4所示.

代码清单12-4 wb_kupdate函数

 static void wb_kupdate(unsigned long arg)
{
  ……/*省略部分代码*/
  oldest_jif = jiffies - dirty_expire_interval;
  start_jif = jiffies;
  next_jif = start_jif + dirty_writeback_interval;
  nr_to_write = global_page_state(NR_FILE_DIRTY) +
                global_page_state(NR_UNSTABLE_NFS) +
                (inodes_stat.nr_inodes - inodes_stat.nr_unused);
  while (nr_to_write > 0) {
      wbc.encountered_congestion = 0;
      /*一次写页面的最大值为1024*/
      wbc.nr_to_write = MAX_WRITEBACK_PAGES;
      writeback_inodes(&wbc);
      if (wbc.nr_to_write > 0) {
          if (wbc.encountered_congestion)
              blk_congestion_wait(WRITE, HZ/10);
          else
              break;        /* All the old data is written */
      }
      nr_to_write -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
  }
  /*如果当前时间加1秒已超过下一轮的定时器时间,设置定时器为当前时间加1秒*/
  if (time_before(next_jif, jiffies + HZ))
      next_jif = jiffies + HZ;
  /*更新定时器*/
  if (dirty_writeback_interval)
      mod_timer(&wb_timer, next_jif);
}

wb_kupdate函数首先计算三个时间.

  • start_jif:当前时间.
  • oldest_jif:脏页面允许存在的最长时间,早于这个时间的页面必须回写.这个值默认是当前时间减去30秒.
  • next_jif:下一次执行回写的时间.默认是5秒,就是每5秒触发一次wb_kupdate.

其次检查系统内有多少可写的页面.如果大于0,则调用writeback_inodes执行回写.

执行一次回写操作后,如果还有页面未写并且设置了阻塞标志,则启动阻塞处理.阻塞处理是等1/10秒,然后继续执行下一次回写.如果未设置阻塞标志,则直接进行下一轮的回写.

12.3.3 检查需要回写的页面

writeback_inodes函数用于扫描系统的超级块,检查需要回写的页面,如代码清单12-5所示.

代码清单12-5 writeback_inodes函数

writeback_inodes(struct writeback_control *wbc)
{
  struct super_block *sb;
  might_sleep();
  spin_lock(&sb_lock);
restart:
  sb = sb_entry(super_blocks.prev);
  for (; sb != sb_entry(&super_blocks); sb = 
               sb_entry(sb->s_list.prev)) {
      if (!list_empty(&sb->s_dirty) || !list_empty(&sb->s_io))
          /* we're making our own get_super here */
          sb->s_count++;
          spin_unlock(&sb_lock);
          if (down_read_trylock(&sb->s_umount)) {
              if (sb->s_root) {
                  spin_lock(&inode_lock);
                  sync_sb_inodes(sb, wbc);
                  spin_unlock(&inode_lock);
              }
   ……/*省略部分代码*/

writeback_inodes函数首先遍历系统的超级块对象,如果超级块的脏页面链表非空,则调用sync_sb_inodes回写超级块内的inode.

12.3.4 回写超级块内的inode

因为每个文件系统有一个超级块,而文件系统内所有的inode结构都链接到超级块对象的链表头,sync_sb_inodes函数要检查超级块对象内所有需要回写的inode,如代码清单12-6所示.

代码清单12-6 sync_sb_inodes函数

static void
sync_sb_inodes(struct super_block *sb, struct writeback_control *wbc)
{
  const unsigned long start = jiffies;        /* livelock avoidance */
  /*将dirty链表合并到s_io链表*/
  if (!wbc->for_kupdate || list_empty(&sb->s_io))
      list_splice_init(&sb->s_dirty, &sb->s_io);
  while (!list_empty(&sb->s_io)) {
      struct inode *inode = list_entry(sb->s_io.prev, struct inode, i_list);
      struct address_space *mapping = inode->i_mapping;
      struct backing_dev_info *bdi = mapping->backing_dev_info;
      long pages_skipped;
      /*无回写能力的处理*/
      if (!bdi_cap_writeback_dirty(bdi)) {
          list_move(&inode->i_list, &sb->s_dirty);
          if (sb == blockdev_superblock) {
              continue;
          }
          break;
      }
      /*bdi指示写拥塞。设置拥塞标志*/   
      if (wbc->nonblocking && bdi_write_congested(bdi)) {
          wbc->encountered_congestion = 1;
          if (sb != blockdev_superblock)
              break;   /* Skip a congested fs */
          list_move(&inode->i_list, &sb->s_dirty);
          continue;    /* Skip a congested blockdev */
      }
      if (wbc->bdi && bdi != wbc->bdi) {
          if (sb != blockdev_superblock)
              break;    /* fs has the wrong queue */
          list_move(&inode->i_list, &sb->s_dirty);
          continue;     /* blockdev has wrong queue */
      }

sync_sb_inodes函数第一部分遍历超级块链表的脏inode结构,检查是否可回写.

根据第9章块设备文件系统的分析,所有块设备的超级块是块设备文件系统提供的全局变量blockdev_superblock.

  • 如果inode不具备回写能力,且超级块等于blockdev_superblock,说明这个inode是一个块设备文件的inode结构,这种情况继续遍历块设备文件系统的其他inode,检查是否可写.
  • 如果超级块不等于blockdev_superblock,说明这是一个文件系统,这种情况跳过整个超级块,检查下一个超级块里面的inode.
  • 如果写拥塞,和上面的处理类似.
  • 如果文件系统拥塞,跳过整个超级块,检查下一个超级块.
  • 如果块设备拥塞,只跳过该块设备,检查超级块链表的下一个块设备.

2. 检查时间参数

sync_sb_inodes函数第二部分首先检查时间参数.

  • 如果inode结构置脏的时间在sync_sb_inodes函数开始执行的时间之后,跳出循环.
  • 如果inode结构置脏的时间早于设定的回写时间(回写时间默认是当前时间之前30秒),同样跳出循环.
  • 如果有另外一个pdflush线程也在回写队列,同样跳出循环.
/*inode的dirty时间在sync_sb_inodes调用之后*/
if (time_after(inode->dirtied_when, start))
  break;
/* Was this inode dirtied too recently? */
if (wbc->older_than_this && time_after(inode->dirtied_when,
               *wbc->older_than_this))
  break;
/*是否已经有另一个pdflush在回写这个inode*/
if (current_is_pdflush() && !writeback_acquire(bdi))
  break;
BUG_ON(inode->i_state & I_FREEING);
__iget(inode);
pages_skipped = wbc->pages_skipped;
__writeback_single_inode(inode, wbc);
……/*省略部分代码*/

3. 检查inode状态

如果检查都通过,调用__writeback_single_inode检查inode的状态,如代码清单12-7所示.

代码清单12-7 __writeback_single_inode函数

 static int
__writeback_single_inode(struct inode *inode, struct writeback_control *wbc)
{
  wait_queue_head_t *wqh;
  if (!atomic_read(&inode->i_count))
      WARN_ON(!(inode->i_state & (I_WILL_FREE|I_FREEING)));
  else
      WARN_ON(inode->i_state & I_WILL_FREE);
  if ((wbc->sync_mode != WB_SYNC_ALL) && (inode->i_state & I_LOCK)) {
      list_move(&inode->i_list, &inode->i_sb->s_dirty);
      return 0;
  }
  if (inode->i_state & I_LOCK) {
      DEFINE_WAIT_BIT(wq, &inode->i_state, __I_LOCK);
      wqh = bit_waitqueue(&inode->i_state, __I_LOCK);
      do {
          spin_unlock(&inode_lock);
          __wait_on_bit(wqh, &wq, inode_wait, TASK_UNINTERRUPTIBLE);
          spin_lock(&inode_lock);
      } while (inode->i_state & I_LOCK);
  }
  return __sync_single_inode(inode, wbc);
}

__writeback_single_inode函数执行两个条件判断.

  • 如果inode状态已经是LOCK,且回写模式不是WB_SYNC_ALL,把inode链接到超级块的s_dirty链表,然后退出.
  • 如果回写模式是WB_SYNC_ALL,一直等待,直到inode解除LOCK状态,然后调用__sync_single_inode函数进行文件的回写.

4. 文件回写操作

__sync_single_inode函数如代码清单12-8所示.

代码清单12-8 __sync_single_inode函数

static int
__sync_single_inode(struct inode *inode, struct writeback_control *wbc)
{
  unsigned dirty;
  struct address_space *mapping = inode->i_mapping;
  struct super_block *sb = inode->i_sb;
  int wait = wbc->sync_mode == WB_SYNC_ALL;
  int ret;
  BUG_ON(inode->i_state & I_LOCK);
  /* Set I_LOCK, reset I_DIRTY */
  dirty = inode->i_state & I__sync_single_inode函数第一部分执行脏数据和inode本身的回写。为了表示inode数据的各种置脏条件,内核定义了三个参数。
 
- `I_DIRTY_SYNC`:表示只是inode访问时间发生了变化。
- `I_DIRTY_DATASYNC`:表示inode其他属性数据发生了变化,比如文件的长度。
- `I_DIRTY_PAGES`:文件的内容数据发生了变化。
 
`__sync_single_inode`函数首先调用`do_writepages`将所有置脏的数据页面进行回写(根据控制参数`wbc`设置的条件)。
 
如果inode设置了`I_DIRTY_SYNC`或者`I_DIRTY_DATASYNC`,说明不只是文件的内容,文件本身的属性也发生了改变,因此调用`write_inode`函数对inode本身进行回写。当`write_inode`返回时,已经完成了inode的回写或者是发生了错误。
 
如果控制参数`wbc`设置了`WB_SYNC_ALL`,说明是紧要数据,需要等所有的数据回写完成。因此调用`filemap_fdatawait`等待数据回写完成。
 
`__sync_single_inode`函数第二部分首先检查文件是否有数据未进行回写。
 
```c
  spin_lock(&inode_lock);
  inode->i_state &= ~I_LOCK;
  if (!(inode->i_state & I_FREEING)) {
      if (!(inode->i_state & I_DIRTY) &&
          mapping_tagged(mapping, PAGECACHE_TAG_DIRTY)) {
          if (wbc->for_kupdate) {
              /*让inode得到更多的回写机会*/
              inode->i_state |= I_DIRTY_PAGES;
              list_move_tail(&inode->i_list, &sb->s_dirty);
          } else {
              /*修改inode的置脏时间,让其他inode有更多的回写机会*/
              inode->i_state |= I_DIRTY_PAGES;
              inode->dirtied_when = jiffies;
              list_move(&inode->i_list, &sb->s_dirty);
          }
      } else if (inode->i_state & I_DIRTY) {
          list_move(&inode->i_list, &sb->s_dirty);
      } else if (atomic_read(&inode->i_count)) {
          list_move(&inode->i_list, &inode_in_use);
      } else {
          list_move(&inode->i_list, &inode_unused);
      }
  }
  /*唤醒阻塞的进程*/ 
  wake_up_inode(inode);
  return ret;
}

这一步通过调用mapping_tagged检查文件的page cache状态。

  • 如果仍有置脏的数据页面,要将inode状态设置I_DIRTY_PAGES标志位。
  • 如果没有其他的置脏的数据页面,但是inode本身被置脏,说明有别的任务修改了inode的属性,此时要将inode加入超级块的脏inode链表。
  • 如果以上条件都不成立且inode引用计数不为0,说明文件没有置脏的数据页面,文件属性也没发生改变,将inode加入inode_in_use链表,如果引用计数为0,则将inode加入inode_unused链表。

12.4 平衡写

平衡写在文件系统写过程的上下文执行。第10章文件系统读写分析已经观察到写过程调用了 balance_dirty_pages_ratelimited 函数。这个函数作用是检查脏页面是否到达限值,是否进行平衡写操作。具体如代码清单12-9所示。

12.4.1 检查直接回写的条件

balance_dirty_pages_ratelimited 函数调用了 balance_dirty_pages_ratelimited_nr 函数。后者首先定义了一个CPU变量 ratelimits,初始化为0。每次写页面的时候,这个变量累加脏页面的总量,当脏页面超过前面定义的内存页面限制值,则调用 balance_dirty_pages 开始执行回写操作。balance_dirty_pages 函数如代码清单12-10所示。

balance_dirty_pages 函数第一部分首先调用 get_dirty_limits 获得设置的脏页面回写数目。前面通过 page_writeback_init 函数已经设置了系统的脏页面比例。变量 background_threshdirty_thresh 分别代表背景写的脏页面数目和直接回写的脏页面数目。

12.4.2 回写系统脏页面的条件

然后计算系统内需要回写的脏页面数目总和。如果这些页面总数超过 dirty_thresh 的限制,则进入直接回写,否则退出循环,检查是否超过了 background_thresh 限制。

balance_dirty_pages 函数第二部分调用 writeback_inodes 回写系统的脏页面。回写完毕,要执行两个检查:一是检查脏页和回写页是否大于 dirty_thresh 脏页限制,二是检查已经回写的页面是否小于设置的回写页面数,如果两个条件都满足,则阻塞进程1/10秒,然后再次回写。

12.4.3 检查计算机模式

最后,balance_dirty_pages 函数要检查计算机的模式。

  • 笔记本模式:这种模式回写的标准比较高,要等待达到标准较高的直接写条件,才进行背景写。
  • 普通模式:检查脏页面是否超过背景写的限制数 background_thresh,如果超过,触发 pdflush 线程进行回写。

writeback_inodes 前文已经分析过,不再赘述。

12.5 本章小结

Linux内核的回写机制提供了一种缓写机制,真实的写并不是直接写入硬盘,而是在 page cache 中缓存,等待合适的时机才真正执行写入。这种机制存在的前提是适配硬盘的物理特性,在内核软件的设计中,乃至用户软件的设计中,考虑硬件特性并适配之,是系统设计的重要方面。


第13章 一个真实文件系统 ext2

通过前面章节的分析和学习,本章分析一个真正的文件系统。本章选的例子是 Linux 的 ext2。ext2 是 Linux 系统使用最广泛的文件系统,它和 ext3 构成 Linux 文件系统的基石。而 ext3 和 ext2 的结构基本相同,只是 ext3 多了日志功能。本章不再分析代码,主要介绍 ext2 的磁盘布局和超级块,以及 inode 结构,代码分析工作留给读者。原因是 Linux 内核庞大复杂无比,任何书都不可能完备的分析代码。通过本书内核基础层和应用层的分析,读者可以从架构层次掌握系统的框架和脉络,在这个基础上,可以比较流畅的分析阅读内核代码。

13.1 ext2 的硬盘布局

根据 Linux 文件系统的知识,文件和 dentry 是内存中的结构,并不真正存在于硬盘中。真正存在于硬盘的是超级块结构和 inode 结构。首先看 ext2 的硬盘布局,如图13-1所示。

硬盘的第一个扇区是引导区,占据1K字节,引导区是文件系统不能使用的,用来存储分区信息。ext2 是通过块组的方式来组织硬盘。每个硬盘分区都由若干个大小相同的块组组成。每个块组包括下列信息:

  • 超级块(super block):每个块组的起始位置都有一个超级块,这些超级块的内容都是相同的。超级块包括文件系统的信息,比如每个块组的块数目、每个块组的 inode 数目等。
  • 块组描述符表(GDT):块组描述符表由很多的块组描述符组成。ext2 的每个块组描述符为32字节。整个文件系统分区有多少个块组,就有多少个块组描述符。块组描述符描述了一个块组的信息,比如一个块组中 inode 位图的起始位置、inode 表的起始位置等。
  • 块位图(block bitmap):每个比特代表块组中哪些块可用,哪些块已经被占有。块位图本身要占有一个块。如果块大小设置为1K字节,则一个块组的大小为1K×1K×8bit = 8M字节。
  • inode 位图(inode bitmap):inode 位图也占用一个块。它的每一位代表一个 inode 是否空闲。
  • inode 表(inode table):每个文件都有一个 inode,inode 保存了文件的描述信息、文件的类型、文件的大小、文件的创建访问时间等。一个 inode 占128字节。如果文件系统块大小为1K字节,一个块可容纳8个 inode。inode 表可以占用多个块。
  • 数据块(data block):保存文件的内容。常规文件的数据保存在数据块中,如果是目录文件,那么该目录下所有的文件名和下级目录名都保存在数据块中。

13.2 ext2 文件系统目录树

文件系统的层次结构可以用目录树来表示。对 ext2 文件系统而言,根目录是一个固定的 inode。通过读根目录的内容,就可以获得根目录下的文件的 inode 信息、文件类型和文件名字。如果该文件是目录文件,重复这个过程,就可以获得二级目录的信息。

ext2 目录文件的结构如代码清单13-1所示。

根目录是一个固定的 inode,它的 inode 号是2。当文件系统初始化的时候,读到超级块的内容,就获得了文件块的大小、每个块组的块数目、每个块组的 inode 数目。从超级块之后的块组描述符表,可以获得所有的块组信息。

通过超级块提供的信息和块组描述符表信息,就可以获得根目录在 inode 表的位置,从而读到根目录的内容。从根目录的内容里面,进而获得根目录下文件的 inode 号和文件类型。

如果这是个目录文件,那么重复上面的过程,可以获得二级目录的目录结构。不断重复这个过程,最终获得整个文件系统的目录树。

13.3 ext2 文件内容管理

ext2 的 inode 信息可以存放15个块的地址,用户数据就存放在这些块中。但是15个块不一定能存放全面的用户数据,那么可以采用分层的方式存储。这15个块中,前12个块是直接数据块,直接存放用户数据。而第13个块,是一级索引块,这个块存放的又是块的地址,这些块地址指向的块才真正存放用户数据。而第14个块,是二级索引块,比一级索引块又多了一层,而第15个块,是三级索引块,比二级索引块又多了一层。通过这种方式,大大扩展了 ext2 文件系统的文件存放内容。

13.4 ext2 文件系统读写

对一个文件系统来说,最重要基础的部分无非是打开和创建文件以及文件的读写。

通过文件系统的目录树,可以获得文件所在目录的上级目录。读上级目录的内容,就可以获得文件的 inode 号和文件名字。根据文件的 inode 号,可以获得文件的 inode 信息。也就获得了文件的类型、创建修改时间、文件大小等信息,完成打开文件的过程。

文件的读写,首先要获得文件数据内容的物理扇区位置。在文件的 inode 信息里,保存了15个文件块的地址。这15个文件块通过分层方式,存储了文件的数据内容。知道了文件数据的块地址,就可以获得文件数据的物理扇区地址。根据第10章的分析,可以真正对文件执行读写操作。

13.5 本章小结

本章可以作为对内核学习的一个实战。读者应该结合 ext2 文件系统的代码,分析文件系统的读写和打开的过程,深度理解一个广泛使用的文件系统。内核本身在不断更新,想利用内核知识解决实际问题,必须具备快速阅读代码的能力,从代码中学习,从代码中实践。