Skip to content

Commit 9698987

Browse files
committed
[occ] New OCCLite plugin, without Protobuf
1 parent 1aec6a5 commit 9698987

18 files changed

Lines changed: 1360 additions & 249 deletions

occ/CMakeLists.txt

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,17 @@ set(CMAKE_INSTALL_LIBDIR lib)
3939
set(OCC_ORGANIZATION_NAME "O²")
4040
set(OCC_ORGANIZATION_DOMAIN "github.com/AliceO2Group/Control")
4141

42+
set(OCCLIB_PRODUCT_NAME "OCC Library")
43+
set(OCCLIB_PRODUCT_MAINTAINER "Teo Mrnjavac <teo.m@cern.ch>")
44+
set(OCCLIB_DESCRIPTION_SUMMARY "The O² Configuration and Control library")
45+
4246
set(OCCPLUGIN_PRODUCT_NAME "OCC Plugin")
43-
set(OCCPLUGIN_PRODUCT_MAINTAINER "Teo Mrnjavac <teo.mrnjavac@cern.ch>")
47+
set(OCCPLUGIN_PRODUCT_MAINTAINER "Teo Mrnjavac <teo.m@cern.ch>")
4448
set(OCCPLUGIN_DESCRIPTION_SUMMARY "The O² Configuration and Control plugin for FairMQ devices")
4549

46-
set(OCCLIB_PRODUCT_NAME "OCC Library")
47-
set(OCCLIB_PRODUCT_MAINTAINER "Teo Mrnjavac <teo.mrnjavac@cern.ch>")
48-
set(OCCLIB_DESCRIPTION_SUMMARY "The O² Configuration and Control library")
50+
set(OCCLITE_PRODUCT_NAME "OCClite Plugin")
51+
set(OCCLITE_PRODUCT_MAINTAINER "Teo Mrnjavac <teo.m@cern.ch>")
52+
set(OCCLITE_DESCRIPTION_SUMMARY "The O² Configuration and Control plugin for FairMQ devices")
4953

5054
# Read version number from VERSION file in parent dir
5155
set(OCC_VERSION_MAJOR 0)
@@ -99,7 +103,7 @@ project(occ
99103
###
100104
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
101105

102-
set(CMAKE_CXX_STANDARD 14)
106+
set(CMAKE_CXX_STANDARD 17)
103107
set(CMAKE_CXX_STANDARD_REQUIRED ON)
104108
set(CMAKE_C_STANDARD 99)
105109
set(CMAKE_C_STANDARD_REQUIRED ON)
@@ -136,7 +140,8 @@ list(APPEND CMAKE_PREFIX_PATH
136140
${FAIRLOGGERPATH}
137141
${PROTOBUFPATH}
138142
${GRPCPATH}
139-
${BOOSTPATH})
143+
${BOOSTPATH}
144+
${RAPIDJSON_ROOT})
140145

141146

142147
###
@@ -175,6 +180,9 @@ message(STATUS "Using gRPC ${gRPC_VERSION}")
175180
# gRPC C++ plugin
176181
set(gRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
177182

183+
# RapidJSON for OCClite
184+
find_package(RapidJSON CONFIG REQUIRED)
185+
178186
###
179187
### Status messages for build options
180188
###
@@ -399,7 +407,11 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/OccVersion.h.in
399407

400408
set(OCCPLUGIN_SOURCES
401409
plugin/OccPlugin.cxx
402-
plugin/OccPluginServer.cxx)
410+
plugin/OccPluginServer.cxx
411+
plugin/OccFMQCommon.cxx
412+
plugin/litestructs/JsonMessage.cxx
413+
plugin/litestructs/Transition.cxx
414+
plugin/litestructs/EventStream.cxx)
403415

404416
add_library(${OCCPLUGIN} SHARED
405417
${OCCPLUGIN_SOURCES}
@@ -417,7 +429,8 @@ target_include_directories(${OCCPLUGIN}
417429
${CMAKE_CURRENT_BINARY_DIR}
418430
${PROTO_SOURCE_DIR}
419431
${FAIRROOT_INCLUDE_DIR}
420-
${GRPC_INCLUDE_DIRS})
432+
${GRPC_INCLUDE_DIRS}
433+
${RapidJSON_INCLUDE_DIRS})
421434

422435
set_target_properties(${OCCPLUGIN} PROPERTIES
423436
CXX_VISIBILITY_PRESET hidden
@@ -426,3 +439,47 @@ set_target_properties(${OCCPLUGIN} PROPERTIES
426439

427440
install(TARGETS ${OCCPLUGIN} DESTINATION lib)
428441

442+
443+
###
444+
### Build target OCCLITE
445+
###
446+
447+
set(OCCLITE FairMQPlugin_OCClite)
448+
449+
#set(OCC_LITE_SERVICE 1)
450+
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/OccVersion.h.in
451+
${CMAKE_CURRENT_BINARY_DIR}/OccVersion.h)
452+
453+
set(OCCLITE_SOURCES
454+
plugin/OccPlugin.cxx
455+
plugin/OccLiteServer.cxx
456+
plugin/OccFMQCommon.cxx
457+
plugin/litestructs/JsonMessage.cxx
458+
plugin/litestructs/GetState.cxx
459+
plugin/litestructs/Transition.cxx
460+
plugin/litestructs/EventStream.cxx)
461+
462+
add_library(${OCCLITE} SHARED
463+
${OCCLITE_SOURCES})
464+
465+
target_link_libraries(${OCCLITE} PRIVATE
466+
FairMQ::FairMQ
467+
gRPC::grpc++
468+
Boost::program_options)
469+
470+
target_include_directories(${OCCLITE}
471+
PUBLIC
472+
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
473+
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
474+
PRIVATE
475+
${FAIRROOT_INCLUDE_DIR}
476+
${GRPC_INCLUDE_DIRS}
477+
${RapidJSON_INCLUDE_DIRS})
478+
479+
set_target_properties(${OCCLITE} PROPERTIES
480+
CXX_VISIBILITY_PRESET hidden
481+
VERSION ${OCC_VERSION_SHORT}
482+
SOVERSION ${OCC_VERSION_SHORT})
483+
target_compile_definitions(${OCCLITE} PUBLIC OCC_LITE_SERVICE)
484+
485+
install(TARGETS ${OCCLITE} DESTINATION lib)

occ/OccVersion.h.in

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44
#cmakedefine OCC_ORGANIZATION_NAME "${OCC_ORGANIZATION_NAME}"
55
#cmakedefine OCC_ORGANIZATION_DOMAIN "${OCC_ORGANIZATION_DOMAIN}"
66

7-
#cmakedefine OCCPLUGIN_PRODUCT_NAME "${OCCLIB_PRODUCT_NAME}"
8-
#cmakedefine OCCPLUGIN_PRODUCT_MAINTAINER "${OCCPLUGIN_PRODUCT_MAINTAINER}"
9-
#cmakedefine OCCPLUGIN_DESCRIPTION_SUMMARY "${OCCPLUGIN_DESCRIPTION_SUMMARY}"
10-
117
#cmakedefine OCCLIB_PRODUCT_NAME "${OCCLIB_PRODUCT_NAME}"
128
#cmakedefine OCCLIB_PRODUCT_MAINTAINER "${OCCLIB_PRODUCT_MAINTAINER}"
139
#cmakedefine OCCLIB_DESCRIPTION_SUMMARY "${OCCLIB_DESCRIPTION_SUMMARY}"
1410

11+
#cmakedefine OCCPLUGIN_PRODUCT_NAME "${OCCPLUGIN_PRODUCT_NAME}"
12+
#cmakedefine OCCPLUGIN_PRODUCT_MAINTAINER "${OCCPLUGIN_PRODUCT_MAINTAINER}"
13+
#cmakedefine OCCPLUGIN_DESCRIPTION_SUMMARY "${OCCPLUGIN_DESCRIPTION_SUMMARY}"
14+
15+
#cmakedefine OCCLITE_PRODUCT_NAME "${OCCLITE_PRODUCT_NAME}"
16+
#cmakedefine OCCLITE_PRODUCT_MAINTAINER "${OCCLITE_PRODUCT_MAINTAINER}"
17+
#cmakedefine OCCLITE_DESCRIPTION_SUMMARY "${OCCLITE_DESCRIPTION_SUMMARY}"
18+
1519
#cmakedefine OCC_VERSION "${OCC_VERSION}"
1620
#cmakedefine OCC_VERSION_SHORT "${OCC_VERSION_SHORT}"
1721

occ/plugin/OccFMQCommon.cxx

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2020 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
#include "OccFMQCommon.h"
26+
27+
#include "util/Defer.h"
28+
29+
#include <boost/algorithm/string.hpp>
30+
#include <iomanip>
31+
32+
std::string generateSubscriptionId(const std::string& prefix)
33+
{
34+
std::string id;
35+
try {
36+
boost::uuids::random_generator gen;
37+
id = boost::uuids::to_string(gen());
38+
} catch(const boost::uuids::entropy_error &err) {
39+
OLOG(WARNING) << "[generateSubscriptionId] boost::uuids::entropy_error: " << err.what() << " falling back to std::time";
40+
id = std::to_string(std::time(nullptr));
41+
}
42+
return "OCC_"s + (prefix.size() ? (prefix + "_") : "") + id;
43+
}
44+
45+
bool isIntermediateFMQState(const std::string& state)
46+
{
47+
return state.find("INITIALIZING TASK") != std::string::npos ||
48+
state.find("RESETTING") != std::string::npos ||
49+
state.find("BINDING") != std::string::npos ||
50+
state.find("CONNECTING") != std::string::npos;
51+
}
52+
53+
std::tuple<OccLite::nopb::TransitionResponse, ::grpc::Status> doTransition(fair::mq::PluginServices* m_pluginServices, const OccLite::nopb::TransitionRequest& request)
54+
{
55+
std::string srcState = request.srcState;
56+
std::string event = request.transitionEvent;
57+
auto arguments = request.arguments;
58+
59+
std::string currentState = fair::mq::PluginServices::ToStr(m_pluginServices->GetCurrentDeviceState());
60+
if (srcState != currentState) {
61+
return std::make_tuple(OccLite::nopb::TransitionResponse(), grpc::Status(grpc::INVALID_ARGUMENT,
62+
"transition not possible: state mismatch: source: " + srcState + " current: " +
63+
currentState));
64+
}
65+
66+
OLOG(DEBUG) << "transition src: " << srcState
67+
<< " currentState: " << currentState
68+
<< " event: " << event;
69+
70+
std::vector<std::string> newStates;
71+
const std::string finalState = EXPECTED_FINAL_STATE.at(event);
72+
73+
std::condition_variable cv;
74+
std::mutex cv_mu;
75+
76+
auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
77+
// CONFIGURE arguments must be pushed during InitializingDevice
78+
if (reachedState == fair::mq::PluginServices::DeviceState::InitializingDevice) {
79+
80+
// FIXME: workaround which special cases a stoi for certain properties
81+
// which must be pushed as int.
82+
// This should be removed once SetPropertyAsString becomes available.
83+
const std::vector<std::string> intKeys = {
84+
"rateLogging",
85+
"rcvBufSize",
86+
"sndBufSize",
87+
"linger",
88+
"rcvKernelSize",
89+
"sndKernelSize"
90+
};
91+
for (auto it = arguments.cbegin(); it != arguments.cend(); ++it) {
92+
std::string key = it->first;
93+
std::string value = it->second;
94+
if (boost::starts_with(key, "chans.")) {
95+
key.erase(0, 6);
96+
std::vector<std::string> split;
97+
boost::split(split, key, std::bind1st(std::equal_to<char>(), '.'));
98+
if (std::find(intKeys.begin(), intKeys.end(), split.back()) != intKeys.end()) {
99+
auto intValue = std::stoi(value);
100+
m_pluginServices->SetProperty(key, intValue);
101+
}
102+
else {
103+
m_pluginServices->SetProperty(key, value);
104+
}
105+
}
106+
else {
107+
m_pluginServices->SetProperty(key, value);
108+
}
109+
}
110+
}
111+
112+
std::unique_lock<std::mutex> lk(cv_mu);
113+
newStates.push_back(fair::mq::PluginServices::ToStr(reachedState));
114+
OLOG(DEBUG) << "transition newStates vector: " << boost::algorithm::join(newStates, ", ");
115+
cv.notify_one();
116+
};
117+
118+
auto id = generateSubscriptionId("Transition");
119+
120+
m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
121+
DEFER({
122+
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
123+
});
124+
125+
try {
126+
auto evt = fair::mq::PluginServices::ToDeviceStateTransition(event);
127+
128+
// FIXME: big ugly workaround over here
129+
// Since FairMQ currently (11/2018) can't yet implicitly create channels when receiving
130+
// chans.* properties during INITIALIZING DEVICE, we must fake a --channel-config cli
131+
// parameter during INIT and before the INIT DEVICE event.
132+
// We extract channel related properties from the OCC transition arguments vector and we
133+
// build up a vector of strings which mimics stuff along the lines of
134+
// --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
135+
// See https://github.com/FairRootGroup/FairMQ/pull/111
136+
// When the relevant FairMQ 1.4.x version implements implicit channel creation, this whole
137+
// block should be removed with no loss of functionality.
138+
if (evt == fair::mq::PluginServices::DeviceStateTransition::InitDevice) {
139+
std::unordered_map<std::string, std::unordered_map<std::string, std::string>> channels;
140+
for (auto it = arguments.cbegin(); it != arguments.cend(); ++it) {
141+
std::string key = it->first;
142+
std::string value = it->second;
143+
if (boost::starts_with(key, "chans.")) {
144+
key.erase(0, 6);
145+
std::vector<std::string> split;
146+
boost::split(split, key, std::bind1st(std::equal_to<char>(), '.'));
147+
if (split.size() != 3)
148+
continue;
149+
auto name = split[0];
150+
auto propKey = split[2];
151+
if (channels.find(name) == channels.end()) // if map for this chan doesn't exist yet
152+
channels[name] = std::unordered_map<std::string, std::string>();
153+
channels[name][propKey] = value;
154+
}
155+
}
156+
157+
std::vector<std::string> channelLines;
158+
for (auto it = channels.cbegin(); it != channels.cend(); ++it) {
159+
std::vector<std::string> line;
160+
line.push_back("name=" + it->first);
161+
for (auto jt = it->second.cbegin(); jt != it->second.cend(); ++jt) {
162+
line.push_back(jt->first + "=" + jt->second);
163+
}
164+
channelLines.push_back(boost::join(line, ","));
165+
OLOG(DEBUG) << "transition pushing channel configuration " << channelLines.back();
166+
}
167+
if (!channelLines.empty()) {
168+
m_pluginServices->SetProperty("channel-config", channelLines);
169+
}
170+
}
171+
// Run number must be pushed immediately before RUN transition
172+
else if (evt == fair::mq::PluginServices::DeviceStateTransition::Run) {
173+
try {
174+
for (auto const& [key, value] : arguments) {
175+
m_pluginServices->SetProperty(key, value);
176+
}
177+
}
178+
catch (std::runtime_error &e) {
179+
OLOG(WARNING) << "transition cannot push RUN transition arguments, reason:" << e.what();
180+
}
181+
}
182+
m_pluginServices->ChangeDeviceState(FMQ_CONTROLLER_NAME, evt);
183+
}
184+
catch (fair::mq::PluginServices::DeviceControlError& e) {
185+
OLOG(ERROR) << "transition cannot request transition: " << e.what();
186+
return std::make_tuple(OccLite::nopb::TransitionResponse(), grpc::Status(grpc::INTERNAL, "cannot request transition, OCC plugin has no device control"));
187+
}
188+
catch (std::out_of_range& e) {
189+
OLOG(ERROR) << "transition invalid event name: " << event;
190+
return std::make_tuple(OccLite::nopb::TransitionResponse(), grpc::Status(grpc::INVALID_ARGUMENT, "argument " + event + " is not a valid transition name"));
191+
}
192+
193+
{
194+
std::unique_lock<std::mutex> lk(cv_mu);
195+
196+
// IF we have no states in list yet, OR
197+
// we have some states, and the last one is an intermediate state (for which an autotransition is presumably about to happen)
198+
if (newStates.empty() || isIntermediateFMQState(newStates.back())) {
199+
// We need to block until the transitions are complete
200+
for (;;) {
201+
cv.wait(lk);
202+
if (newStates.empty()) {
203+
OLOG(ERROR) << "[request Transition] notify condition met but no states written";
204+
break;
205+
}
206+
207+
OLOG(DEBUG) << "transition notify condition met, reached state: " << newStates.back();
208+
if (isIntermediateFMQState(newStates.back())) { //if it's an auto state
209+
continue;
210+
} else {
211+
break;
212+
}
213+
}
214+
}
215+
}
216+
217+
if (newStates.empty()) {
218+
return std::make_tuple(OccLite::nopb::TransitionResponse(), grpc::Status(grpc::INTERNAL,
219+
"no transitions made, current state stays " + srcState));
220+
}
221+
222+
if (srcState == "IDLE" && newStates.back() == "DEVICE READY") {
223+
// Debug: list of FairMQ property keys
224+
auto pk = m_pluginServices->GetPropertyKeys();
225+
for (const auto &k : pk) {
226+
OLOG(DEBUG) << std::setw(30) << k << " = " + m_pluginServices->GetPropertyAsString(k);
227+
}
228+
auto chi = m_pluginServices->GetChannelInfo();
229+
OLOG(DEBUG) << "channel info:";
230+
for (const auto &k : chi) {
231+
OLOG(DEBUG) << k.first << " : " << k.second;
232+
}
233+
}
234+
235+
auto response = OccLite::nopb::TransitionResponse();
236+
response.state = newStates.back();
237+
response.transitionEvent = event;
238+
response.ok = (newStates.back() == finalState);
239+
if (newStates.back() == "ERROR") { // ERROR state
240+
response.trigger = OccLite::nopb::DEVICE_ERROR;
241+
} else if (newStates.back() == finalState) { // correct destination state
242+
response.trigger = OccLite::nopb::EXECUTOR;
243+
} else { // some other state, for whatever reason - we assume DEVICE_INTENTIONAL
244+
response.trigger = OccLite::nopb::DEVICE_INTENTIONAL;
245+
}
246+
247+
OLOG(DEBUG) << "transition done, states visited: " << boost::algorithm::join(newStates, ", ");
248+
return std::make_tuple(response, grpc::Status::OK);
249+
}

0 commit comments

Comments
 (0)