在C中实现简单的合作流

哈Ha!



感谢您对我们先前翻译的REST文章的关注。今天,我们提供了从稍微不同的角度来看系统设计的主题,并发表了Linux名人斯蒂芬·布伦南(Stephen Brennan)的文章翻译,他谈到了自己在用户空间中实现多任务的方式以及它如何发挥作用。



像操作系统提供的许多其他功能一样,多任务处理不仅是理所当然的,而且还将其视为普通事物。使用我们功能强大的计算机和智能手机,计算机无法处理数百个进程的想法似乎很奇怪。我认为这是使计算机如此有用的可能性之一,但是正因为如此,它们如此复杂,有时才显得不可思议。



很难涉足实现多任务的代码,并且在任何情况下都最好自己实现,这并不总是很清楚,这样就不必编写整个操作系统。我很确定,除非您自己意识到它,否则不可能完全理解该现象。因此,我决定写一篇文章,告诉您如何使用简单的线程实现。在本文中,我们将在常规

C程序(而不是操作系统)中实现简单的流



关于setjmp和longjmp的抒情离题



调度程序将高度依赖功能setjmp()longjmp()。它们看起来有些不可思议,所以我将首先描述它们的作用,然后我将花一些时间来确切地告诉您如何做。



该功能setjmp()使您可以记录有关程序处于什么执行阶段的信息,以便以后可以再次跳转到这一点。一个类型变量传递给它jmp_buf,我们将在其中存储此信息。第一次返回时,该函数setjmp()返回0。



之后,您可以使用该函数longjmp(jmp_buf, value)从调用该点立即恢复执行setjmp()。在您的程序中,这种情况看起来好像setjmp()又回来了。这次参数将返回value您通过了longjmp()-区分第二个回报和第一个回报更方便。这是一个说明这一点的示例:



#include <stdio.h>
#include <setjmp.h>

jmp_buf saved_location;
int main(int argc, char **argv)
{
    if (setjmp(saved_location) == 0) {
        printf("We have successfully set up our jump buffer!\n");
    } else {
        printf("We jumped!\n");
        return 0;
    }

    printf("Getting ready to jump!\n");
    longjmp(saved_location, 1);
    printf("This will never execute...\n");
    return 0;
}


如果我们编译并运行该程序,则会得到以下输出:



We have successfully set up our jump buffer!
Getting ready to jump!
We jumped!


哇!就像goto语句一样,但是在这种情况下,它甚至可以用于跳出函数。它也比goto普通的函数调用更难读如果您的代码使用setjmp()丰富longjmp(),那么这将是非常困难的阅读它的人(包括你)。



既然是这样goto,所以一般建议,以避免setjmp()longjmp()但是像goto,如果少量且一致地使用上述功能,则可能会很有用。调度程序需要能够切换上下文,因此我们将负责任地使用这些功能。最重要的是,我们将使用API​​中的这些功能,以便计划人员无需处理这种复杂性。



Setjmp和longjmp不会保存堆栈

True,函数setjmp()longjmp()不打算支持任何种类的跳跃。它们是针对非常具体的实际案例而设计的。假设您正在执行一些复杂的操作,例如发出HTTP请求。在这种情况下,将涉及一组复杂的函数调用,并且如果其中任何一个失败,则您将不得不从每个函数返回一个特殊的错误代码。无论您在何处调用该函数(可能数十次),此类代码都将在下面的清单中显示:



int rv = do_function_call();
if (rv != SUCCESS) {
    return rv;
}


含义setjmp()含义longjmp()setjmp()在真正开始工作之前帮助确定位置的真正困难。然后,您可以将所有错误处理集中在一个地方:



int rv;
jmp_buf buf;
if ((rv = setjmp(buf)) != 0) {
    /*    */
    return;
}
do_complicated_task(buf, args...);


如果涉及的任何功能失败do_complicated_task(),它只会发生longjmp(buf, error_code)。这意味着组合中的每个函数do_complicated_task()都可以假定任何函数调用都成功,这意味着您不能在每个函数调用中都使用此代码来处理错误(实际上,这从来没有做过,但这是另一篇文章的主题) ...



这里的基本思想是longjmp()它仅允许您跳出深层嵌套的函数。您无法跳入先前跳出的那个深层嵌套的函数。这是您跳出函数时的堆栈外观。星号(*)表示存储它的堆栈指针setjmp()



  | Stack before longjmp    | Stack after longjmp
      +-------------------------+----------------------------
stack | main()              (*) | main()
grows | do_http_request()       |
down  | send_a_header()         |
 |    | write_bytes()           |
 v    | write()  - fails!       |


如您所见,您只能在堆栈上向后移动,因此没有数据损坏的危险。另一方面,想象一下如果您想在两个任务之间跳转将会是什么样子。如果您致电setjmp()然后返回,请做一些其他事情,然后尝试恢复以前一直在做的工作,那么就会出现问题:



      | Stack at setjmp() | Stack later      | Stack after longjmp()
      +-------------------+------------------+----------------------
stack | main()            | main()           | main()
grows | do_task_one()     | do_task_two()    | do_stack_two()
down  | subtask()         | subtask()        | subtask()
 |    | foo()             |                  | ???
 v    | bar()         (*) |              (*) | ???               (*)


保存的堆栈指针setjmp()将指向不再存在的堆栈帧,并且在某个时间点可能已被其他数据覆盖。如果我们尝试longjmp()跳回到帮助返回的函数,那么很奇怪的事情就会开始,这很可能导致整个程序崩溃。



道德:如果要使用setjmp()longjmp()在此类复杂任务之间跳转,则必须确保每个任务都有其自己的单独堆栈。在这种情况下,该问题已完全消除,因为当longjmp()重置堆栈指针,程序本身将用所需的堆栈替换堆栈,并且不会发生堆栈擦除。



让我们写一个调度程序API



题外话有点冗长,但是有了我们所学的知识,我们现在就可以实现用户空间流。首先,我注意到自己设计API进行初始化,创建和启动线程非常有用。如果我们提前这样做,我们将对我们要构建的内容有更好的了解!



void scheduler_init(void);
void scheduler_create_task(void (*func)(void*), void *arg);
void scheduler_run(void);


这些功能将用于初始化调度程序,添加任务,然后最终在调度程序中启动任务。一旦启动,它将scheduler_run()一直运行直到所有任务完成。当前正在运行的任务将具有以下API:



void scheduler_exit_current_task(void);
void scheduler_relinquish(void);


第一个功能负责退出任务。从任务的功能返回时,也可以从任务中退出,因此这种构造仅是为了方便起见。第二个函数描述了我们的线程如何告诉调度程序另一个任务应该运行一段时间。任务调用时scheduler_relinquish(),可以在其他任务运行时将其暂时挂起;但最终该函数将返回并且第一个任务可以继续。



为了举一个具体的例子,让我们考虑一下我们的API的假设用例,我们将用它来测试调度程序:



#include <stdlib.h>
#include <stdio.h>

#include "scheduler.h"

struct tester_args {
    char *name;
    int iters;
};

void tester(void *arg)
{
    int i;
    struct tester_args *ta = (struct tester_args *)arg;
    for (i = 0; i < ta->iters; i++) {
        printf("task %s: %d\n", ta->name, i);
        scheduler_relinquish();
    }
    free(ta);
}

void create_test_task(char *name, int iters)
{
    struct tester_args *ta = malloc(sizeof(*ta));
    ta->name = name;
    ta->iters = iters;
    scheduler_create_task(tester, ta);
}

int main(int argc, char **argv)
{
    scheduler_init();
    create_test_task("first", 5);
    create_test_task("second", 2);
    scheduler_run();
    printf("Finished running all tasks!\n");
    return EXIT_SUCCESS;
}


在此示例中,我们创建了两个执行相同功能但使用不同自变量的任务。因此,可以分别跟踪其实现。每个任务执行一定数量的迭代。在每次迭代时,它都会打印一条消息,然后运行另一个任务。我们希望看到类似程序输出的内容:



task first: 0
task second: 0
task first: 1
task second: 1
task first: 2
task first: 3
task first: 4
Finished running all tasks!


让我们实现调度程序API



要实现API,我们需要某种内部的任务表示形式。所以,让我们开始做生意。让我们收集所需的字段:



struct task {
    enum {
        ST_CREATED,
        ST_RUNNING,
        ST_WAITING,
    } status;

    int id;

    jmp_buf buf;

    void (*func)(void*);
    void *arg;

    struct sc_list_head task_list;

    void *stack_bottom;
    void *stack_top;
    int stack_size;
};


让我们分别讨论这些领域。在执行之前,所有创建的任务必须处于“已创建”状态。当任务开始执行时,它将进入“正在运行”状态,如果该任务曾经需要等待一些异步操作,则可以将其置于“正在等待”状态。字段id只是任务的唯一标识符。这buf包含有关何时执行longjmp()任务以恢复的信息。func字段arg将传递到scheduler_create_task()并且是开始任务所必需的。该字段是task_list实施所有任务的双向链接列表必需的。字段stack_bottomstack_top并且stack_size都属于一个独立的堆栈专门用于这项任务。底部是返回的地址malloc()但是“ top”是指向内存中给定区域正上方的地址的指针。由于x86堆栈向下增长,因此需要将堆栈指针设置为值stack_top,而不是stack_bottom



在这种情况下,您可以实现以下功能scheduler_create_task()



void scheduler_create_task(void (*func)(void *), void *arg)
{
    static int id = 1;
    struct task *task = malloc(sizeof(*task));
    task->status = ST_CREATED;
    task->func = func;
    task->arg = arg;
    task->id = id++;
    task->stack_size = 16 * 1024;
    task->stack_bottom = malloc(task->stack_size);
    task->stack_top = task->stack_bottom + task->stack_size;
    sc_list_insert_end(&priv.task_list, &task->task_list);
}


通过使用static int,我们保证每次调用该函数时,id字段都会递增,并且会有一个新的数字。除了sc_list_insert_end()简单地添加struct task到全局列表中的功能之外,其他所有内容都应该清楚无须说明全局列表存储在第二个结构内,该结构包含调度程序的所有私有数据。以下是结构本身及其初始化功能:



struct scheduler_private {
    jmp_buf buf;
    struct task *current;
    struct sc_list_head task_list;
} priv;

void scheduler_init(void)
{
    priv.current = NULL;
    sc_list_init(&priv.task_list);
}


该字段task_list用于引用任务列表(毫不奇怪)。该字段current存储当前正在执行的任务(null如果当前没有此类任务,则存储该任务)。最重要的是,该字段buf将用于跳入代码scheduler_run()



enum {
    INIT=0,
    SCHEDULE,
    EXIT_TASK,
};

void scheduler_run(void)
{
    /*     ! */
    switch (setjmp(priv.buf)) {
    case EXIT_TASK:
        scheduler_free_current_task();
    case INIT:
    case SCHEDULE:
        schedule();
        /*       ,    */
        return;
    default:
        fprintf(stderr, "Uh oh, scheduler error\n");
        return;
    }
}


调用该函数后scheduler_run(),我们将设置缓冲区,setjmp()以便始终可以返回该函数。第一次返回0(INIT),我们立即调用schedule()随后,我们可以在中传递SCHEDULE或EXIT_TASK常量longjmp(),这将引发不同的行为。现在,让我们忽略EXIT_TASK情况,直接跳转到实现schedule()



static void schedule(void)
{
    struct task *next = scheduler_choose_task();

    if (!next) {
        return;
    }

    priv.current = next;
    if (next->status == ST_CREATED) {
        /*
         *     .   
         * ,        .
         */
        register void *top = next->stack_top;
        asm volatile(
            "mov %[rs], %%rsp \n"
            : [ rs ] "+r" (top) ::
        );

        /*
         *   
         */
        next->status = ST_RUNNING;
        next->func(next->arg);

        /*
         *     ,    .   – ,   
         *   
         */
        scheduler_exit_current_task();
    } else {
        longjmp(next->buf, 1);
    }
    /*   */
}


首先,我们调用内部函数以选择要运行的下一个任务。该调度程序将像常规轮播一样工作,因此它将仅从列表中选择一个新任务。如果此函数返回NULL,则我们没有其他要执行的任务,然后返回。否则,我们必须开始执行任务(如果它处于ST_CREATED状态),或者继续执行它。



要运行创建的任务,我们使用x86_64的汇编指令将字段分配给stack_top寄存器rsp(堆栈指针)。然后,我们更改任务的状态,运行函数并退出。注:setjmp()longjmp()店和重新排列堆栈指针,所以在这里我们只需要使用汇编来改变堆栈指针。



如果任务已经启动,则该字段buf应包含longjmp()恢复任务所需的上下文,这就是我们要做的。

接下来,让我们看一下选择下一个要运行的任务的辅助函数。这是调度程序的核心,正如我之前所说的,此调度程序的工作方式就像轮播一样:



static struct task *scheduler_choose_task(void)
{
    struct task *task;

    sc_list_for_each_entry(task, &priv.task_list, task_list, struct task)
    {
        if (task->status == ST_RUNNING || task->status == ST_CREATED) {
            sc_list_remove(&task->task_list);
            sc_list_insert_end(&priv.task_list, &task->task_list);
            return task;
        }
    }

    return NULL;
}


如果您不熟悉我的链表实现(取自Linux内核),没什么大不了的。函数sc_list_for_each_entry()是一个宏,可让您遍历任务列表中的所有任务。我们找到的第一个可选任务(未处于挂起状态)从其当前位置删除,并移到任务列表的末尾。这样可以确保下次运行调度程序时,我们将收到其他任务(如果有)。我们返回可供选择的第一个任务,如果根本没有任务,则返回NULL。



最后,让我们继续执行scheduler_relinquish(),看看任务如何自我消除:



void scheduler_relinquish(void)
{
    if (setjmp(priv.current->buf)) {
        return;
    } else {
        longjmp(priv.buf, SCHEDULE);
    }
}


这是setjmp()我们调度程序中函数的另一个用例。基本上,此选项似乎有些混乱。当任务调用此函数时,我们将使用它setjmp()来保存当前上下文(包括实际的堆栈指针)。然后,我们使用longjmp()它进入调度程序(再次在中scheduler_run())并传递SCHEDULE函数;因此,我们要求您分配一个新任务。



恢复任务后,该函数setjmp()将返回非零值,并且我们退出之前可能已经执行的所有任务!

最后,这是任务退出时发生的情况(这可以通过调用exit函数或从相应的Task函数返回来显式完成):



void scheduler_exit_current_task(void)
{
    struct task *task = priv.current;
    sc_list_remove(&task->task_list);
    longjmp(priv.buf, EXIT_TASK);
    /*   */
}

static void scheduler_free_current_task(void)
{
    struct task *task = priv.current;
    priv.current = NULL;
    free(task->stack_bottom);
    free(task);
}


这是一个两部分的过程。第一个函数由任务本身直接返回。我们将从任务列表中删除与此条目对应的条目,因为将不再分配该条目。然后,使用,longjmp()返回到函数scheduler_run()。这次我们使用EXIT_TASK。这是我们在分配新任务之前告诉调度程序要调用什么的方式scheduler_free_current_task()。如果返回到说明scheduler_run(),您将看到它正是它的功能scheduler_run()



从那时起,我们分两步执行此操作scheduler_exit_current_task(),它会主动使用任务结构中包含的堆栈。如果释放堆栈并继续使用它,则该函数仍有可能访问我们刚刚释放的堆栈存储器!为了确保不会发生这种情况,我们将不得不longjmp()使用带有help的单独堆栈恢复到调度程序。然后,我们可以安全地释放与任务相关的数据。



因此,我们已经完全分析了调度程序的整个实现。如果我们尝试通过添加链表实现和上面的主程序来对其进行编译,则将获得一个可以正常使用的调度程序!为了不打扰您复制粘贴,我将您定向到github上的存储库,该存储库包含本文的所有代码。



所描述方法的用途是什么?



如果您到目前为止已经读过,我认为没有必要说服您该示例很有趣。但是乍看之下,它似乎不是很有用。毕竟,您可以在C中使用“真实”线程,这些线程可以并行运行,而不必等到其中一个调用彼此scheduler_relinquish()



但是,我将其作为一系列激动人心的有用功能实现的起点。我们可以谈论繁重的I / O任务,关于实现单线程异步应用程序,就像在Python中使用新的异步实用程序一样。生成器和协程也可以使用这样的系统来实现。最后,通过一些艰苦的工作,该系统还可以与“真正的”操作系统线程配合使用,以在需要时提供附加的并行性。每个想法背后都隐藏着一个有趣的项目,亲爱的读者,我建议您尝试自己完成其中一个。



它是安全的?



我认为更可能是拒绝!影响堆栈指针的内联汇编代码不能被认为是安全的。不要冒险在生产中使用这些东西,但一定要修补它们并进行研究!



可以使用“非上下文” API(请参见man getcontext)构建这种系统的更安全实现,该API允许您在这些类型的用户空间“流”之间进行切换,而无需嵌入汇编代码。不幸的是,此类API未包含在标准中(已从POSIX规范中删除)。但是它仍然可以用作glibc的一部分。



如何使这种机制产生位移?



如此处所示,此调度程序仅在线程将控制权显式转移回调度程序时才起作用。这对于一般程序(例如对于操作系统)来说是不好的,因为做得不好的线程会阻止所有其他线程的执行(尽管这并不能阻止在MS-DOS中使用协作式多任务处理!)这一切都取决于应用程序。



当使用非标准的“上下文外” API时,POSIX信号将存储先前执行的代码的上下文。通过将计时器设置为发出蜂鸣声,用户空间调度程序确实可以提供抢先式多任务处理的工作版本!这是另一个很酷的项目,值得单独撰写。



All Articles