feat(util): add worker queue
Signed-off-by: Leonardo de Moura <leonardo@microsoft.com>
This commit is contained in:
parent
ae2f019c23
commit
0a556c4a91
3 changed files with 148 additions and 0 deletions
|
@ -73,6 +73,9 @@ add_test(trie ${CMAKE_CURRENT_BINARY_DIR}/trie)
|
||||||
add_executable(lru_cache lru_cache.cpp)
|
add_executable(lru_cache lru_cache.cpp)
|
||||||
target_link_libraries(lru_cache ${EXTRA_LIBS})
|
target_link_libraries(lru_cache ${EXTRA_LIBS})
|
||||||
add_test(lru_cache ${CMAKE_CURRENT_BINARY_DIR}/lru_cache)
|
add_test(lru_cache ${CMAKE_CURRENT_BINARY_DIR}/lru_cache)
|
||||||
|
add_executable(worker_queue worker_queue.cpp)
|
||||||
|
target_link_libraries(worker_queue ${EXTRA_LIBS})
|
||||||
|
add_test(worker_queue ${CMAKE_CURRENT_BINARY_DIR}/worker_queue)
|
||||||
# thread.cpp used import_test.lua
|
# thread.cpp used import_test.lua
|
||||||
add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua
|
add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua
|
||||||
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/import_test.lua ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua
|
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/import_test.lua ${CMAKE_CURRENT_BINARY_DIR}/import_test.lua
|
||||||
|
|
26
src/tests/util/worker_queue.cpp
Normal file
26
src/tests/util/worker_queue.cpp
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
/*
|
||||||
|
Copyright (c) 2013 Microsoft Corporation. All rights reserved.
|
||||||
|
Released under Apache 2.0 license as described in the file LICENSE.
|
||||||
|
|
||||||
|
Author: Leonardo de Moura
|
||||||
|
*/
|
||||||
|
#include <vector>
|
||||||
|
#include "util/test.h"
|
||||||
|
#include "util/worker_queue.h"
|
||||||
|
using namespace lean;
|
||||||
|
|
||||||
|
static void tst1() {
|
||||||
|
worker_queue<int> q(10);
|
||||||
|
for (unsigned i = 0; i < 100; i++)
|
||||||
|
q.add([=]() { for (unsigned j = 0; j < 1000000; j++) {} return i; });
|
||||||
|
std::vector<int> const & r = q.join();
|
||||||
|
for (unsigned i = 0; i < r.size(); i++)
|
||||||
|
std::cout << r[i] << " ";
|
||||||
|
std::cout << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
save_stack_info();
|
||||||
|
tst1();
|
||||||
|
return has_violations() ? 1 : 0;
|
||||||
|
}
|
119
src/util/worker_queue.h
Normal file
119
src/util/worker_queue.h
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
/*
|
||||||
|
Copyright (c) 2014 Microsoft Corporation. All rights reserved.
|
||||||
|
Released under Apache 2.0 license as described in the file LICENSE.
|
||||||
|
|
||||||
|
Author: Leonardo de Moura
|
||||||
|
*/
|
||||||
|
#pragma once
|
||||||
|
#include <memory>
|
||||||
|
#include <functional>
|
||||||
|
#include <vector>
|
||||||
|
#include "util/buffer.h"
|
||||||
|
#include "util/thread.h"
|
||||||
|
#include "util/interrupt.h"
|
||||||
|
#include "util/optional.h"
|
||||||
|
#include "util/exception.h"
|
||||||
|
|
||||||
|
namespace lean {
|
||||||
|
template<typename T>
|
||||||
|
class worker_queue {
|
||||||
|
typedef std::function<T()> task;
|
||||||
|
typedef std::unique_ptr<interruptible_thread> thread_ptr;
|
||||||
|
typedef std::unique_ptr<exception> exception_ptr;
|
||||||
|
std::vector<thread_ptr> m_threads;
|
||||||
|
std::vector<exception_ptr> m_thread_exceptions;
|
||||||
|
std::vector<task> m_todo;
|
||||||
|
std::vector<T> m_result;
|
||||||
|
mutex m_result_mutex;
|
||||||
|
mutex m_todo_mutex;
|
||||||
|
condition_variable m_todo_cv;
|
||||||
|
unsigned m_todo_qhead;
|
||||||
|
atomic<bool> m_done;
|
||||||
|
atomic<int> m_failed_thread; // if >= 0, it has the index of a failing thread.
|
||||||
|
atomic<bool> m_interrupted;
|
||||||
|
|
||||||
|
optional<task> next_task() {
|
||||||
|
while (true) {
|
||||||
|
check_interrupted();
|
||||||
|
unique_lock<mutex> lk(m_todo_mutex);
|
||||||
|
if (m_todo_qhead < m_todo.size()) {
|
||||||
|
task r = m_todo[m_todo_qhead];
|
||||||
|
m_todo_qhead++;
|
||||||
|
return optional<task>(r);
|
||||||
|
} else if (m_done) {
|
||||||
|
return optional<task>();
|
||||||
|
} else {
|
||||||
|
m_todo_cv.wait(lk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void add_result(T const & v) {
|
||||||
|
lock_guard<mutex> l(m_result_mutex);
|
||||||
|
m_result.push_back(v);
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
worker_queue(unsigned num_threads):m_todo_qhead(0), m_done(false), m_failed_thread(-1), m_interrupted(false) {
|
||||||
|
#ifndef LEAN_MULTI_THREAD
|
||||||
|
num_threads = 0;
|
||||||
|
#endif
|
||||||
|
for (unsigned i = 0; i < num_threads; i++)
|
||||||
|
m_thread_exceptions.push_back(exception_ptr(nullptr));
|
||||||
|
for (unsigned i = 0; i < num_threads; i++) {
|
||||||
|
m_threads.push_back(std::unique_ptr<interruptible_thread>(new interruptible_thread([=]() {
|
||||||
|
try {
|
||||||
|
while (auto t = next_task()) {
|
||||||
|
add_result((*t)());
|
||||||
|
}
|
||||||
|
m_todo_cv.notify_all();
|
||||||
|
} catch (interrupted &) {
|
||||||
|
} catch (exception & ex) {
|
||||||
|
m_thread_exceptions[i].reset(ex.clone());
|
||||||
|
m_failed_thread = i;
|
||||||
|
} catch (...) {
|
||||||
|
m_thread_exceptions[i].reset(new exception("thread failed for unknown reasons"));
|
||||||
|
m_failed_thread = i;
|
||||||
|
}
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
~worker_queue() { if (!m_done) join(); }
|
||||||
|
|
||||||
|
void add(std::function<T()> const & fn) {
|
||||||
|
lean_assert(!m_done);
|
||||||
|
{
|
||||||
|
lock_guard<mutex> l(m_todo_mutex);
|
||||||
|
m_todo.push_back(fn);
|
||||||
|
}
|
||||||
|
m_todo_cv.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<T> const & join() {
|
||||||
|
lean_assert(!m_done);
|
||||||
|
m_done = true;
|
||||||
|
#ifndef LEAN_MULTI_THREAD
|
||||||
|
for (auto const & fn : m_todo) {
|
||||||
|
m_result.push_back(fn());
|
||||||
|
}
|
||||||
|
m_todo.clear();
|
||||||
|
#else
|
||||||
|
m_todo_cv.notify_all();
|
||||||
|
for (thread_ptr & t : m_threads)
|
||||||
|
t->join();
|
||||||
|
if (m_failed_thread >= 0)
|
||||||
|
m_thread_exceptions[m_failed_thread]->rethrow();
|
||||||
|
if (m_interrupted)
|
||||||
|
throw interrupted();
|
||||||
|
#endif
|
||||||
|
return m_result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void interrupt() {
|
||||||
|
m_interrupted = true;
|
||||||
|
for (auto & t : m_threads)
|
||||||
|
t->request_interrupt();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
Loading…
Reference in a new issue