From 21905289fa0f99816e36fdc1340c4aecd7b3ca1f Mon Sep 17 00:00:00 2001 From: Leonardo de Moura Date: Wed, 21 May 2014 18:45:19 -0700 Subject: [PATCH] feat(library/module): add module import procedure The modules are processed in parallel. Signed-off-by: Leonardo de Moura --- src/library/module.cpp | 257 ++++++++++++++++++++++++++++++++++++++--- src/library/module.h | 4 +- 2 files changed, 240 insertions(+), 21 deletions(-) diff --git a/src/library/module.cpp b/src/library/module.cpp index 8bdf354c2..729b85593 100644 --- a/src/library/module.cpp +++ b/src/library/module.cpp @@ -5,20 +5,29 @@ Released under Apache 2.0 license as described in the file LICENSE. Author: Leonardo de Moura */ #include +#include #include #include +#include +#include +#include #include "util/hash.h" +#include "util/thread.h" +#include "util/lean_path.h" +#include "util/sstream.h" +#include "util/buffer.h" +#include "util/interrupt.h" #include "kernel/type_checker.h" #include "library/module.h" #include "library/kernel_serializer.h" +#include "version.h" namespace lean { typedef std::pair> writer; -typedef std::pair import_info; // imported module and its hashcode struct module_ext : public environment_extension { - list m_direct_imports; - list m_writers; + list m_direct_imports; + list m_writers; }; struct module_ext_reg { @@ -34,9 +43,12 @@ static environment update(environment const & env, module_ext const & ext) { return env.update(g_ext.m_ext_id, std::make_shared(ext)); } +static char const * g_olean_end_file = "EndFile"; +static char const * g_olean_header = "oleanfile"; + void export_module(std::ostream & out, environment const & env) { module_ext const & ext = get_extension(env); - buffer imports; + buffer imports; buffer writers; to_buffer(ext.m_direct_imports, imports); std::reverse(imports.begin(), imports.end()); @@ -44,23 +56,26 @@ void export_module(std::ostream & out, environment const & env) { writers.push_back(&w); std::reverse(writers.begin(), writers.end()); - std::string r; - std::ostringstream out1(r); + std::ostringstream out1(std::ios_base::binary); serializer s1(out1); - // store imported files - s1 << imports.size(); - for (auto p : imports) - s1 << p.first << p.second; - // store objects for (auto p : writers) { s1 << p->first; p->second(s1); } + s1 << g_olean_end_file; serializer s2(out); - unsigned h = hash_str(r.size(), r.c_str(), 13); + std::string r = out1.str(); + unsigned h = hash(r.size(), [&](unsigned i) { return r[i]; }); + s2 << g_olean_header << LEAN_VERSION_MAJOR << LEAN_VERSION_MINOR; + s2 << h; + // store imported files + s2 << imports.size(); + for (auto m : imports) + s2 << m; + // store object code s2 << h; for (unsigned i = 0; i < r.size(); i++) s2.write_char(r[i]); @@ -89,7 +104,7 @@ environment add(environment const & env, std::string const & k, std::function & add_asynch_update, + std::function & add_asynch_update, std::function &) { declaration decl = read_declaration(d, midx); environment env = senv.env(); @@ -118,13 +133,217 @@ environment add(environment const & env, certified_declaration const & d) { return new_env; } -environment import_modules(environment const & env, unsigned num_modules, std::string const * modules) { - // TODO(Leo) - for (unsigned i = 0; i < num_modules; i++) std::cout << modules[i]; - return env; +struct import_modules_fn { + typedef std::tuple delayed_update; + shared_environment m_senv; + unsigned m_num_threads; + mutex m_asynch_mutex; + condition_variable m_asynch_cv; + std::vector m_asynch_tasks; + mutex m_delayed_mutex; + std::vector m_delayed_tasks; + atomic m_import_counter; // number of modules to be processed + atomic m_all_modules_imported; + + struct module_info { + std::string m_name; + std::string m_fname; + atomic m_counter; // number of dependencies to be processed + unsigned m_module_idx; + std::vector> m_dependents; + std::vector m_obj_code; + module_info():m_counter(0), m_module_idx(0) {} + }; + typedef std::shared_ptr module_info_ptr; + std::unordered_map m_module_info; + + import_modules_fn(environment const & env, unsigned num_threads): + m_senv(env), m_num_threads(num_threads), + m_import_counter(0), m_all_modules_imported(false) { + if (m_num_threads == 0) + m_num_threads = 1; + } + + module_info_ptr load_module_file(std::string const & mname) { + auto it = m_module_info.find(mname); + if (it != m_module_info.end()) + return it->second; + std::string fname = find_file(mname, {".olean"}); + std::ifstream in(fname, std::ifstream::binary); + if (!in.good()) + throw exception(sstream() << "failed to open file '" << fname << "'"); + deserializer d1(in); + std::string header; + d1 >> header; + if (header != g_olean_header) + throw exception(sstream() << "file '" << fname << "' does not seem to be a valid object Lean file"); + unsigned major, minor, claimed_hash; + d1 >> major >> minor >> claimed_hash; + // Enforce version? + + unsigned num_imports = d1.read_unsigned(); + buffer imports; + for (unsigned i = 0; i < num_imports; i++) + imports.push_back(d1.read_string()); + + unsigned code_size = d1.read_unsigned(); + std::vector code(code_size); + for (unsigned i = 0; i < code_size; i++) + code[i] = d1.read_char(); + + unsigned computed_hash = hash(code_size, [&](unsigned i) { return code[i]; }); + if (claimed_hash != computed_hash) + throw exception(sstream() << "file '" << fname << "' has been corrupted"); + + module_info_ptr r = std::make_shared(); + r->m_name = mname; + r->m_fname = fname; + r->m_counter = imports.size(); + r->m_module_idx = m_import_counter; + m_import_counter++; + std::swap(r->m_obj_code, code); + + for (auto i : imports) { + r->m_dependents.push_back(load_module_file(i)); + } + + if (imports.empty()) + add_import_module_task(r); + + return r; + } + + void add_asynch_task(asynch_update_fn const & f) { + { + lock_guard l(m_asynch_mutex); + m_asynch_tasks.push_back(f); + } + m_asynch_cv.notify_one(); + } + + void add_import_module_task(module_info_ptr const & r) { + add_asynch_task([=](shared_environment &) { import_module(r); }); + } + + void import_module(module_info_ptr const & r) { + std::string s(r->m_obj_code.data(), r->m_obj_code.size()); + std::istringstream in(s, std::ios_base::binary); + deserializer d(in); + unsigned obj_counter = 0; + std::function add_asynch_update([&](asynch_update_fn const & f) { + add_asynch_task(f); + }); + std::function add_delayed_update([&](delayed_update_fn const & f) { + lock_guard lk(m_delayed_mutex); + m_delayed_tasks.push_back(std::make_tuple(r->m_module_idx, obj_counter, f)); + }); + while (true) { + check_interrupted(); + std::string k; + d >> k; + if (k == g_olean_end_file) { + break; + } else { + object_readers & readers = get_object_readers(); + auto it = readers.find(k); + if (it == readers.end()) + throw exception(sstream() << "file '" << r->m_fname << "' has been corrupted"); + it->second(d, r->m_module_idx, m_senv, add_asynch_update, add_delayed_update); + obj_counter++; + } + } + if (atomic_fetch_sub_explicit(&m_import_counter, 1u, memory_order_relaxed) == 1u) + m_all_modules_imported = true; + // Module was successfully imported, we should notify descendents. + for (module_info_ptr const & d : r->m_dependents) { + if (atomic_fetch_sub_explicit(&(d->m_counter), 1u, memory_order_relaxed) == 1u) { + // all d's dependencies have been processed + add_import_module_task(d); + } + } + } + + optional next_task() { + while (true) { + check_interrupted(); + unique_lock lk(m_asynch_mutex); + if (m_all_modules_imported) + return optional(); + if (!m_asynch_tasks.empty()) { + asynch_update_fn r = m_asynch_tasks.back(); + m_asynch_tasks.pop_back(); + return optional(r); + } else { + m_asynch_cv.wait(lk); + } + } + } + + void process_asynch_tasks() { + std::vector> extra_threads; + std::vector> thread_exceptions(m_num_threads - 1); + for (unsigned i = 0; i < m_num_threads - 1; i++) { + extra_threads.push_back(std::unique_ptr(new interruptible_thread([=, &thread_exceptions]() { + try { + while (auto t = next_task()) { + (*t)(m_senv); + } + } catch (exception ex) { + thread_exceptions[i].reset(ex.clone()); + } catch (...) { + thread_exceptions[i].reset(new exception("module import thread failed for unknown reasons")); + } + }))); + } + try { + while (auto t = next_task()) { + (*t)(m_senv); + } + for (auto & th : extra_threads) + th->join(); + } catch (...) { + for (auto & th : extra_threads) + th->request_interrupt(); + for (auto & th : extra_threads) + th->join(); + throw; + } + for (auto const & ex : thread_exceptions) { + if (ex.get()) + ex->rethrow(); + } + } + + environment process_delayed_tasks() { + environment env = m_senv.env(); + // Sort delayed tasks using lexicographical order on (module-idx, obj-idx). + // obj-idx is the object's position in the module. + std::sort(m_delayed_tasks.begin(), m_delayed_tasks.end(), + [](delayed_update const & u1, delayed_update const & u2) { + if (std::get<0>(u1) != std::get<0>(u2)) + return std::get<0>(u1) < std::get<0>(u2); + else + return std::get<1>(u1) < std::get<1>(u2); + }); + for (auto const & d : m_delayed_tasks) { + env = std::get<2>(d)(env); + } + return env; + } + + environment operator()(unsigned num_modules, std::string const * modules) { + for (unsigned i = 0; i < num_modules; i++) + load_module_file(modules[i]); + process_asynch_tasks(); + return process_delayed_tasks(); + } +}; + +environment import_modules(environment const & env, unsigned num_modules, std::string const * modules, unsigned num_threads) { + return import_modules_fn(env, num_threads)(num_modules, modules); } -environment import_module(environment const & env, std::string const & module) { - return import_modules(env, 1, &module); +environment import_module(environment const & env, std::string const & module, unsigned num_threads) { + return import_modules(env, 1, &module, num_threads); } } diff --git a/src/library/module.h b/src/library/module.h index 53cc5ff79..f61d8b29a 100644 --- a/src/library/module.h +++ b/src/library/module.h @@ -16,8 +16,8 @@ namespace lean { Modules included directly or indirectly by them are also imported. The environment \c env is usually an empty environment. */ -environment import_modules(environment const & env, unsigned num_modules, std::string const * modules); -environment import_module(environment const & env, std::string const & module); +environment import_modules(environment const & env, unsigned num_modules, std::string const * modules, unsigned num_threads = 1); +environment import_module(environment const & env, std::string const & module, unsigned num_threads = 1); /** \brief Store/Export module using \c env to the output stream \c out.