#include "common.hh"

#include "logger.hh"
#include "looper.hh"
#include "task_runner.hh"
#include "task_runner_reply.hh"

// NOTE(review): the targets of the five system includes were lost from this
// file; the list below is what the code in this file uses directly. Confirm
// against the project's include conventions.
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

namespace {

// Common interface for the TaskRunner fixtures below, so each test body can
// be written against runner()/shared_runner()/run_until_idle() only.
class TaskRunnerTest : public testing::Test {
protected:
  // Non-owning access to the runner under test.
  virtual TaskRunner* runner() = 0;

  // Shared-ownership handle to the same runner (post_and_reply() is given
  // this as its reply runner).
  virtual std::shared_ptr<TaskRunner> shared_runner() = 0;

  // Blocks until the tasks posted so far have been processed.
  virtual void run_until_idle() = 0;
};

// Fixture for a TaskRunner created on top of a Looper.
class TaskRunnerLooper : public TaskRunnerTest {
protected:
  TaskRunnerLooper()
    : logger_(Logger::create_null()),
      looper_(Looper::create()),
      runner_(TaskRunner::create(looper_)) {
  }

  TaskRunner* runner() override {
    return runner_.get();
  }

  std::shared_ptr<TaskRunner> shared_runner() override {
    return runner_;
  }

  void run_until_idle() override {
    // Queue a task that quits the looper, then run the looper on the calling
    // thread. Presumably tasks are executed in post order, so everything
    // posted earlier has run by the time run() returns.
    runner_->post(std::bind(&Looper::quit, looper_.get()));
    looper_->run(logger_.get());
  }

private:
  // Declaration order matters: runner_ is constructed from looper_.
  std::unique_ptr<Logger> logger_;
  std::shared_ptr<Looper> looper_;
  std::shared_ptr<TaskRunner> runner_;
};

// Fixture for a TaskRunner created from a count; the test parameter is that
// count (the `thread` test expects it to equal the number of distinct
// threads that execute tasks).
//
// NOTE(review): the template argument of WithParamInterface was lost from
// this file. size_t is used because the parameter is compared against
// std::set::size(); confirm it matches TaskRunner::create()'s parameter type.
class TaskRunnerThread : public TaskRunnerTest,
                         public testing::WithParamInterface<size_t> {
protected:
  void SetUp() override {
    runner_ = TaskRunner::create(GetParam());
  }

  TaskRunner* runner() override {
    return runner_.get();
  }

  std::shared_ptr<TaskRunner> shared_runner() override {
    return runner_;
  }

  void run_until_idle() override {
    std::promise<bool> done;
    auto done_future = done.get_future();
    runner_->post([&done] { done.set_value(true); });
    done_future.wait();
    // Done above makes sure all of the queue has been finished
    // but we also need to run the destructor for runner_ to make
    // sure all threads have finished running the callback they have.
    // This must happen before `done` goes out of scope, because the posted
    // task refers to it by reference.
    runner_ = TaskRunner::create(GetParam());
  }

private:
  std::shared_ptr<TaskRunner> runner_;
};

}  // namespace

// 100 posted increments must all have run once the runner is idle.
TEST_F(TaskRunnerLooper, sanity) {
  auto value = std::make_unique<int>(0);
  auto* value_ptr = value.get();
  for (int i = 0; i < 100; ++i)
    runner()->post([value_ptr] { (*value_ptr)++; });
  run_until_idle();
  EXPECT_EQ(100, *value);
}

// Tasks posted from other threads must execute on the thread that runs the
// looper (the test thread): the counter only advances when a task observes
// the test thread's id.
TEST_F(TaskRunnerLooper, thread) {
  auto value = std::make_shared<int>(0);
  auto const main_thread_id = std::this_thread::get_id();
  auto* tmp = runner();
  std::vector<std::thread> threads;
  for (size_t i = 0; i < 10; ++i) {
    threads.emplace_back([&value, tmp, main_thread_id] {
      // `value` is captured by reference; it outlives both the posting
      // threads (joined below) and the tasks (run by run_until_idle()).
      tmp->post([&value, main_thread_id] {
        if (std::this_thread::get_id() == main_thread_id) {
          (*value)++;
        }
      });
    });
  }
  for (auto& thread : threads)
    thread.join();
  run_until_idle();
  EXPECT_EQ(10, *value);
}

// post_and_reply() must hand the callback's return value to the reply.
TEST_F(TaskRunnerLooper, reply) {
  int result = 0;
  std::function<int()> callback = [] () -> int {
    return 10;
  };
  std::function<void(int)> reply = [&result] (int value) {
    result = value;
  };
  post_and_reply(
      runner(), std::move(callback), shared_runner(), std::move(reply));
  run_until_idle();
  EXPECT_EQ(10, result);
}

// Move-only result variant of the test above. It is disabled in the original
// source; the reason is not recorded there.
/*
TEST_F(TaskRunnerLooper, reply_unique) {
  int result = 0;
  std::function<std::unique_ptr<int>()> callback =
      [] () -> std::unique_ptr<int> {
    return std::make_unique<int>(10);
  };
  std::function<void(std::unique_ptr<int>)> reply =
      [&result] (std::unique_ptr<int> value) {
    result = *value;
  };
  post_and_reply(
      runner(), std::move(callback), shared_runner(), std::move(reply));
  run_until_idle();
  EXPECT_EQ(10, result);
}
*/

// 100 posted increments must all have run once the runner is idle. The
// counter is atomic because tasks may run on several threads.
TEST_P(TaskRunnerThread, sanity) {
  std::atomic<int> value(0);
  for (int i = 0; i < 100; ++i)
    runner()->post([&value] { value++; });
  run_until_idle();
  EXPECT_EQ(100, value);
}

// The number of distinct threads that execute tasks must equal the test
// parameter.
TEST_P(TaskRunnerThread, thread) {
  std::mutex mutex;
  std::set<std::thread::id> threads;
  for (int i = 0; i < 100; ++i)
    runner()->post([&threads, &mutex] {
      bool new_thread;
      {
        std::unique_lock<std::mutex> lock(mutex);
        auto pair = threads.insert(std::this_thread::get_id());
        new_thread = pair.second;
      }
      // The first task seen on a thread sleeps briefly (outside the lock),
      // presumably so the remaining threads get a chance to pick up tasks.
      if (new_thread)
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
  run_until_idle();
  EXPECT_EQ(GetParam(), threads.size());
}

INSTANTIATE_TEST_SUITE_P(Threads, TaskRunnerThread,
                         testing::Values(1, 2, 10));