-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathTaskSink.cpp
More file actions
100 lines (87 loc) · 2.96 KB
/
TaskSink.cpp
File metadata and controls
100 lines (87 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Copyright (C) 2020-2022 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/
#include "TaskSink.h"
#include <olp/core/logging/Log.h>
namespace olp {
namespace dataservice {
namespace read {
namespace {

// Tag passed to the logging macros used in this file.
constexpr const char* kLogTag = "TaskSink";

// Invokes Execute() on the given task directly, without handing it to a
// scheduler.
void ExecuteTask(client::TaskContext task) {
  task.Execute();
}

}  // namespace
TaskSink::TaskSink(std::shared_ptr<thread::TaskScheduler> task_scheduler)
: task_scheduler_(std::move(task_scheduler)),
pending_requests_(std::make_shared<client::PendingRequests>()),
closed_(false) {}
// Closes the sink so that ScheduleTask rejects further tasks, then cancels
// everything still tracked in pending_requests_ and waits for it to finish.
TaskSink::~TaskSink() {
  {
    // The closed flag is flipped under the same mutex ScheduleTask takes, so
    // a task is either registered before this point or rejected after it.
    std::lock_guard<std::mutex> guard(mutex_);
    closed_ = true;

    const auto remaining_tasks = pending_requests_->GetTaskCount();
    if (remaining_tasks > 0) {
      OLP_SDK_LOG_INFO_F(kLogTag, "Finishing, canceling %" PRIu64 " tasks.",
                         static_cast<std::uint64_t>(remaining_tasks));
    }
  }

  // Deliberately performed after mutex_ has been released: new tasks may
  // still be in the middle of being added, and waiting here while holding the
  // lock could deadlock.
  pending_requests_->CancelAllAndWait();
}
// Forwards a cancel request to every task currently tracked in
// pending_requests_. The sink itself is left open.
void TaskSink::CancelTasks() {
  pending_requests_->CancelAll();
}
// Wraps `func` into a TaskContext and submits it to the sink.
//
// The execution step of the created task does no real work: it only returns a
// default-constructed ApiError. The user-supplied `func` is invoked from the
// task's callback step and receives `context`.
//
// @param func Callable to run; receives the cancellation context.
// @param priority Priority forwarded to the scheduler, if there is one.
// @param context Cancellation context bound to the created task.
// @return The cancellation token obtained from the created task. It is
// returned regardless of the outcome reported by AddTaskImpl.
client::CancellationToken TaskSink::AddTask(
    std::function<void(client::CancellationContext)> func, uint32_t priority,
    client::CancellationContext context) {
  using Response = client::ApiResponse<bool, client::ApiError>;

  // Placeholder execution step; its result is ignored by the callback below.
  auto execute_step = [](client::CancellationContext) -> Response {
    return client::ApiError();
  };

  // Callback step: this is where the caller's function actually runs.
  auto callback_step = [func, context](Response) { func(context); };

  auto wrapped_task = client::TaskContext::Create(
      std::move(execute_step), std::move(callback_step), context);

  AddTaskImpl(wrapped_task, priority);

  return wrapped_task.CancelToken(task_scheduler_);
}
// Dispatches a task either to the scheduler or, when no scheduler was
// supplied, executes it immediately in the caller's context.
//
// @param task Task to run.
// @param priority Priority used only when the task is scheduled.
// @return The result of ScheduleTask when a scheduler exists; otherwise true
// after the task has been executed inline.
bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) {
  if (!task_scheduler_) {
    // No scheduler available: run the task right here.
    ExecuteTask(std::move(task));
    return true;
  }

  return ScheduleTask(std::move(task), priority);
}
// Registers the task as pending and hands it over to the scheduler.
//
// The whole operation runs under mutex_, the same mutex the destructor takes
// when it marks the sink as closed.
//
// @param task Task to schedule.
// @param priority Priority forwarded to the scheduler.
// @return false when the sink is already closed (the task is neither
// registered nor scheduled), true once the task has been handed to the
// scheduler.
bool TaskSink::ScheduleTask(client::TaskContext task, uint32_t priority) {
  std::lock_guard<std::mutex> guard(mutex_);

  if (closed_) {
    OLP_SDK_LOG_WARNING(
        kLogTag, "Attempt to add a task when the sink is already closed");
    return false;
  }

  // The scheduled closure holds its own shared reference to the container, so
  // it does not need to touch the sink's members when it runs.
  auto pending = pending_requests_;
  pending->Insert(task);

  task_scheduler_->ScheduleTask(
      [task, pending] {
        task.Execute();
        // Drop the bookkeeping entry once the task has run.
        pending->Remove(task);
      },
      priority);

  return true;
}
} // namespace read
} // namespace dataservice
} // namespace olp