1111
1212#include " AnalysisCCDBHelpers.h"
1313#include " CCDBFetcherHelper.h"
14+ #include " Framework/ArrowTypes.h"
1415#include " Framework/DataProcessingStats.h"
1516#include " Framework/DeviceSpec.h"
1617#include " Framework/TimingInfo.h"
2930#include < arrow/table.h>
3031#include < arrow/array.h>
3132#include < arrow/builder.h>
33+ #include < arrow/io/memory.h>
34+ #include < arrow/ipc/writer.h>
3235#include < fmt/base.h>
3336#include < ctime>
3437#include < memory>
@@ -109,7 +112,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
109112 auto it = ccdbUrls.find (m.name );
110113 fieldMetadata->Append (" url" , it != ccdbUrls.end () ? it->second : m.defaultValue .asString ());
111114 auto columnName = m.name .substr (strlen (" ccdb:" ));
112- fields.emplace_back (std::make_shared<arrow::Field>(columnName, arrow::binary_view (), false , fieldMetadata));
115+ fields.emplace_back (std::make_shared<arrow::Field>(columnName, soa::asArrowDataType< int64_t [ 2 ]> (), false , fieldMetadata));
113116 }
114117 schemas.emplace_back (std::make_shared<arrow::Schema>(fields, schemaMetadata));
115118 }
@@ -122,6 +125,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
122125 return adaptStateless ([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
123126 O2_SIGNPOST_ID_GENERATE (sid, ccdb);
124127 O2_SIGNPOST_START (ccdb, sid, " fetchFromAnalysisCCDB" , " Fetching CCDB objects for analysis%" PRIu64, (uint64_t )timingInfo.timeslice );
128+ auto pool = arrow::MemoryPool::CreateDefault ();
125129 for (auto & schema : schemas) {
126130 std::vector<CCDBFetcherHelper::FetchOp> ops;
127131 auto inputBinding = *schema->metadata ()->Get (" sourceTable" );
@@ -143,15 +147,23 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
143147 }
144148 int outputRouteIndex = bindings.at (outRouteDesc);
145149 auto & spec = helper->routes [outputRouteIndex].matcher ;
146- std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
147- for (auto const & _ : schema->fields ()) {
148- builders.emplace_back (std::make_shared<arrow::BinaryViewBuilder>());
150+ std::vector<std::shared_ptr<arrow::FixedSizeListBuilder>> builders;
151+ builders.resize (schema->fields ().size ());
152+
153+ for (auto i = 0U ; i < schema->fields ().size (); ++i) {
154+ auto valueBuilder = std::make_shared<arrow::Int64Builder>();
155+ builders[i] = std::make_shared<arrow::FixedSizeListBuilder>(pool.get (), valueBuilder, 2 );
149156 }
150157
151158 auto reserveSize = timestampColumn->length ();
159+ O2_SIGNPOST_EVENT_EMIT_INFO (ccdb, sid, " fetchFromAnalysisCCDB" ,
160+ " * reserving for size: %lld (has: %lld)" ,
161+ reserveSize, builders[0 ]->capacity ());
152162 arrow::Status status;
153163 for (auto i = 0U ; i < builders.size (); ++i) {
154- status &= builders[i]->Reserve (reserveSize);
164+ if (builders[i]->capacity () < reserveSize) {
165+ status &= builders[i]->Reserve (reserveSize - builders[i]->capacity ());
166+ }
155167 }
156168
157169 for (auto ci = 0 ; ci < timestampColumn->num_chunks (); ++ci) {
@@ -181,11 +193,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
181193 LOGP (fatal, " Not enough responses (expected {}, found {})" , builders.size (), responses.size ());
182194 }
183195 arrow::Status result;
196+ int64_t values[2 ];
184197 for (size_t bi = 0 ; bi < responses.size (); bi++) {
185198 auto & builder = builders[bi];
199+ auto * value_builder = static_cast <arrow::Int64Builder*>(builder->value_builder ());
186200 auto & response = responses[bi];
187- char const * address = reinterpret_cast <char const *>(response.id .value );
188- result &= builder->Append (std::string_view (address, response.size ));
201+ values[0 ] = response.id .value ;
202+ values[1 ] = response.size ;
203+ result &= builder->Append ();
204+ result &= value_builder->AppendValues (&values[0 ], 2 , nullptr );
205+ LOGP (info, " P: {}; S: {}" , values[0 ], values[1 ]);
189206 }
190207 if (!result.ok ()) {
191208 LOGP (fatal, " Error adding results from CCDB" );
@@ -198,6 +215,17 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
198215 arrays.push_back (*builder->Finish ());
199216 }
200217 auto outTable = arrow::Table::Make (schema, arrays);
218+
219+ auto mock = std::make_shared<arrow::io::MockOutputStream>();
220+ int64_t expectedSize = 0 ;
221+ auto mockWriter = arrow::ipc::MakeStreamWriter (mock.get (), outTable->schema ());
222+ arrow::Status outStatus = mockWriter.ValueOrDie ()->WriteTable (*(outTable.get ()));
223+
224+ expectedSize = mock->Tell ().ValueOrDie ();
225+ assert (outTable->num_rows () == reserveSize);
226+ O2_SIGNPOST_EVENT_EMIT_INFO (ccdb, sid, " fetchFromAnalysisCCDB" ,
227+ " * sending a table of size: %lld" ,
228+ expectedSize);
201229 auto concrete = DataSpecUtils::asConcreteDataMatcher (spec);
202230 allocator.adopt (Output{concrete.origin , concrete.description , concrete.subSpec }, outTable);
203231 }
0 commit comments