Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "AnalysisCCDBHelpers.h"
#include "CCDBFetcherHelper.h"
#include "Framework/ArrowTypes.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/DeviceSpec.h"
#include "Framework/TimingInfo.h"
Expand All @@ -29,6 +30,8 @@
#include <arrow/table.h>
#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>
#include <fmt/base.h>
#include <ctime>
#include <memory>
Expand Down Expand Up @@ -109,7 +112,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
auto it = ccdbUrls.find(m.name);
fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
auto columnName = m.name.substr(strlen("ccdb:"));
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
fields.emplace_back(std::make_shared<arrow::Field>(columnName, soa::asArrowDataType<int64_t[2]>(), false, fieldMetadata));
}
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
}
Expand All @@ -122,6 +125,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
auto pool = arrow::MemoryPool::CreateDefault();
for (auto& schema : schemas) {
std::vector<CCDBFetcherHelper::FetchOp> ops;
auto inputBinding = *schema->metadata()->Get("sourceTable");
Expand All @@ -143,9 +147,23 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
}
int outputRouteIndex = bindings.at(outRouteDesc);
auto& spec = helper->routes[outputRouteIndex].matcher;
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
for (auto const& _ : schema->fields()) {
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
std::vector<std::shared_ptr<arrow::FixedSizeListBuilder>> builders;
builders.resize(schema->fields().size());

for (auto i = 0U; i < schema->fields().size(); ++i) {
auto valueBuilder = std::make_shared<arrow::Int64Builder>();
builders[i] = std::make_shared<arrow::FixedSizeListBuilder>(pool.get(), valueBuilder, 2);
}

auto reserveSize = timestampColumn->length();
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
"* reserving for size: %lld (has: %lld)",
reserveSize, builders[0]->capacity());
arrow::Status status;
for (auto i = 0U; i < builders.size(); ++i) {
if (builders[i]->capacity() < reserveSize) {
status &= builders[i]->Reserve(reserveSize - builders[i]->capacity());
}
}

for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
Expand Down Expand Up @@ -175,11 +193,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
}
arrow::Status result;
int64_t values[2];
for (size_t bi = 0; bi < responses.size(); bi++) {
auto& builder = builders[bi];
auto* value_builder = static_cast<arrow::Int64Builder*>(builder->value_builder());
auto& response = responses[bi];
char const* address = reinterpret_cast<char const*>(response.id.value);
result &= builder->Append(std::string_view(address, response.size));
values[0] = response.id.value;
values[1] = response.size;
result &= builder->Append();
result &= value_builder->AppendValues(&values[0], 2, nullptr);
LOGP(info, "P: {}; S: {}", values[0], values[1]);
}
if (!result.ok()) {
LOGP(fatal, "Error adding results from CCDB");
Expand All @@ -192,6 +215,17 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
arrays.push_back(*builder->Finish());
}
auto outTable = arrow::Table::Make(schema, arrays);

auto mock = std::make_shared<arrow::io::MockOutputStream>();
int64_t expectedSize = 0;
auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), outTable->schema());
arrow::Status outStatus = mockWriter.ValueOrDie()->WriteTable(*(outTable.get()));

expectedSize = mock->Tell().ValueOrDie();
assert(outTable->num_rows() == reserveSize);
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
"* sending a table of size: %lld",
expectedSize);
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
}
Expand Down
14 changes: 8 additions & 6 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -2455,15 +2455,15 @@ consteval static std::string_view namespace_prefix()
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }

#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
struct _Name_ : o2::soa::Column<int64_t[2], _Name_> { \
static constexpr const char* mLabel = _Label_; \
static constexpr const char* query = _CCDBQuery_; \
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
using base = o2::soa::Column<std::span<std::byte>, _Name_>; \
using type = std::span<std::byte>; \
using base = o2::soa::Column<int64_t[2], _Name_>; \
using type = int64_t[2]; \
using column_t = _Name_; \
_Name_(arrow::ChunkedArray const* column) \
: o2::soa::Column<std::span<std::byte>, _Name_>(o2::soa::ColumnIterator<std::span<std::byte>>(column)) \
: o2::soa::Column<int64_t[2], _Name_>(o2::soa::ColumnIterator<type>(column)) \
{ \
} \
\
Expand All @@ -2473,13 +2473,15 @@ consteval static std::string_view namespace_prefix()
\
decltype(auto) _Getter_() const \
{ \
auto a = *mColumnIterator; \
LOGP(info, "P: {}; S: {}", a[0], a[1]); \
auto span = std::span<std::byte>{reinterpret_cast<std::byte*>(a[0]), static_cast<size_t>(a[1])}; \
if constexpr (std::same_as<_ConcreteType_, std::span<std::byte>>) { \
return *mColumnIterator; \
return span; \
} else { \
static std::byte* payload = nullptr; \
static _ConcreteType_* deserialised = nullptr; \
static TClass* c = TClass::GetClass(#_ConcreteType_); \
auto span = *mColumnIterator; \
if (payload != (std::byte*)span.data()) { \
payload = (std::byte*)span.data(); \
delete deserialised; \
Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,17 +667,17 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
}
// execute process()
if constexpr (requires { &T::process; }) {
constexpr auto phash = o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>();
auto matchers = std::ranges::find_if(inputInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers;
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin);
}
// execute optional process()
homogeneous_apply_refs_sized<numElements>(
[&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) {
if constexpr (is_process_configurable<decltype(x)>) {
if (x.value == true) {
constexpr auto phash = o2::framework::TypeIdHelpers::uniqueId<decltype(x.process)>();
auto matchers = std::ranges::find_if(inputInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers;
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(x.process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin);
return true;
}
Expand Down
10 changes: 10 additions & 0 deletions Framework/Core/include/Framework/ArrowTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ struct arrow_array_for<int8_t[N]> {
using type = arrow::FixedSizeListArray;
using value_type = int8_t;
};
template <int N>
struct arrow_array_for<int64_t[N]> {
using type = arrow::FixedSizeListArray;
using value_type = int64_t;
};
template <int N>
struct arrow_array_for<uint64_t[N]> {
using type = arrow::FixedSizeListArray;
using value_type = uint64_t;
};

#define ARROW_VECTOR_FOR(_type_) \
template <> \
Expand Down