feat(library/module): add module import procedure

The modules are processed in parallel.

Signed-off-by: Leonardo de Moura <leonardo@microsoft.com>
This commit is contained in:
Leonardo de Moura 2014-05-21 18:45:19 -07:00
parent ce634d4459
commit 21905289fa
2 changed files with 240 additions and 21 deletions

View file

@ -5,20 +5,29 @@ Released under Apache 2.0 license as described in the file LICENSE.
Author: Leonardo de Moura Author: Leonardo de Moura
*/ */
#include <unordered_map> #include <unordered_map>
#include <vector>
#include <utility> #include <utility>
#include <string> #include <string>
#include <sstream>
#include <fstream>
#include <algorithm>
#include "util/hash.h" #include "util/hash.h"
#include "util/thread.h"
#include "util/lean_path.h"
#include "util/sstream.h"
#include "util/buffer.h"
#include "util/interrupt.h"
#include "kernel/type_checker.h" #include "kernel/type_checker.h"
#include "library/module.h" #include "library/module.h"
#include "library/kernel_serializer.h" #include "library/kernel_serializer.h"
#include "version.h"
namespace lean { namespace lean {
typedef std::pair<std::string, std::function<void(serializer &)>> writer; typedef std::pair<std::string, std::function<void(serializer &)>> writer;
typedef std::pair<std::string, unsigned> import_info; // imported module and its hashcode
struct module_ext : public environment_extension { struct module_ext : public environment_extension {
list<import_info> m_direct_imports; list<std::string> m_direct_imports;
list<writer> m_writers; list<writer> m_writers;
}; };
struct module_ext_reg { struct module_ext_reg {
@ -34,9 +43,12 @@ static environment update(environment const & env, module_ext const & ext) {
return env.update(g_ext.m_ext_id, std::make_shared<module_ext>(ext)); return env.update(g_ext.m_ext_id, std::make_shared<module_ext>(ext));
} }
static char const * g_olean_end_file = "EndFile";
static char const * g_olean_header = "oleanfile";
void export_module(std::ostream & out, environment const & env) { void export_module(std::ostream & out, environment const & env) {
module_ext const & ext = get_extension(env); module_ext const & ext = get_extension(env);
buffer<import_info> imports; buffer<std::string> imports;
buffer<writer const *> writers; buffer<writer const *> writers;
to_buffer(ext.m_direct_imports, imports); to_buffer(ext.m_direct_imports, imports);
std::reverse(imports.begin(), imports.end()); std::reverse(imports.begin(), imports.end());
@ -44,23 +56,26 @@ void export_module(std::ostream & out, environment const & env) {
writers.push_back(&w); writers.push_back(&w);
std::reverse(writers.begin(), writers.end()); std::reverse(writers.begin(), writers.end());
std::string r; std::ostringstream out1(std::ios_base::binary);
std::ostringstream out1(r);
serializer s1(out1); serializer s1(out1);
// store imported files
s1 << imports.size();
for (auto p : imports)
s1 << p.first << p.second;
// store objects // store objects
for (auto p : writers) { for (auto p : writers) {
s1 << p->first; s1 << p->first;
p->second(s1); p->second(s1);
} }
s1 << g_olean_end_file;
serializer s2(out); serializer s2(out);
unsigned h = hash_str(r.size(), r.c_str(), 13); std::string r = out1.str();
unsigned h = hash(r.size(), [&](unsigned i) { return r[i]; });
s2 << g_olean_header << LEAN_VERSION_MAJOR << LEAN_VERSION_MINOR;
s2 << h;
// store imported files
s2 << imports.size();
for (auto m : imports)
s2 << m;
// store object code
s2 << h; s2 << h;
for (unsigned i = 0; i < r.size(); i++) for (unsigned i = 0; i < r.size(); i++)
s2.write_char(r[i]); s2.write_char(r[i]);
@ -89,7 +104,7 @@ environment add(environment const & env, std::string const & k, std::function<vo
static std::string g_decl("decl"); static std::string g_decl("decl");
static void declaration_reader(deserializer & d, module_idx midx, shared_environment & senv, static void declaration_reader(deserializer & d, module_idx midx, shared_environment & senv,
std::function<void(asynch_update_fn const &)> & add_asynch_update, std::function<void(asynch_update_fn const &)> & add_asynch_update,
std::function<void(delayed_update_fn const &)> &) { std::function<void(delayed_update_fn const &)> &) {
declaration decl = read_declaration(d, midx); declaration decl = read_declaration(d, midx);
environment env = senv.env(); environment env = senv.env();
@ -118,13 +133,217 @@ environment add(environment const & env, certified_declaration const & d) {
return new_env; return new_env;
} }
environment import_modules(environment const & env, unsigned num_modules, std::string const * modules) { struct import_modules_fn {
// TODO(Leo) typedef std::tuple<module_idx, unsigned, delayed_update_fn> delayed_update;
for (unsigned i = 0; i < num_modules; i++) std::cout << modules[i]; shared_environment m_senv;
return env; unsigned m_num_threads;
mutex m_asynch_mutex;
condition_variable m_asynch_cv;
std::vector<asynch_update_fn> m_asynch_tasks;
mutex m_delayed_mutex;
std::vector<delayed_update> m_delayed_tasks;
atomic<unsigned> m_import_counter; // number of modules to be processed
atomic<bool> m_all_modules_imported;
struct module_info {
std::string m_name;
std::string m_fname;
atomic<unsigned> m_counter; // number of dependencies to be processed
unsigned m_module_idx;
std::vector<std::shared_ptr<module_info>> m_dependents;
std::vector<char> m_obj_code;
module_info():m_counter(0), m_module_idx(0) {}
};
typedef std::shared_ptr<module_info> module_info_ptr;
std::unordered_map<std::string, module_info_ptr> m_module_info;
import_modules_fn(environment const & env, unsigned num_threads):
m_senv(env), m_num_threads(num_threads),
m_import_counter(0), m_all_modules_imported(false) {
if (m_num_threads == 0)
m_num_threads = 1;
}
module_info_ptr load_module_file(std::string const & mname) {
auto it = m_module_info.find(mname);
if (it != m_module_info.end())
return it->second;
std::string fname = find_file(mname, {".olean"});
std::ifstream in(fname, std::ifstream::binary);
if (!in.good())
throw exception(sstream() << "failed to open file '" << fname << "'");
deserializer d1(in);
std::string header;
d1 >> header;
if (header != g_olean_header)
throw exception(sstream() << "file '" << fname << "' does not seem to be a valid object Lean file");
unsigned major, minor, claimed_hash;
d1 >> major >> minor >> claimed_hash;
// Enforce version?
unsigned num_imports = d1.read_unsigned();
buffer<std::string> imports;
for (unsigned i = 0; i < num_imports; i++)
imports.push_back(d1.read_string());
unsigned code_size = d1.read_unsigned();
std::vector<char> code(code_size);
for (unsigned i = 0; i < code_size; i++)
code[i] = d1.read_char();
unsigned computed_hash = hash(code_size, [&](unsigned i) { return code[i]; });
if (claimed_hash != computed_hash)
throw exception(sstream() << "file '" << fname << "' has been corrupted");
module_info_ptr r = std::make_shared<module_info>();
r->m_name = mname;
r->m_fname = fname;
r->m_counter = imports.size();
r->m_module_idx = m_import_counter;
m_import_counter++;
std::swap(r->m_obj_code, code);
for (auto i : imports) {
r->m_dependents.push_back(load_module_file(i));
}
if (imports.empty())
add_import_module_task(r);
return r;
}
void add_asynch_task(asynch_update_fn const & f) {
{
lock_guard<mutex> l(m_asynch_mutex);
m_asynch_tasks.push_back(f);
}
m_asynch_cv.notify_one();
}
void add_import_module_task(module_info_ptr const & r) {
add_asynch_task([=](shared_environment &) { import_module(r); });
}
void import_module(module_info_ptr const & r) {
std::string s(r->m_obj_code.data(), r->m_obj_code.size());
std::istringstream in(s, std::ios_base::binary);
deserializer d(in);
unsigned obj_counter = 0;
std::function<void(asynch_update_fn const &)> add_asynch_update([&](asynch_update_fn const & f) {
add_asynch_task(f);
});
std::function<void(delayed_update_fn const &)> add_delayed_update([&](delayed_update_fn const & f) {
lock_guard<mutex> lk(m_delayed_mutex);
m_delayed_tasks.push_back(std::make_tuple(r->m_module_idx, obj_counter, f));
});
while (true) {
check_interrupted();
std::string k;
d >> k;
if (k == g_olean_end_file) {
break;
} else {
object_readers & readers = get_object_readers();
auto it = readers.find(k);
if (it == readers.end())
throw exception(sstream() << "file '" << r->m_fname << "' has been corrupted");
it->second(d, r->m_module_idx, m_senv, add_asynch_update, add_delayed_update);
obj_counter++;
}
}
if (atomic_fetch_sub_explicit(&m_import_counter, 1u, memory_order_relaxed) == 1u)
m_all_modules_imported = true;
// Module was successfully imported, we should notify descendents.
for (module_info_ptr const & d : r->m_dependents) {
if (atomic_fetch_sub_explicit(&(d->m_counter), 1u, memory_order_relaxed) == 1u) {
// all d's dependencies have been processed
add_import_module_task(d);
}
}
}
optional<asynch_update_fn> next_task() {
while (true) {
check_interrupted();
unique_lock<mutex> lk(m_asynch_mutex);
if (m_all_modules_imported)
return optional<asynch_update_fn>();
if (!m_asynch_tasks.empty()) {
asynch_update_fn r = m_asynch_tasks.back();
m_asynch_tasks.pop_back();
return optional<asynch_update_fn>(r);
} else {
m_asynch_cv.wait(lk);
}
}
}
void process_asynch_tasks() {
std::vector<std::unique_ptr<interruptible_thread>> extra_threads;
std::vector<std::unique_ptr<exception>> thread_exceptions(m_num_threads - 1);
for (unsigned i = 0; i < m_num_threads - 1; i++) {
extra_threads.push_back(std::unique_ptr<interruptible_thread>(new interruptible_thread([=, &thread_exceptions]() {
try {
while (auto t = next_task()) {
(*t)(m_senv);
}
} catch (exception ex) {
thread_exceptions[i].reset(ex.clone());
} catch (...) {
thread_exceptions[i].reset(new exception("module import thread failed for unknown reasons"));
}
})));
}
try {
while (auto t = next_task()) {
(*t)(m_senv);
}
for (auto & th : extra_threads)
th->join();
} catch (...) {
for (auto & th : extra_threads)
th->request_interrupt();
for (auto & th : extra_threads)
th->join();
throw;
}
for (auto const & ex : thread_exceptions) {
if (ex.get())
ex->rethrow();
}
}
environment process_delayed_tasks() {
environment env = m_senv.env();
// Sort delayed tasks using lexicographical order on (module-idx, obj-idx).
// obj-idx is the object's position in the module.
std::sort(m_delayed_tasks.begin(), m_delayed_tasks.end(),
[](delayed_update const & u1, delayed_update const & u2) {
if (std::get<0>(u1) != std::get<0>(u2))
return std::get<0>(u1) < std::get<0>(u2);
else
return std::get<1>(u1) < std::get<1>(u2);
});
for (auto const & d : m_delayed_tasks) {
env = std::get<2>(d)(env);
}
return env;
}
environment operator()(unsigned num_modules, std::string const * modules) {
for (unsigned i = 0; i < num_modules; i++)
load_module_file(modules[i]);
process_asynch_tasks();
return process_delayed_tasks();
}
};
environment import_modules(environment const & env, unsigned num_modules, std::string const * modules, unsigned num_threads) {
return import_modules_fn(env, num_threads)(num_modules, modules);
} }
environment import_module(environment const & env, std::string const & module) { environment import_module(environment const & env, std::string const & module, unsigned num_threads) {
return import_modules(env, 1, &module); return import_modules(env, 1, &module, num_threads);
} }
} }

View file

@ -16,8 +16,8 @@ namespace lean {
Modules included directly or indirectly by them are also imported. Modules included directly or indirectly by them are also imported.
The environment \c env is usually an empty environment. The environment \c env is usually an empty environment.
*/ */
environment import_modules(environment const & env, unsigned num_modules, std::string const * modules); environment import_modules(environment const & env, unsigned num_modules, std::string const * modules, unsigned num_threads = 1);
environment import_module(environment const & env, std::string const & module); environment import_module(environment const & env, std::string const & module, unsigned num_threads = 1);
/** /**
\brief Store/Export module using \c env to the output stream \c out. \brief Store/Export module using \c env to the output stream \c out.