feat(boost): implement multi-threading support using Boost

To use Boost instead of the standard library, we must use the cmake option
    -D BOOST=ON

Signed-off-by: Leonardo de Moura <leonardo@microsoft.com>
This commit is contained in:
Leonardo de Moura 2013-12-09 16:55:13 -08:00
parent 533ed51f51
commit e7ae749221
12 changed files with 139 additions and 59 deletions

View file

@ -8,7 +8,7 @@ enable_testing()
option(TRACK_MEMORY_USAGE "TRACK_MEMORY_USAGE" ON)
option(MULTI_THREAD "MULTI_THREAD" ON)
option(BOOST "BOOST" OFF)
# Added for CTest
INCLUDE(CTest)
CONFIGURE_FILE(${LEAN_SOURCE_DIR}/CTestCustom.cmake.in
@ -67,6 +67,14 @@ else ()
message(FATAL_ERROR "Your C++ compiler does not support C++11.")
endif ()
# BOOST
if (("${BOOST}" MATCHES "ON") AND ("${MULTI_THREAD}" MATCHES "ON"))
find_package(Boost 1.54 COMPONENTS system thread atomic chrono REQUIRED)
message(STATUS "Boost library will be used to implement multi-threading support")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D LEAN_USE_BOOST")
set(EXTRA_LIBS ${EXTRA_LIBS} -lboost_system -lboost_chrono -lboost_atomic -lboost_thread)
endif()
# MPFR
find_package(MPFR 3.1.0)
include_directories(${MPFR_INCLUDES})

View file

@ -5,7 +5,6 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura
*/
#include <algorithm>
#include <chrono>
#include "util/thread.h"
#include "util/test.h"
#include "util/trace.h"
@ -219,7 +218,7 @@ static void tst5() {
env.add_var("f", Bool >> (Bool >> Bool));
env.add_var("a", Bool);
normalizer proc(env);
std::chrono::milliseconds dura(50);
chrono::milliseconds dura(50);
interruptible_thread thread([&]() {
try {
proc(t);

View file

@ -5,7 +5,6 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura
*/
#include <iostream>
#include <chrono>
#include <string>
#include "util/thread.h"
#include "util/test.h"
@ -227,7 +226,7 @@ static void tst12() {
env.add_var("f", Int >> (Int >> Int));
env.add_var("a", Int);
type_checker checker(env);
std::chrono::milliseconds dura(100);
chrono::milliseconds dura(100);
interruptible_thread thread([&]() {
try {
std::cout << checker.infer_type(t) << "\n";

View file

@ -21,7 +21,7 @@ static void tst1() {
lean_assert(lean::get_allocated_memory() >= old_mem + N * 2 * sizeof(N));
std::cout << "Total: " << static_cast<size_t>(lean::get_allocated_memory()) << "\n";
std::cout << "Thread: " << static_cast<size_t>(lean::get_thread_allocated_memory()) << "\n";
#if !defined(HAS_TCMALLOC)
#if !defined(HAS_TCMALLOC) && !defined(LEAN_USE_BOOST)
// When TCMALLOC is used, there is a problem during initialization, and the value of get_thread_allocated_memory is off.
lean_assert_eq(lean::get_allocated_memory(), static_cast<size_t>(lean::get_thread_allocated_memory()));
#endif

View file

@ -57,7 +57,7 @@ static void tst3() {
shared_mutex mutex;
atomic<bool> t2_started(false);
atomic<bool> t2_done(false);
std::chrono::milliseconds small_delay(10);
chrono::milliseconds small_delay(10);
thread t1([&]() {
while (!t2_started) {
@ -92,7 +92,7 @@ static void tst4() {
shared_mutex mutex;
atomic<bool> t2_started(false);
atomic<bool> t2_done(false);
std::chrono::milliseconds small_delay(10);
chrono::milliseconds small_delay(10);
thread t1([&]() {
while (!t2_started) {
@ -127,7 +127,7 @@ static void tst5() {
shared_mutex mutex;
atomic<bool> t2_started(false);
atomic<bool> t1_done(false);
std::chrono::milliseconds small_delay(10);
chrono::milliseconds small_delay(10);
thread t1([&]() {
while (!t2_started) {
@ -161,7 +161,7 @@ static void tst6() {
interruptible_thread t1([]() {
try {
// Remark: this_thread::sleep_for does not check whether the thread has been interrupted or not.
// this_thread::sleep_for(std::chrono::milliseconds(1000000));
// this_thread::sleep_for(chrono::milliseconds(1000000));
sleep_for(1000000);
} catch (interrupted &) {
std::cout << "interrupted...\n";

View file

@ -2,6 +2,6 @@ add_library(util trace.cpp debug.cpp name.cpp name_set.cpp
exception.cpp interrupt.cpp hash.cpp escaped.cpp bit_tricks.cpp
safe_arith.cpp ascii.cpp memory.cpp shared_mutex.cpp realpath.cpp
script_state.cpp script_exception.cpp splay_map.cpp lua.cpp
luaref.cpp stackinfo.cpp)
luaref.cpp stackinfo.cpp thread.cpp)
target_link_libraries(util ${LEAN_LIBS})

View file

@ -4,7 +4,7 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura
*/
#include <chrono>
#include "util/thread.h"
#include "util/interrupt.h"
#include "util/exception.h"
@ -34,8 +34,8 @@ void sleep_for(unsigned ms, unsigned step_ms) {
if (step_ms == 0)
step_ms = 1;
unsigned rounds = ms / step_ms;
std::chrono::milliseconds c(step_ms);
std::chrono::milliseconds r(ms % step_ms);
chrono::milliseconds c(step_ms);
chrono::milliseconds r(ms % step_ms);
for (unsigned i = 0; i < rounds; i++) {
this_thread::sleep_for(c);
check_interrupted();
@ -62,7 +62,7 @@ void interruptible_thread::request_interrupt(unsigned try_ms) {
f->store(true);
return;
}
this_thread::sleep_for(std::chrono::milliseconds(try_ms));
this_thread::sleep_for(chrono::milliseconds(try_ms));
check_interrupted();
}
}

View file

@ -45,6 +45,53 @@ void sleep_for(unsigned ms, unsigned step_ms = g_small_sleep);
\brief Thread that provides a method for setting its interrupt flag.
*/
class interruptible_thread {
public:
#if !defined(LEAN_USE_BOOST)
template<typename Function, typename... Args>
interruptible_thread(Function && fun, Args &&... args):
m_flag_addr(nullptr),
m_thread(
[&](Function&& fun, Args&&... args) {
m_flag_addr.store(get_flag_addr());
save_stack_info(false);
fun(std::forward<Args>(args)...);
m_flag_addr.store(&m_dummy_addr); // see comment before m_dummy_addr
},
std::forward<Function>(fun),
std::forward<Args>(args)...)
{}
#else
// Simpler version that works with Boost, and set stack size
private:
std::function<void()> m_fun;
static void execute(interruptible_thread * _this) {
_this->m_flag_addr.store(get_flag_addr());
save_stack_info(false);
_this->m_fun();
_this->m_flag_addr.store(&(_this->m_dummy_addr)); // see comment before m_dummy_addr
}
public:
template<typename Function>
interruptible_thread(Function && fun):m_fun(fun), m_flag_addr(nullptr), m_thread(get_thread_attributes(), boost::bind(execute, this)) {}
#endif
/**
\brief Return true iff an interrupt request has been made to the current thread.
*/
bool interrupted() const;
/**
\brief Send a interrupt request to the current thread. Return
true iff the request has been successfully performed.
\remark The main thread may have to wait the interrupt flag of this thread to
be initialized. If the flag was not initialized, then the main thread will be put
to sleep for \c try_ms milliseconds until it tries to set the flag again.
*/
void request_interrupt(unsigned try_ms = g_small_sleep);
void join();
bool joinable();
private:
atomic<atomic_bool*> m_flag_addr;
/*
The following auxiliary field is used to workaround a nasty bug
@ -64,37 +111,6 @@ class interruptible_thread {
atomic_bool m_dummy_addr;
thread m_thread;
static atomic_bool * get_flag_addr();
public:
template<typename Function, typename... Args>
interruptible_thread(Function && fun, Args &&... args):
m_flag_addr(nullptr),
m_thread(
[&](Function&& fun, Args&&... args) {
m_flag_addr.store(get_flag_addr());
save_stack_info(false);
fun(std::forward<Args>(args)...);
m_flag_addr.store(&m_dummy_addr); // see comment before m_dummy_addr
},
std::forward<Function>(fun),
std::forward<Args>(args)...)
{}
/**
\brief Return true iff an interrupt request has been made to the current thread.
*/
bool interrupted() const;
/**
\brief Send a interrupt request to the current thread. Return
true iff the request has been successfully performed.
\remark The main thread may have to wait the interrupt flag of this thread to
be initialized. If the flag was not initialized, then the main thread will be put
to sleep for \c try_ms milliseconds until it tries to set the flag again.
*/
void request_interrupt(unsigned try_ms = g_small_sleep);
void join();
bool joinable();
};
#if !defined(LEAN_MULTI_THREAD)

View file

@ -241,12 +241,12 @@ lazy_list<T> timeout(lazy_list<T> const & l, unsigned ms, unsigned check_ms = g_
done = true;
});
try {
auto start = std::chrono::steady_clock::now();
std::chrono::milliseconds d(ms);
std::chrono::milliseconds small(check_ms);
auto start = chrono::steady_clock::now();
chrono::milliseconds d(ms);
chrono::milliseconds small(check_ms);
while (!done) {
auto curr = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(curr - start) > d)
auto curr = chrono::steady_clock::now();
if (chrono::duration_cast<chrono::milliseconds>(curr - start) > d)
break;
check_interrupted();
this_thread::sleep_for(small);
@ -301,7 +301,7 @@ lazy_list<T> par(lazy_list<T> const & l1, lazy_list<T> const & l2, unsigned chec
done2 = true;
});
try {
std::chrono::milliseconds small(check_ms);
chrono::milliseconds small(check_ms);
while (!done1 && !done2) {
check_interrupted();
this_thread::sleep_for(small);

View file

@ -5,7 +5,6 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura
*/
#include <iostream>
#include <chrono>
#include <string>
#include <vector>
#include "util/thread.h"
@ -304,7 +303,7 @@ static void open_state(lua_State * L) {
// TODO(Leo): allow the user to change it?
#define SMALL_DELAY 10 // in ms
std::chrono::milliseconds g_small_delay(SMALL_DELAY);
chrono::milliseconds g_small_delay(SMALL_DELAY);
#if defined(LEAN_MULTI_THREAD)
/**
@ -350,7 +349,7 @@ public:
lua_State * channel = m_channel.m_ptr->m_state;
if (i > 0) {
// i is the position of the timeout argument
std::chrono::milliseconds dura(luaL_checkinteger(tgt, i));
chrono::milliseconds dura(luaL_checkinteger(tgt, i));
if (lua_gettop(channel) == m_ini)
m_cv.wait_for(lock, dura);
if (lua_gettop(channel) == m_ini) {

27
src/util/thread.cpp Normal file
View file

@ -0,0 +1,27 @@
/*
Copyright (c) 2013 Microsoft Corporation. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura
*/
#include "util/thread.h"
namespace lean {
#if defined(LEAN_USE_BOOST)
static boost::thread::attributes g_thread_attributes;
class init_thread_attributes {
public:
init_thread_attributes() {
g_thread_attributes.set_stack_size(8192*1024); // 8Mb
}
};
static init_thread_attributes g_init_thread_attributes;
void set_thread_stack_size(size_t sz) {
g_thread_attributes.set_stack_size(sz);
}
boost::thread::attributes const & get_thread_attributes() {
return g_thread_attributes;
}
#endif
}

View file

@ -6,12 +6,16 @@ Author: Leonardo de Moura
*/
#pragma once
#if defined(LEAN_MULTI_THREAD)
#if !defined(LEAN_USE_BOOST)
// MULTI THREADING SUPPORT BASED ON THE STANDARD LIBRARY
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
#define LEAN_THREAD_LOCAL thread_local
namespace lean {
inline void set_thread_stack_size(size_t ) {}
using std::thread;
using std::mutex;
using std::atomic;
@ -24,13 +28,42 @@ using std::atomic_load;
using std::atomic_fetch_add_explicit;
using std::atomic_fetch_sub_explicit;
using std::memory_order_relaxed;
namespace chrono = std::chrono;
namespace this_thread = std::this_thread;
}
#else
// MULTI THREADING SUPPORT BASED ON THE BOOST LIBRARY
#include <boost/thread.hpp>
#define LEAN_THREAD_LOCAL thread_local
namespace lean {
void set_thread_stack_size(size_t );
boost::thread::attributes const & get_thread_attributes();
using boost::thread;
using boost::mutex;
using boost::atomic;
using boost::memory_order_relaxed;
using boost::condition_variable;
using boost::unique_lock;
using boost::lock_guard;
namespace chrono = boost::chrono;
namespace this_thread = boost::this_thread;
typedef atomic<bool> atomic_bool;
typedef atomic<unsigned short> atomic_ushort;
template<typename T> T atomic_load(atomic<T> const * a) { return a->load(); }
template<typename T> T atomic_fetch_add_explicit(atomic<T> * a, T v, boost::memory_order mo) { return a->fetch_add(v, mo); }
template<typename T> T atomic_fetch_sub_explicit(atomic<T> * a, T v, boost::memory_order mo) { return a->fetch_sub(v, mo); }
}
#endif
#else
// NO MULTI THREADING SUPPORT
#include <utility>
#include <chrono>
#include <cstdlib>
#define LEAN_THREAD_LOCAL
namespace lean {
inline void set_thread_stack_size(size_t ) {}
namespace chrono {
typedef unsigned milliseconds;
}
constexpr int memory_order_relaxed = 0;
template<typename T>
class atomic {
@ -73,8 +106,7 @@ public:
};
class this_thread {
public:
template<typename Rep, typename Period>
static void sleep_for(std::chrono::duration<Rep, Period> const &) {}
static void sleep_for(chrono::milliseconds const &) {}
static thread::id get_id() { return 0; }
static void yield() {}
};