From ff4c79e3df6c139682f0b6c75217e5cd3faf1109 Mon Sep 17 00:00:00 2001 From: Kevin Wolf Date: Tue, 14 Feb 2012 11:14:02 +0100 Subject: [PATCH 37/99] coroutine: introduce coroutines RH-Author: Kevin Wolf Message-id: <1329218101-24213-38-git-send-email-kwolf@redhat.com> Patchwork-id: 37229 O-Subject: [RHEL-6.3 qemu-kvm PATCH v2 37/96] coroutine: introduce coroutines Bugzilla: 783950 RH-Acked-by: Paolo Bonzini RH-Acked-by: Marcelo Tosatti RH-Acked-by: Laszlo Ersek Bugzilla: 783950 Asynchronous code is becoming very complex. At the same time synchronous code is growing because it is convenient to write. Sometimes duplicate code paths are even added, one synchronous and the other asynchronous. This patch introduces coroutines which allow code that looks synchronous but is asynchronous under the covers. A coroutine has its own stack and is therefore able to preserve state across blocking operations, which traditionally require callback functions and manual marshalling of parameters. Creating and starting a coroutine is easy: coroutine = qemu_coroutine_create(my_coroutine); qemu_coroutine_enter(coroutine, my_data); The coroutine then executes until it returns or yields: void coroutine_fn my_coroutine(void *opaque) { MyData *my_data = opaque; /* do some work */ qemu_coroutine_yield(); /* do some more work */ } Yielding switches control back to the caller of qemu_coroutine_enter(). This is typically used to switch back to the main thread's event loop after issuing an asynchronous I/O request. The request callback will then invoke qemu_coroutine_enter() once more to switch back to the coroutine. Note that if coroutines are used only from threads which hold the global mutex they will never execute concurrently. This makes programming with coroutines easier than with threads. Race conditions cannot occur since only one coroutine may be active at any time. Other coroutines can only run across yield. This coroutines implementation is based on the gtk-vnc implementation written by Anthony Liguori but it has been significantly rewritten by Kevin Wolf to use setjmp()/longjmp() instead of the more expensive swapcontext() and by Paolo Bonzini for Windows Fibers support. Signed-off-by: Kevin Wolf Signed-off-by: Stefan Hajnoczi (cherry picked from commit 00dccaf1f848290d979a4b1e6248281ce1b32aaa) Conflicts: Makefile.objs trace-events Signed-off-by: Kevin Wolf --- Makefile.objs | 7 ++ coroutine-ucontext.c | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++ coroutine-win32.c | 92 ++++++++++++++++++++ qemu-coroutine-int.h | 48 +++++++++++ qemu-coroutine.c | 75 ++++++++++++++++ qemu-coroutine.h | 95 +++++++++++++++++++++ trace-events | 5 + 7 files changed, 552 insertions(+), 0 deletions(-) create mode 100644 coroutine-ucontext.c create mode 100644 coroutine-win32.c create mode 100644 qemu-coroutine-int.h create mode 100644 qemu-coroutine.c create mode 100644 qemu-coroutine.h Signed-off-by: Michal Novotny --- Makefile.objs | 7 ++ coroutine-ucontext.c | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++ coroutine-win32.c | 92 ++++++++++++++++++++ qemu-coroutine-int.h | 48 +++++++++++ qemu-coroutine.c | 75 ++++++++++++++++ qemu-coroutine.h | 95 +++++++++++++++++++++ trace-events | 5 + 7 files changed, 552 insertions(+), 0 deletions(-) create mode 100644 coroutine-ucontext.c create mode 100644 coroutine-win32.c create mode 100644 qemu-coroutine-int.h create mode 100644 qemu-coroutine.c create mode 100644 qemu-coroutine.h diff --git a/Makefile.objs b/Makefile.objs index f05a23a..23696c1 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -5,6 +5,12 @@ qobject-obj-y += qjson.o json-lexer.o json-streamer.o json-parser.o qobject-obj-y += qerror.o ####################################################################### +# coroutines +coroutine-obj-y = qemu-coroutine.o +coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o +coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o + +####################################################################### # block-obj-y is code used by both qemu system emulation and qemu-img block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o async.o @@ -59,6 +65,7 @@ common-obj-y += qemu-thread.o common-obj-y += blockdev.o common-obj-y += $(net-obj-y) common-obj-y += readline.o console.o cursor.o +common-obj-y += $(coroutine-obj-y) common-obj-y += tcg-runtime.o host-utils.o common-obj-y += irq.o ioport.o diff --git a/coroutine-ucontext.c b/coroutine-ucontext.c new file mode 100644 index 0000000..41c2379 --- /dev/null +++ b/coroutine-ucontext.c @@ -0,0 +1,230 @@ +/* + * ucontext coroutine initialization code + * + * Copyright (C) 2006 Anthony Liguori + * Copyright (C) 2011 Kevin Wolf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.0 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see . + */ + +/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */ +#ifdef _FORTIFY_SOURCE +#undef _FORTIFY_SOURCE +#endif +#include +#include +#include +#include +#include +#include "qemu-common.h" +#include "qemu-coroutine-int.h" + +enum { + /* Maximum free pool size prevents holding too many freed coroutines */ + POOL_MAX_SIZE = 64, +}; + +typedef struct { + Coroutine base; + void *stack; + jmp_buf env; +} CoroutineUContext; + +/** + * Per-thread coroutine bookkeeping + */ +typedef struct { + /** Currently executing coroutine */ + Coroutine *current; + + /** Free list to speed up creation */ + QLIST_HEAD(, Coroutine) pool; + unsigned int pool_size; + + /** The default coroutine */ + CoroutineUContext leader; +} CoroutineThreadState; + +static pthread_key_t thread_state_key; + +/* + * va_args to makecontext() must be type 'int', so passing + * the pointer we need may require several int args. This + * union is a quick hack to let us do that + */ +union cc_arg { + void *p; + int i[2]; +}; + +static CoroutineThreadState *coroutine_get_thread_state(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + if (!s) { + s = qemu_mallocz(sizeof(*s)); + s->current = &s->leader.base; + QLIST_INIT(&s->pool); + pthread_setspecific(thread_state_key, s); + } + return s; +} + +static void qemu_coroutine_thread_cleanup(void *opaque) +{ + CoroutineThreadState *s = opaque; + Coroutine *co; + Coroutine *tmp; + + QLIST_FOREACH_SAFE(co, &s->pool, pool_next, tmp) { + qemu_free(DO_UPCAST(CoroutineUContext, base, co)->stack); + qemu_free(co); + } + qemu_free(s); +} + +static void __attribute__((constructor)) coroutine_init(void) +{ + int ret; + + ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup); + if (ret != 0) { + fprintf(stderr, "unable to create leader key: %s\n", strerror(errno)); + abort(); + } +} + +static void coroutine_trampoline(int i0, int i1) +{ + union cc_arg arg; + CoroutineUContext *self; + Coroutine *co; + + arg.i[0] = i0; + arg.i[1] = i1; + self = arg.p; + co = &self->base; + + /* Initialize longjmp environment and switch back the caller */ + if (!setjmp(self->env)) { + longjmp(*(jmp_buf *)co->entry_arg, 1); + } + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +static Coroutine *coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineUContext *co; + ucontext_t old_uc, uc; + jmp_buf old_env; + union cc_arg arg; + + /* The ucontext functions preserve signal masks which incurs a system call + * overhead. setjmp()/longjmp() does not preserve signal masks but only + * works on the current stack. Since we need a way to create and switch to + * a new stack, use the ucontext functions for that but setjmp()/longjmp() + * for everything else. + */ + + if (getcontext(&uc) == -1) { + abort(); + } + + co = qemu_mallocz(sizeof(*co)); + co->stack = qemu_malloc(stack_size); + co->base.entry_arg = &old_env; /* stash away our jmp_buf */ + + uc.uc_link = &old_uc; + uc.uc_stack.ss_sp = co->stack; + uc.uc_stack.ss_size = stack_size; + uc.uc_stack.ss_flags = 0; + + arg.p = co; + + makecontext(&uc, (void (*)(void))coroutine_trampoline, + 2, arg.i[0], arg.i[1]); + + /* swapcontext() in, longjmp() back out */ + if (!setjmp(old_env)) { + swapcontext(&old_uc, &uc); + } + return &co->base; +} + +Coroutine *qemu_coroutine_new(void) +{ + CoroutineThreadState *s = coroutine_get_thread_state(); + Coroutine *co; + + co = QLIST_FIRST(&s->pool); + if (co) { + QLIST_REMOVE(co, pool_next); + s->pool_size--; + } else { + co = coroutine_new(); + } + return co; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineThreadState *s = coroutine_get_thread_state(); + CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_); + + if (s->pool_size < POOL_MAX_SIZE) { + QLIST_INSERT_HEAD(&s->pool, &co->base, pool_next); + co->base.caller = NULL; + s->pool_size++; + return; + } + + qemu_free(co->stack); + qemu_free(co); +} + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); + CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); + CoroutineThreadState *s = coroutine_get_thread_state(); + int ret; + + s->current = to_; + + ret = setjmp(from->env); + if (ret == 0) { + longjmp(to->env, action); + } + return ret; +} + +Coroutine *qemu_coroutine_self(void) +{ + CoroutineThreadState *s = coroutine_get_thread_state(); + + return s->current; +} + +bool qemu_in_coroutine(void) +{ + CoroutineThreadState *s = pthread_getspecific(thread_state_key); + + return s && s->current->caller; +} diff --git a/coroutine-win32.c b/coroutine-win32.c new file mode 100644 index 0000000..0e29448 --- /dev/null +++ b/coroutine-win32.c @@ -0,0 +1,92 @@ +/* + * Win32 coroutine initialization code + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu-common.h" +#include "qemu-coroutine-int.h" + +typedef struct +{ + Coroutine base; + + LPVOID fiber; + CoroutineAction action; +} CoroutineWin32; + +static __thread CoroutineWin32 leader; +static __thread Coroutine *current; + +CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, + CoroutineAction action) +{ + CoroutineWin32 *from = DO_UPCAST(CoroutineWin32, base, from_); + CoroutineWin32 *to = DO_UPCAST(CoroutineWin32, base, to_); + + current = to_; + + to->action = action; + SwitchToFiber(to->fiber); + return from->action; +} + +static void CALLBACK coroutine_trampoline(void *co_) +{ + Coroutine *co = co_; + + while (true) { + co->entry(co->entry_arg); + qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); + } +} + +Coroutine *qemu_coroutine_new(void) +{ + const size_t stack_size = 1 << 20; + CoroutineWin32 *co; + + co = qemu_mallocz(sizeof(*co)); + co->fiber = CreateFiber(stack_size, coroutine_trampoline, &co->base); + return &co->base; +} + +void qemu_coroutine_delete(Coroutine *co_) +{ + CoroutineWin32 *co = DO_UPCAST(CoroutineWin32, base, co_); + + DeleteFiber(co->fiber); + qemu_free(co); +} + +Coroutine *qemu_coroutine_self(void) +{ + if (!current) { + current = &leader.base; + leader.fiber = ConvertThreadToFiber(NULL); + } + return current; +} + +bool qemu_in_coroutine(void) +{ + return current && current->caller; +} diff --git a/qemu-coroutine-int.h b/qemu-coroutine-int.h new file mode 100644 index 0000000..64915c2 --- /dev/null +++ b/qemu-coroutine-int.h @@ -0,0 +1,48 @@ +/* + * Coroutine internals + * + * Copyright (c) 2011 Kevin Wolf + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef QEMU_COROUTINE_INT_H +#define QEMU_COROUTINE_INT_H + +#include "qemu-queue.h" +#include "qemu-coroutine.h" + +typedef enum { + COROUTINE_YIELD = 1, + COROUTINE_TERMINATE = 2, +} CoroutineAction; + +struct Coroutine { + CoroutineEntry *entry; + void *entry_arg; + Coroutine *caller; + QLIST_ENTRY(Coroutine) pool_next; +}; + +Coroutine *qemu_coroutine_new(void); +void qemu_coroutine_delete(Coroutine *co); +CoroutineAction qemu_coroutine_switch(Coroutine *from, Coroutine *to, + CoroutineAction action); + +#endif diff --git a/qemu-coroutine.c b/qemu-coroutine.c new file mode 100644 index 0000000..600be26 --- /dev/null +++ b/qemu-coroutine.c @@ -0,0 +1,75 @@ +/* + * QEMU coroutines + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * Kevin Wolf + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#include "trace.h" +#include "qemu-common.h" +#include "qemu-coroutine.h" +#include "qemu-coroutine-int.h" + +Coroutine *qemu_coroutine_create(CoroutineEntry *entry) +{ + Coroutine *co = qemu_coroutine_new(); + co->entry = entry; + return co; +} + +static void coroutine_swap(Coroutine *from, Coroutine *to) +{ + CoroutineAction ret; + + ret = qemu_coroutine_switch(from, to, COROUTINE_YIELD); + + switch (ret) { + case COROUTINE_YIELD: + return; + case COROUTINE_TERMINATE: + trace_qemu_coroutine_terminate(to); + qemu_coroutine_delete(to); + return; + default: + abort(); + } +} + +void qemu_coroutine_enter(Coroutine *co, void *opaque) +{ + Coroutine *self = qemu_coroutine_self(); + + trace_qemu_coroutine_enter(self, co, opaque); + + if (co->caller) { + fprintf(stderr, "Co-routine re-entered recursively\n"); + abort(); + } + + co->caller = self; + co->entry_arg = opaque; + coroutine_swap(self, co); +} + +void coroutine_fn qemu_coroutine_yield(void) +{ + Coroutine *self = qemu_coroutine_self(); + Coroutine *to = self->caller; + + trace_qemu_coroutine_yield(self, to); + + if (!to) { + fprintf(stderr, "Co-routine is yielding to no one\n"); + abort(); + } + + self->caller = NULL; + coroutine_swap(self, to); +} diff --git a/qemu-coroutine.h b/qemu-coroutine.h new file mode 100644 index 0000000..08255c7 --- /dev/null +++ b/qemu-coroutine.h @@ -0,0 +1,95 @@ +/* + * QEMU coroutine implementation + * + * Copyright IBM, Corp. 2011 + * + * Authors: + * Stefan Hajnoczi + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + * + */ + +#ifndef QEMU_COROUTINE_H +#define QEMU_COROUTINE_H + +#include + +/** + * Coroutines are a mechanism for stack switching and can be used for + * cooperative userspace threading. These functions provide a simple but + * useful flavor of coroutines that is suitable for writing sequential code, + * rather than callbacks, for operations that need to give up control while + * waiting for events to complete. + * + * These functions are re-entrant and may be used outside the global mutex. + */ + +/** + * Mark a function that executes in coroutine context + * + * Functions that execute in coroutine context cannot be called directly from + * normal functions. In the future it would be nice to enable compiler or + * static checker support for catching such errors. This annotation might make + * it possible and in the meantime it serves as documentation. + * + * For example: + * + * static void coroutine_fn foo(void) { + * .... + * } + */ +#define coroutine_fn + +typedef struct Coroutine Coroutine; + +/** + * Coroutine entry point + * + * When the coroutine is entered for the first time, opaque is passed in as an + * argument. + * + * When this function returns, the coroutine is destroyed automatically and + * execution continues in the caller who last entered the coroutine. + */ +typedef void coroutine_fn CoroutineEntry(void *opaque); + +/** + * Create a new coroutine + * + * Use qemu_coroutine_enter() to actually transfer control to the coroutine. + */ +Coroutine *qemu_coroutine_create(CoroutineEntry *entry); + +/** + * Transfer control to a coroutine + * + * The opaque argument is passed as the argument to the entry point when + * entering the coroutine for the first time. It is subsequently ignored. + */ +void qemu_coroutine_enter(Coroutine *coroutine, void *opaque); + +/** + * Transfer control back to a coroutine's caller + * + * This function does not return until the coroutine is re-entered using + * qemu_coroutine_enter(). + */ +void coroutine_fn qemu_coroutine_yield(void); + +/** + * Get the currently executing coroutine + */ +Coroutine *coroutine_fn qemu_coroutine_self(void); + +/** + * Return whether or not currently inside a coroutine + * + * This can be used to write functions that work both when in coroutine context + * and when not in coroutine context. Note that such functions cannot use the + * coroutine_fn annotation since they work outside coroutine context. + */ +bool qemu_in_coroutine(void); + +#endif /* QEMU_COROUTINE_H */ diff --git a/trace-events b/trace-events index 0f8ff55..63f1c3e 100644 --- a/trace-events +++ b/trace-events @@ -159,3 +159,8 @@ disable qed_aio_write_data(void *s, void *acb, int ret, uint64_t offset, size_t disable qed_aio_write_prefill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64"" disable qed_aio_write_postfill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64"" disable qed_aio_write_main(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu" + +# qemu-coroutine.c +disable qemu_coroutine_enter(void *from, void *to, void *opaque) "from %p to %p opaque %p" +disable qemu_coroutine_yield(void *from, void *to) "from %p to %p" +disable qemu_coroutine_terminate(void *co) "self %p" -- 1.7.7.5