Skip to content

Commit c9e52e8

Browse files
committed
Add implementation to DataWriter.
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
1 parent 9e61720 commit c9e52e8

4 files changed

Lines changed: 65 additions & 27 deletions

File tree

include/fastdds/dds/topic/TopicDataType.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,13 @@ class TopicDataType
399399
register_type_object_representation();
400400
}
401401

402+
FASTDDS_EXPORTED_API virtual inline uint32_t get_max_serialized_size(
403+
const std::shared_ptr<Context>& context)
404+
{
405+
static_cast<void>(context);
406+
return max_serialized_type_size;
407+
}
408+
402409
//! Maximum serialized size of the type in bytes.
403410
//! If the type has unbounded fields, and therefore cannot have a maximum size, use 0.
404411
uint32_t max_serialized_type_size {0};

src/cpp/fastdds/publisher/DataWriter.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,13 @@ ReturnCode_t DataWriter::set_related_datareader(
336336
ReturnCode_t DataWriter::set_type_support_context(
337337
const std::shared_ptr<TopicDataType::Context>& context)
338338
{
339-
static_cast<void>(context);
340-
return RETCODE_UNSUPPORTED;
339+
if (enable_)
340+
{
341+
return RETCODE_ILLEGAL_OPERATION;
342+
}
343+
344+
impl_->set_type_support_context(context);
345+
return RETCODE_OK;
341346
}
342347

343348
} // namespace dds

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ void DataWriterImpl::create_history(
234234
qos_.history(),
235235
qos_.resource_limits(),
236236
(type_->is_compute_key_provided ? WITH_KEY : NO_KEY),
237-
type_->max_serialized_type_size,
237+
type_->get_max_serialized_size(type_support_context_),
238238
qos_.endpoint().history_memory_policy,
239239
[this](
240240
const InstanceHandle_t& handle) -> void
@@ -252,14 +252,16 @@ ReturnCode_t DataWriterImpl::enable()
252252

253253
auto history_att = DataWriterHistory::to_history_attributes(
254254
qos_.history(),
255-
qos_.resource_limits(), (type_->is_compute_key_provided ? WITH_KEY : NO_KEY), type_->max_serialized_type_size,
255+
qos_.resource_limits(),
256+
(type_->is_compute_key_provided ? WITH_KEY : NO_KEY),
257+
type_->get_max_serialized_size(type_support_context_),
256258
qos_.endpoint().history_memory_policy);
257259
pool_config_ = PoolConfig::from_history_attributes(history_att);
258260

259261
// When the user requested PREALLOCATED_WITH_REALLOC, but we know the type cannot
260262
// grow, we translate the policy into bare PREALLOCATED
261263
if (PREALLOCATED_WITH_REALLOC_MEMORY_MODE == pool_config_.memory_policy &&
262-
(type_->is_bounded() || type_->is_plain(data_representation_)))
264+
(type_->is_bounded(type_support_context_) || type_->is_plain(type_support_context_, data_representation_)))
263265
{
264266
pool_config_.memory_policy = PREALLOCATED_MEMORY_MODE;
265267
}
@@ -536,8 +538,8 @@ ReturnCode_t DataWriterImpl::loan_sample(
536538
microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
537539

538540
// Type should be plain and have space for the representation header
539-
if (!type_->is_plain(data_representation_) ||
540-
SerializedPayload_t::representation_header_size > type_->max_serialized_type_size)
541+
if (!type_->is_plain(type_support_context_, data_representation_) ||
542+
SerializedPayload_t::representation_header_size > type_->get_max_serialized_size(type_support_context_))
541543
{
542544
return RETCODE_ILLEGAL_OPERATION;
543545
}
@@ -561,7 +563,7 @@ ReturnCode_t DataWriterImpl::loan_sample(
561563

562564
// Get one payload from the pool
563565
SerializedPayload_t payload;
564-
uint32_t size = type_->max_serialized_type_size;
566+
uint32_t size = type_->get_max_serialized_size(type_support_context_);
565567
if (!get_free_payload_from_pool(size, payload))
566568
{
567569
return RETCODE_OUT_OF_RESOURCES;
@@ -603,7 +605,7 @@ ReturnCode_t DataWriterImpl::loan_sample(
603605
break;
604606

605607
case LoanInitializationKind::CONSTRUCTED_LOAN_INITIALIZATION:
606-
if (!type_->construct_sample(sample))
608+
if (!type_->construct_sample(type_support_context_, sample))
607609
{
608610
check_and_remove_loan(sample, payload);
609611
payload_pool_->release_payload(payload);
@@ -624,8 +626,8 @@ ReturnCode_t DataWriterImpl::discard_loan(
624626
void*& sample)
625627
{
626628
// Type should be plain and have space for the representation header
627-
if (!type_->is_plain(data_representation_) ||
628-
SerializedPayload_t::representation_header_size > type_->max_serialized_type_size)
629+
if (!type_->is_plain(type_support_context_, data_representation_) ||
630+
SerializedPayload_t::representation_header_size > type_->get_max_serialized_size(type_support_context_))
629631
{
630632
return RETCODE_ILLEGAL_OPERATION;
631633
}
@@ -687,13 +689,14 @@ ReturnCode_t DataWriterImpl::check_write_preconditions(
687689
return RETCODE_NOT_ENABLED;
688690
}
689691

690-
if (type_.get()->is_compute_key_provided)
692+
if (type_->is_compute_key_provided)
691693
{
692694
bool is_key_protected = false;
693695
#if HAVE_SECURITY
694696
is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
695697
#endif // if HAVE_SECURITY
696-
if (!type_->compute_key(data, instance_handle, is_key_protected) || !instance_handle.isDefined())
698+
if (!type_->compute_key(type_support_context_, data, instance_handle, is_key_protected) ||
699+
!instance_handle.isDefined())
697700
{
698701
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not compute key for data");
699702
return RETCODE_PRECONDITION_NOT_MET;
@@ -785,7 +788,8 @@ ReturnCode_t DataWriterImpl::check_instance_preconditions(
785788
#if HAVE_SECURITY
786789
is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
787790
#endif // if HAVE_SECURITY
788-
if (!type_->compute_key(data, instance_handle, is_key_protected) || !instance_handle.isDefined())
791+
if (!type_->compute_key(type_support_context_, data, instance_handle, is_key_protected) ||
792+
!instance_handle.isDefined())
789793
{
790794
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not compute key for data");
791795
return RETCODE_PRECONDITION_NOT_MET;
@@ -861,10 +865,11 @@ InstanceHandle_t DataWriterImpl::do_register_instance(
861865
assert(nullptr != payload);
862866
if (0 == payload->length || nullptr == payload->data)
863867
{
864-
uint32_t size = fixed_payload_size_ ? fixed_payload_size_ : type_->calculate_serialized_size(key,
865-
data_representation_);
868+
uint32_t size = fixed_payload_size_
869+
? fixed_payload_size_
870+
: type_->calculate_serialized_size(type_support_context_, key, data_representation_);
866871
payload->reserve(size);
867-
if (!type_->serialize(key, *payload, data_representation_))
872+
if (!type_->serialize(type_support_context_, key, *payload, data_representation_))
868873
{
869874
EPROSIMA_LOG_WARNING(DATA_WRITER, "Key data serialization failed");
870875

@@ -980,7 +985,7 @@ ReturnCode_t DataWriterImpl::get_key_value(
980985
return RETCODE_BAD_PARAMETER;
981986
}
982987

983-
type_->deserialize(*payload, key_holder);
988+
type_->deserialize(type_support_context_, *payload, key_holder);
984989
return RETCODE_OK;
985990
}
986991

@@ -1041,8 +1046,9 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10411046
bool was_loaned = check_and_remove_loan(data, payload);
10421047
if (!was_loaned)
10431048
{
1044-
uint32_t payload_size = fixed_payload_size_ ? fixed_payload_size_ : type_->calculate_serialized_size(
1045-
data, data_representation_);
1049+
uint32_t payload_size = fixed_payload_size_
1050+
? fixed_payload_size_
1051+
: type_->calculate_serialized_size(type_support_context_, data, data_representation_);
10461052
// Initialize payload to null state
10471053
payload.length = 0;
10481054
payload.max_size = 0;
@@ -1058,7 +1064,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10581064
return RETCODE_OUT_OF_RESOURCES;
10591065
}
10601066

1061-
if (!type_->serialize(data, payload, data_representation_))
1067+
if (!type_->serialize(type_support_context_, data, payload, data_representation_))
10621068
{
10631069
EPROSIMA_LOG_WARNING(DATA_WRITER, "Data serialization returned false");
10641070
payload_pool_->release_payload(payload);
@@ -1161,7 +1167,7 @@ ReturnCode_t DataWriterImpl::create_new_change_with_params(
11611167
#if HAVE_SECURITY
11621168
is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
11631169
#endif // if HAVE_SECURITY
1164-
if (!type_->compute_key(data, handle, is_key_protected) || !handle.isDefined())
1170+
if (!type_->compute_key(type_support_context_, data, handle, is_key_protected) || !handle.isDefined())
11651171
{
11661172
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not compute key for data");
11671173
return RETCODE_PRECONDITION_NOT_MET;
@@ -1570,6 +1576,12 @@ ReturnCode_t DataWriterImpl::set_related_datareader(
15701576
return ret;
15711577
}
15721578

1579+
void DataWriterImpl::set_type_support_context(
1580+
const std::shared_ptr<TopicDataType::Context>& context)
1581+
{
1582+
type_support_context_ = context;
1583+
}
1584+
15731585
bool DataWriterImpl::deadline_timer_reschedule()
15741586
{
15751587
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
@@ -1869,7 +1881,7 @@ ReturnCode_t DataWriterImpl::get_publication_builtin_topic_data(
18691881

18701882
qos_.endpoint().unicast_locator_list.copy_to(publication_data.remote_locators.unicast);
18711883
qos_.endpoint().multicast_locator_list.copy_to(publication_data.remote_locators.multicast);
1872-
publication_data.max_serialized_size = type_->max_serialized_type_size;
1884+
publication_data.max_serialized_size = type_->get_max_serialized_size(type_support_context_);
18731885
publication_data.loopback_transformation =
18741886
writer_->get_participant_impl()->network_factory().network_configuration();
18751887

@@ -2270,7 +2282,7 @@ std::shared_ptr<IPayloadPool> DataWriterImpl::get_payload_pool()
22702282
}
22712283

22722284
// Prepare loans collection for plain types only
2273-
if (type_->is_plain(data_representation_))
2285+
if (type_->is_plain(type_support_context_, data_representation_))
22742286
{
22752287
loans_.reset(new LoanCollection(pool_config_));
22762288
}
@@ -2347,7 +2359,7 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible(
23472359
bool has_bound_payload_size =
23482360
(qos_.endpoint().history_memory_policy == eprosima::fastdds::rtps::PREALLOCATED_MEMORY_MODE ||
23492361
qos_.endpoint().history_memory_policy == eprosima::fastdds::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE) &&
2350-
type_.is_bounded();
2362+
type_->is_bounded(type_support_context_);
23512363

23522364
bool has_key = type_->is_compute_key_provided;
23532365

@@ -2374,7 +2386,8 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible(
23742386
if (!has_bound_payload_size)
23752387
{
23762388
EPROSIMA_LOG_ERROR(DATA_WRITER, "Data sharing cannot be used with "
2377-
<< (type_.is_bounded() ? "memory policies other than PREALLOCATED" : "unbounded data types"));
2389+
<< (type_->is_bounded(type_support_context_) ? "memory policies other than PREALLOCATED" :
2390+
"unbounded data types"));
23782391
return RETCODE_BAD_PARAMETER;
23792392
}
23802393

@@ -2404,7 +2417,8 @@ ReturnCode_t DataWriterImpl::check_datasharing_compatible(
24042417
if (!has_bound_payload_size)
24052418
{
24062419
EPROSIMA_LOG_INFO(DATA_WRITER, "Data sharing disabled because "
2407-
<< (type_.is_bounded() ? "memory policy is not PREALLOCATED" : "data type is not bounded"));
2420+
<< (type_->is_bounded(type_support_context_) ? "memory policy is not PREALLOCATED" :
2421+
"data type is not bounded"));
24082422
return RETCODE_OK;
24092423
}
24102424

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,16 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
452452
ReturnCode_t set_related_datareader(
453453
const DataReader* related_reader);
454454

455+
/**
456+
* @brief Set the type support context to be used when serializing data for this DataWriter.
457+
*
458+
* @param context Shared pointer to the type support context to be used for serialization.
459+
*
460+
* @pre The DataWriter must not be enabled.
461+
*/
462+
void set_type_support_context(
463+
const std::shared_ptr<TopicDataType::Context>& context);
464+
455465
protected:
456466

457467
using IChangePool = eprosima::fastdds::rtps::IChangePool;
@@ -575,6 +585,8 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
575585
mutable std::mutex filters_mtx_;
576586
std::shared_ptr<IContentFilter> sample_prefilter_;
577587

588+
std::shared_ptr<TopicDataType::Context> type_support_context_ {};
589+
578590
ReturnCode_t check_write_preconditions(
579591
const void* const data,
580592
const InstanceHandle_t& handle,

0 commit comments

Comments
 (0)