From bcfff2a9677cab63b98f7014fe3fb3d9847f7f99 Mon Sep 17 00:00:00 2001 From: Erik Hofman Date: Thu, 15 Apr 2021 09:45:17 +0200 Subject: [PATCH] Bi-directional I/O using Data Distribution Services requires a separate reader and writer. --- simgear/io/SGDataDistributionService.cxx | 36 ++++++++++++++---------- simgear/io/SGDataDistributionService.hxx | 10 +++++-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/simgear/io/SGDataDistributionService.cxx b/simgear/io/SGDataDistributionService.cxx index 9bcfa235..184b9706 100644 --- a/simgear/io/SGDataDistributionService.cxx +++ b/simgear/io/SGDataDistributionService.cxx @@ -24,6 +24,7 @@ # include #endif +#include #include #include @@ -62,6 +63,7 @@ SG_DDS_Topic::setup(const char *topic, const dds_topic_descriptor_t *desc, size_ // If specified as a server (in direction) create a subscriber. // If specified as a client (out direction), create a publisher. +// If sepcified as bi-directional create a publisher and a subscriber. bool SG_DDS_Topic::open(SGProtocolDir direction) { @@ -117,10 +119,10 @@ SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction) dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1); - entry = dds_create_reader(participant, topic, qos, NULL); - if (entry < 0) { + reader = dds_create_reader(participant, topic, qos, NULL); + if (reader < 0) { SG_LOG(SG_IO, SG_ALERT, "dds_create_reader: " - << dds_strretcode(-entry)); + << dds_strretcode(-reader)); return false; } @@ -131,14 +133,14 @@ SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction) { dds_return_t rc; - entry = dds_create_writer(participant, topic, NULL, NULL); - if (entry < 0) { + writer = dds_create_writer(participant, topic, NULL, NULL); + if (writer < 0) { SG_LOG(SG_IO, SG_ALERT, "dds_create_writer: " - << dds_strretcode(-entry)); + << dds_strretcode(-writer)); return false; } - rc = dds_set_status_mask(entry, DDS_PUBLICATION_MATCHED_STATUS); + rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS); if (rc != DDS_RETCODE_OK) { SG_LOG(SG_IO, SG_ALERT, "dds_set_status_mask: " << dds_strretcode(-rc)); @@ -157,12 +159,14 @@ SG_DDS_Topic::read(char *buf, int length) { int result; - if (entry < 0 || length < (int)packet_size) { + if (reader < 0 || length < (int)packet_size) { + SG_LOG(SG_IO, SG_ALERT, "SG_DDS_Topic: data reader not properly set up."); return 0; } dds_sample_info_t info[1]; - result = dds_take(entry, (void**)&buf, info, 1, 1); + memset(buf, 0, length); + result = dds_take(reader, (void**)&buf, info, 1, 1); if(result < 0) { errno = -result; @@ -184,13 +188,14 @@ SG_DDS_Topic::write(const char *buf, const int length) { int result; - if (entry < 0 || length < (int)packet_size) { + if (writer < 0 || length < (int)packet_size) { + SG_LOG(SG_IO, SG_ALERT, "SG_DDS_Topic: data writer not properly set up."); return 0; } if (!status) { - dds_return_t rc = dds_get_status_changes(entry, &status); + dds_return_t rc = dds_get_status_changes(writer, &status); if (rc != DDS_RETCODE_OK) { SG_LOG(SG_IO, SG_ALERT, "dds_get_status_changes: " << dds_strretcode(-rc)); @@ -198,12 +203,12 @@ SG_DDS_Topic::write(const char *buf, const int length) } if (!(status & DDS_PUBLICATION_MATCHED_STATUS)) { - SG_LOG(SG_IO, SG_INFO, "DDS skipping write: no readers."); + SG_LOG(SG_IO, SG_INFO, "SG_DDS_Topic: skipping write: no readers."); return length; // no readers yet. } } - result = dds_write(entry, buf); + result = dds_write(writer, buf); if(result != DDS_RETCODE_OK) { errno = -result; @@ -229,7 +234,8 @@ SG_DDS_Topic::close() shared_participant = true; participant = -1; topic = -1; - entry = -1; + reader = -1; + writer = -1; return true; } @@ -292,7 +298,7 @@ SG_DDS::add(SG_DDS_Topic *topic, const SGProtocolDir d) { readers.push_back(topic); - dds_entity_t entry = topic->get(); + dds_entity_t entry = topic->get_reader(); dds_entity_t rdcond = dds_create_readcondition(entry, DDS_NOT_READ_SAMPLE_STATE); int status = dds_waitset_attach(waitset, rdcond, entry); diff --git a/simgear/io/SGDataDistributionService.hxx b/simgear/io/SGDataDistributionService.hxx index ae286d86..b9a08096 100644 --- a/simgear/io/SGDataDistributionService.hxx +++ b/simgear/io/SGDataDistributionService.hxx @@ -55,7 +55,8 @@ private: bool shared_participant = true; dds_entity_t participant = -1; dds_entity_t topic = -1; - dds_entity_t entry = -1; + dds_entity_t reader = -1; + dds_entity_t writer = -1; dds_guid_t guid; @@ -144,7 +145,12 @@ public: // close the participant. bool close(); - dds_entity_t get() { return entry; } + dds_entity_t get_reader() { return reader; } + dds_entity_t get_writer() { return writer; } + dds_entity_t get(const SGProtocolDir d = SG_IO_IN) { + return (d == SG_IO_OUT) ? writer : reader; + } + const dds_guid_t& get_guid() { return guid; } };