From 0a556c4a9116b644669dbb0eca538b0a14dd1719 Mon Sep 17 00:00:00 2001 From: Leonardo de Moura Date: Sat, 12 Jul 2014 21:59:22 +0100 Subject: [PATCH] feat(util): add worker queue Signed-off-by: Leonardo de Moura --- src/tests/util/CMakeLists.txt | 3 + src/tests/util/worker_queue.cpp | 26 +++++++ src/util/worker_queue.h | 119 ++++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 src/tests/util/worker_queue.cpp create mode 100644 src/util/worker_queue.h diff --git a/src/tests/util/CMakeLists.txt b/src/tests/util/CMakeLists.txt index d801dce92..33b971353 100644 --- a/src/tests/util/CMakeLists.txt +++ b/src/tests/util/CMakeLists.txt @@ -73,6 +73,9 @@ add_test(trie ${CMAKE_CURRENT_BINARY_DIR}/trie) add_executable(lru_cache lru_cache.cpp) target_link_libraries(lru_cache ${EXTRA_LIBS}) add_test(lru_cache ${CMAKE_CURRENT_BINARY_DIR}/lru_cache) +add_executable(worker_queue worker_queue.cpp) +target_link_libraries(worker_queue ${EXTRA_LIBS}) +add_test(worker_queue ${CMAKE_CURRENT_BINARY_DIR}/worker_queue) # thread.cpp used import_test.lua add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/import_test.lua ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua diff --git a/src/tests/util/worker_queue.cpp b/src/tests/util/worker_queue.cpp new file mode 100644 index 000000000..2695da0d1 --- /dev/null +++ b/src/tests/util/worker_queue.cpp @@ -0,0 +1,26 @@ +/* +Copyright (c) 2013 Microsoft Corporation. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Leonardo de Moura +*/ +#include +#include "util/test.h" +#include "util/worker_queue.h" +using namespace lean; + +static void tst1() { + worker_queue q(10); + for (unsigned i = 0; i < 100; i++) + q.add([=]() { for (unsigned j = 0; j < 1000000; j++) {} return i; }); + std::vector const & r = q.join(); + for (unsigned i = 0; i < r.size(); i++) + std::cout << r[i] << " "; + std::cout << "\n"; +} + +int main() { + save_stack_info(); + tst1(); + return has_violations() ? 1 : 0; +} diff --git a/src/util/worker_queue.h b/src/util/worker_queue.h new file mode 100644 index 000000000..542ed730c --- /dev/null +++ b/src/util/worker_queue.h @@ -0,0 +1,119 @@ +/* +Copyright (c) 2014 Microsoft Corporation. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Leonardo de Moura +*/ +#pragma once +#include +#include +#include +#include "util/buffer.h" +#include "util/thread.h" +#include "util/interrupt.h" +#include "util/optional.h" +#include "util/exception.h" + +namespace lean { +template +class worker_queue { + typedef std::function task; + typedef std::unique_ptr thread_ptr; + typedef std::unique_ptr exception_ptr; + std::vector m_threads; + std::vector m_thread_exceptions; + std::vector m_todo; + std::vector m_result; + mutex m_result_mutex; + mutex m_todo_mutex; + condition_variable m_todo_cv; + unsigned m_todo_qhead; + atomic m_done; + atomic m_failed_thread; // if >= 0, it has the index of a failing thread. + atomic m_interrupted; + + optional next_task() { + while (true) { + check_interrupted(); + unique_lock lk(m_todo_mutex); + if (m_todo_qhead < m_todo.size()) { + task r = m_todo[m_todo_qhead]; + m_todo_qhead++; + return optional(r); + } else if (m_done) { + return optional(); + } else { + m_todo_cv.wait(lk); + } + } + } + + void add_result(T const & v) { + lock_guard l(m_result_mutex); + m_result.push_back(v); + } + +public: + worker_queue(unsigned num_threads):m_todo_qhead(0), m_done(false), m_failed_thread(-1), m_interrupted(false) { +#ifndef LEAN_MULTI_THREAD + num_threads = 0; +#endif + for (unsigned i = 0; i < num_threads; i++) + m_thread_exceptions.push_back(exception_ptr(nullptr)); + for (unsigned i = 0; i < num_threads; i++) { + m_threads.push_back(std::unique_ptr(new interruptible_thread([=]() { + try { + while (auto t = next_task()) { + add_result((*t)()); + } + m_todo_cv.notify_all(); + } catch (interrupted &) { + } catch (exception & ex) { + m_thread_exceptions[i].reset(ex.clone()); + m_failed_thread = i; + } catch (...) { + m_thread_exceptions[i].reset(new exception("thread failed for unknown reasons")); + m_failed_thread = i; + } + }))); + } + } + + ~worker_queue() { if (!m_done) join(); } + + void add(std::function const & fn) { + lean_assert(!m_done); + { + lock_guard l(m_todo_mutex); + m_todo.push_back(fn); + } + m_todo_cv.notify_one(); + } + + std::vector const & join() { + lean_assert(!m_done); + m_done = true; +#ifndef LEAN_MULTI_THREAD + for (auto const & fn : m_todo) { + m_result.push_back(fn()); + } + m_todo.clear(); +#else + m_todo_cv.notify_all(); + for (thread_ptr & t : m_threads) + t->join(); + if (m_failed_thread >= 0) + m_thread_exceptions[m_failed_thread]->rethrow(); + if (m_interrupted) + throw interrupted(); +#endif + return m_result; + } + + void interrupt() { + m_interrupted = true; + for (auto & t : m_threads) + t->request_interrupt(); + } +}; +}