feat(util/lazy_list): add par template for lazy lists)
Signed-off-by: Leonardo de Moura <leonardo@microsoft.com>
This commit is contained in:
parent
157a2b36db
commit
924187b055
3 changed files with 65 additions and 4 deletions
|
@ -97,6 +97,7 @@ static void tst1() {
|
||||||
display(orelse(filter(take(100, seq(1)), [](int i) { return i < 0; }), take(10, seq(1000))));
|
display(orelse(filter(take(100, seq(1)), [](int i) { return i < 0; }), take(10, seq(1000))));
|
||||||
#ifndef __APPLE__
|
#ifndef __APPLE__
|
||||||
display(timeout(append(append(take(10, seq(1)), loop()), seq(100)), 5));
|
display(timeout(append(append(take(10, seq(1)), loop()), seq(100)), 5));
|
||||||
|
display(take(10, par(seq(1), loop())));
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -223,4 +223,64 @@ lazy_list<T> timeout(lazy_list<T> const & l, unsigned ms, unsigned check_ms = g_
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
\brief Similar to interleave, but the heads are computed in parallel.
|
||||||
|
Moreover, when pulling results from the lists, if one finishes before the other,
|
||||||
|
then the other one is interrupted.
|
||||||
|
*/
|
||||||
|
template<typename T>
|
||||||
|
lazy_list<T> par(lazy_list<T> const & l1, lazy_list<T> const & l2, unsigned check_ms = g_small_sleep) {
|
||||||
|
return lazy_list<T>([=]() {
|
||||||
|
typename lazy_list<T>::maybe_pair r1;
|
||||||
|
typename lazy_list<T>::maybe_pair r2;
|
||||||
|
std::atomic<bool> done1(false);
|
||||||
|
std::atomic<bool> done2(false);
|
||||||
|
interruptible_thread th1([&]() {
|
||||||
|
try {
|
||||||
|
r1 = l1.pull();
|
||||||
|
} catch (...) {
|
||||||
|
r1 = typename lazy_list<T>::maybe_pair();
|
||||||
|
}
|
||||||
|
done1 = true;
|
||||||
|
});
|
||||||
|
interruptible_thread th2([&]() {
|
||||||
|
try {
|
||||||
|
r2 = l2.pull();
|
||||||
|
} catch (...) {
|
||||||
|
r2 = typename lazy_list<T>::maybe_pair();
|
||||||
|
}
|
||||||
|
done2 = true;
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
std::chrono::milliseconds small(check_ms);
|
||||||
|
while (!done1 && !done2) {
|
||||||
|
check_interrupted();
|
||||||
|
std::this_thread::sleep_for(small);
|
||||||
|
}
|
||||||
|
th1.request_interrupt();
|
||||||
|
th2.request_interrupt();
|
||||||
|
th1.join();
|
||||||
|
th2.join();
|
||||||
|
// TODO(Leo): check why the following commented code does not work
|
||||||
|
// if (r1 && r2) {
|
||||||
|
// lazy_list<T> tail([=]() { return some(mk_pair(r2->first, par(r1->second, r2->second))); });
|
||||||
|
// return some(mk_pair(r1->first, tail));
|
||||||
|
// } else
|
||||||
|
if (r1) {
|
||||||
|
return some(mk_pair(r1->first, par(r1->second, l2)));
|
||||||
|
} else if (r2) {
|
||||||
|
return some(mk_pair(r2->first, par(l1, r2->second)));
|
||||||
|
} else {
|
||||||
|
return r2;
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
th1.request_interrupt();
|
||||||
|
th2.request_interrupt();
|
||||||
|
th1.join();
|
||||||
|
th2.join();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,10 +23,6 @@ public:
|
||||||
optional(T const & v):m_some(true) {
|
optional(T const & v):m_some(true) {
|
||||||
new (&m_value) T(v);
|
new (&m_value) T(v);
|
||||||
}
|
}
|
||||||
template<typename... Args>
|
|
||||||
optional(Args&&... args):m_some(true) {
|
|
||||||
new (&m_value) T(args...);
|
|
||||||
}
|
|
||||||
optional(optional const & other):m_some(other.m_some) {
|
optional(optional const & other):m_some(other.m_some) {
|
||||||
if (m_some)
|
if (m_some)
|
||||||
new (&m_value) T(other.m_value);
|
new (&m_value) T(other.m_value);
|
||||||
|
@ -35,6 +31,10 @@ public:
|
||||||
if (m_some)
|
if (m_some)
|
||||||
m_value = std::move(other.m_value);
|
m_value = std::move(other.m_value);
|
||||||
}
|
}
|
||||||
|
template<typename... Args>
|
||||||
|
optional(Args&&... args):m_some(true) {
|
||||||
|
new (&m_value) T(args...);
|
||||||
|
}
|
||||||
~optional() {
|
~optional() {
|
||||||
if (m_some)
|
if (m_some)
|
||||||
m_value.~T();
|
m_value.~T();
|
||||||
|
|
Loading…
Reference in a new issue