summaryrefslogtreecommitdiff
path: root/test/test_task_runner.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_task_runner.cc')
-rw-r--r--test/test_task_runner.cc174
1 files changed, 174 insertions, 0 deletions
diff --git a/test/test_task_runner.cc b/test/test_task_runner.cc
new file mode 100644
index 0000000..d259b6a
--- /dev/null
+++ b/test/test_task_runner.cc
@@ -0,0 +1,174 @@
+#include "common.hh"
+
+#include "logger.hh"
+#include "looper.hh"
+#include "task_runner.hh"
+#include "task_runner_reply.hh"
+
+#include <atomic>
+#include <future>
+#include <gtest/gtest.h>
+#include <thread>
+#include <vector>
+
+namespace {
+
+class TaskRunnerTest : public testing::Test {
+protected:
+ virtual TaskRunner* runner() = 0;
+ virtual std::shared_ptr<TaskRunner> shared_runner() = 0;
+ virtual void run_until_idle() = 0;
+};
+
+class TaskRunnerLooper : public TaskRunnerTest {
+protected:
+ TaskRunnerLooper()
+ : logger_(Logger::create_null()),
+ looper_(Looper::create()),
+ runner_(TaskRunner::create(looper_)) {
+ }
+
+ TaskRunner* runner() override {
+ return runner_.get();
+ }
+
+ std::shared_ptr<TaskRunner> shared_runner() override {
+ return runner_;
+ }
+
+ void run_until_idle() override {
+ runner_->post(std::bind(&Looper::quit, looper_.get()));
+ looper_->run(logger_.get());
+ }
+
+private:
+ std::unique_ptr<Logger> logger_;
+ std::shared_ptr<Looper> looper_;
+ std::shared_ptr<TaskRunner> runner_;
+};
+
+class TaskRunnerThread : public TaskRunnerTest,
+ public testing::WithParamInterface<int> {
+protected:
+ void SetUp() override {
+ runner_ = TaskRunner::create(GetParam());
+ }
+
+ TaskRunner* runner() override {
+ return runner_.get();
+ }
+
+ std::shared_ptr<TaskRunner> shared_runner() override {
+ return runner_;
+ }
+
+ void run_until_idle() override {
+ std::promise<bool> done;
+ auto done_future = done.get_future();
+ runner_->post([&done] { done.set_value(true); });
+ done_future.wait();
+ // Done above makes sure all of the queue has been finished
+ // but we also need to run the destructor for runner_ to make
+ // sure all threads have finished running the callback they have.
+ runner_ = TaskRunner::create(GetParam());
+ }
+
+private:
+ std::shared_ptr<TaskRunner> runner_;
+};
+
+} // namespace
+
+TEST_F(TaskRunnerLooper, sanity) {
+ auto value = std::make_unique<int>(0);
+ auto* value_ptr = value.get();
+ for (int i = 0; i < 100; ++i)
+ runner()->post([value_ptr] { (*value_ptr)++; });
+ run_until_idle();
+ EXPECT_EQ(100, *value);
+}
+
+TEST_F(TaskRunnerLooper, thread) {
+ auto value = std::make_shared<int>(0);
+ auto const main_thread_id = std::this_thread::get_id();
+ auto* tmp = runner();
+ std::vector<std::thread> threads;
+ for (size_t i = 0; i < 10; ++i) {
+ threads.emplace_back([&value, tmp, main_thread_id] {
+ tmp->post([&value, main_thread_id] {
+ if (std::this_thread::get_id() == main_thread_id) {
+ (*value)++;
+ }
+ });
+ });
+ }
+ for (auto& thread : threads) thread.join();
+ run_until_idle();
+ EXPECT_EQ(10, *value);
+}
+
+TEST_F(TaskRunnerLooper, reply) {
+ int result = 0;
+ std::function<int()> callback = [] () -> int {
+ return 10;
+ };
+ std::function<void(int)> reply = [&result] (int value) {
+ result = value;
+ };
+ post_and_reply(
+ runner(),
+ std::move(callback),
+ shared_runner(),
+ std::move(reply));
+ run_until_idle();
+ EXPECT_EQ(10, result);
+}
+
+/*
+TEST_F(TaskRunnerLooper, reply_unique) {
+ int result = 0;
+ std::function<std::unique_ptr<int>()> callback =
+ [] () -> std::unique_ptr<int> {
+ return std::make_unique<int>(10);
+ };
+ std::function<void(std::unique_ptr<int>)> reply =
+ [&result] (std::unique_ptr<int> value) {
+ result = *value;
+ };
+ post_and_reply(
+ runner(),
+ std::move(callback),
+ shared_runner(),
+ std::move(reply));
+ run_until_idle();
+ EXPECT_EQ(10, result);
+}
+*/
+
+TEST_P(TaskRunnerThread, sanity) {
+ std::atomic<int> value(0);
+ for (int i = 0; i < 100; ++i)
+ runner()->post([&value] { value++; });
+ run_until_idle();
+ EXPECT_EQ(100, value);
+}
+
+TEST_P(TaskRunnerThread, thread) {
+ std::mutex mutex;
+ std::set<std::thread::id> threads;
+ for (int i = 0; i < 100; ++i)
+ runner()->post([&threads, &mutex] {
+ bool new_thread;
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ auto pair = threads.insert(std::this_thread::get_id());
+ new_thread = pair.second;
+ }
+ if (new_thread)
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ });
+ run_until_idle();
+ EXPECT_EQ(GetParam(), threads.size());
+}
+
+INSTANTIATE_TEST_SUITE_P(Threads, TaskRunnerThread, testing::Values(1, 2, 10));