Publisher-Subscriber Layer

eProsima Fast RTPS provides a high-level Publisher-Subscriber Layer, which is a simple to use abstraction over the RTPS protocol. By using this layer, you can code a straight-to-the-point application while letting the library take care of the lower level configuration.

How to use the Publisher-Subscriber Layer

We are going to use the example built in the previous section to explain how this layer works.

The first step is to create a Participant instance, which will act as a container for the Publishers and Subscribers our application needs. For this we use Domain, a static class that manages RTPS entities. We also need to pass a configuration structure for the Participant, which can be left in its default configuration for now:

ParticipantAttributes participant_attr; //Configuration structure
Participant *participant = Domain::createParticipant(participant_attr);

The default configuration provides a basic working set of options with predefined ports for communications. During this tutorial, you will learn to tune eProsima Fast RTPS.

In order to use our topic, we have to register it within the Participant using the code generated with fastrtpsgen (see Introduction. Once again, this is done by using the Domain class:

HelloWorldPubSubType m_type; //Auto-generated type from FastRTPSGen
Domain::registerType(participant, &m_type);

Once set up, we instantiate a Publisher within our Participant:

PublisherAttributes publisher_attr; //Configuration structure
PubListener publisher_listener; //Class that implements callbacks from the publisher
Publisher *publisher = Domain::createPublisher(participant, publisher_attr, &publisher_listener);

Once the Publisher is functional, posting data is a simple process:

HelloWorld sample; //Auto-generated container class for topic data from FastRTPSGen
sample.msg("Hello there!"); // Add contents to the message
publisher->write(&sample); //Publish

The Publisher has a set of optional callback functions that are triggered when events happen. An example is when a Subscriber starts listening to our topic.

To implement these callbacks we create the class PubListener, which inherits from the base class PublisherListener. We pass an instance to this class during the creation of the Publisher.

class PubListener : public PublisherListener
{
    public:

        PubListener() {}
        ~PubListener() {}

        void onPublicationmatched(Publisher* pub, MatchingInfo& info)
        {
            //Callback implementation. This is called each time the Publisher finds a Subscriber on the network that listens to the same topic.
        }
};

The Subscriber creation and implementation are symmetric.

SubscriberAttributes subscriber_attr; //Configuration structure
SubListener subscriber_listener; //Class that implements callbacks from the Subscriber
Subscriber *subscriber = Domain::createSubscriber(participant, subscriber_attr, &subscriber_listener);

Incoming messages are processed within the callback that is called when a new message is received:

class SubListener: public SubscriberListener
{
    public:

        SubListener() {}

        ~SubListener() {}

        void onNewDataMessage(Subscriber * sub)
        {
            if(sub->takeNextData((void*)&sample, &sample_info))
            {
                if(sample_info.sampleKind == ALIVE)
                {
                    std::cout << "New message: " << sample.msg() << std::endl;
                }
            }
        }

        HelloWorld sample; //Storage for incoming messages

        SampleInfo_t sample_info; //Auxiliary structure with meta-data on the message
};

Configuration

eProsima Fast RTPS entities can be configured through the code or XML profiles. This section will show both alternatives.

Participant configuration

The Participant can be configured via the ParticipantAttributes structure. createParticipant function accepts an instance of this structure.

ParticipantAttributes participant_attr;

participant_attr.rtps.setName("my_participant");
participant_attr.rtps.builtin.domainId = 80;

Participant *participant = Domain::createParticipant(participant_attr);

Also, it can be configured through an XML profile. createParticipant function accepts a name of an XML profile.

Participant *participant = Domain::createParticipant("participant_xml_profile");

About XML profiles you can learn more in XML profiles. This is an example of a participant XML profile.

<participant profile_name="participant_xml_conf_profile">
    <rtps>
        <name>my_participant</name>
        <builtin>
            <domainId>80</domainId>
        </builtin>
    </rtps>
</participant>

We will now go over the most common configuration options.

  • Participant name: the name of the Participant forms part of the meta-data of the RTPS protocol.

    C++
    participant_attr.rtps.setName("my_participant");
    
    XML
    <participant profile_name="participant_xml_conf_name_profile">
        <rtps>
            <name>my_participant</name>
        </rtps>
    </participant>
    
  • DomainId: Publishers and Subscribers can only talk to each other if their Participants belong to the same DomainId.

    C++
    participant_attr.rtps.builtin.domainId = 80;
    
    XML
    <participant profile_name="participant_xml_conf_domain_profile">
        <rtps>
            <builtin>
                <domainId>80</domainId>
            </builtin>
        </rtps>
    </participant>
    
  • Mutation Tries: The reader’s physical port could be already bound. In that case, the Participant uses its mutation_tries attribute to determine how many different ports must try before failing. These mutated ports will modify the locator’s information. By default, its value is 100.

    C++
    participant_attr.rtps.builtin.mutation_tries = 55;
    
    XML
    <participant profile_name="participant_xml_conf_mutation_tries_profile">
        <rtps>
            <builtin>
                <mutation_tries>55</mutation_tries>
            </builtin>
        </rtps>
    </participant>
    

Publisher and Subscriber configuration

The Publisher can be configured via the PublisherAttributes structure and createPublisher function accepts an instance of this structure. The Subscriber can be configured via the SubscriberAttributes structure and createSubscriber function accepts an instance of this structure.

PublisherAttributes publisher_attr;
Publisher *publisher = Domain::createPublisher(participant, publisher_attr);

SubscriberAttributes subscriber_attr;
Subscriber *subscriber = Domain::createSubscriber(participant, subscriber_attr);

Also, these entities can be configured through an XML profile. createPublisher and createSubscriber functions accept the name of an XML profile.

Publisher *publisher = Domain::createPublisher(participant, "publisher_xml_profile");
Subscriber *subscriber = Domain::createSubscriber(participant, "subscriber_xml_profile");

We will now go over the most common configuration options.

Topic information

The topic name and data type are used as meta-data to determine whether Publishers and Subscribers can exchange messages.

C++
publisher_attr.topic.topicDataType = "HelloWorldType";
publisher_attr.topic.topicName = "HelloWorldTopic";

subscriber_attr.topic.topicDataType = "HelloWorldType";
subscriber_attr.topic.topicName = "HelloWorldTopic";
XML
<publisher profile_name="publisher_xml_conf_topic_profile">
    <topic>
        <dataType>HelloWorldType</dataType>
        <name>HelloWorldTopic</name>
    </topic>
</publisher>

<subscriber profile_name="subscriber_xml_conf_topic_profile">
    <topic>
        <dataType>HelloWorldType</dataType>
        <name>HelloWorldTopic</name>
    </topic>
</subscriber>

Reliability

The RTPS standard defines two behavior modes for message delivery:

  • Best-Effort (default): Messages are sent without arrival confirmation from the receiver (subscriber). It is fast, but messages can be lost.
  • Reliable: The sender agent (publisher) expects arrival confirmation from the receiver (subscriber). It is slower but prevents data loss.
C++
publisher_attr.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;

subscriber_attr.qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS;
XML
<publisher profile_name="publisher_xml_conf_reliability_profile">
    <qos>
        <reliability>
            <kind>RELIABLE</kind>
        </reliability>
    </qos>
</publisher>

<subscriber profile_name="subscriber_xml_conf_reliability_profile">
    <qos>
        <reliability>
            <kind>BEST_EFFORT</kind>
        </reliability>
    </qos>
</subscriber>

Some reliability combinations make a publisher and a subscriber incompatible and unable to talk to each other. Next table shows the incompatibilities.

Publisher \ Subscriber Best Effort Reliable
Best Effort
Reliable

History

There are two policies for sample storage:

  • Keep-All: Store all samples in memory.
  • Keep-Last (Default): Store samples up to a maximum depth. When this limit is reached, they start to become overwritten.
C++
publisher_attr.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS;

subscriber_attr.topic.historyQos.kind =   KEEP_LAST_HISTORY_QOS;
subscriber_attr.topic.historyQos.depth = 5;
XML
<publisher profile_name="publisher_xml_conf_history_profile">
    <topic>
        <historyQos>
            <kind>KEEP_ALL</kind>
        </historyQos>
    </topic>
</publisher>

<subscriber profile_name="subscriber_xml_conf_history_profile">
    <topic>
        <historyQos>
            <kind>KEEP_LAST</kind>
            <depth>5</depth>
        </historyQos>
    </topic>
</subscriber>

Durability

Durability configuration of the endpoint defines how it behaves regarding samples that existed on the topic before a subscriber joins

  • Volatile: Past samples are ignored, a joining subscriber receives samples generated after the moment it matches.
  • Transient Local (Default): When a new subscriber joins, its History is filled with past samples.
  • Transient: When a new subscriber joins, its History is filled with past samples, which are stored on persistent storage (see Persistence).
C++
publisher_attr.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;

subscriber_attr.qos.m_durability.kind = VOLATILE_DURABILITY_QOS;
XML
<publisher profile_name="publisher_xml_conf_durability_profile">
    <qos>
        <durability>
            <kind>TRANSIENT_LOCAL</kind>
         </durability>
    </qos>
</publisher>

<subscriber profile_name="subscriber_xml_conf_durability_profile">
    <qos>
        <durability>
            <kind>VOLATILE</kind>
        </durability>
    </qos>
</subscriber>

Deadline

The deadline QoS raises an alarm when the frequency of new samples falls below a certain threshold. It is useful for cases where data is expected to be updated periodically, requiring that each instance is updated periodically for topics with key.

On the publishing side, the deadline QoS defines the maximum period in which the application is expected to supply a new sample. On the subscribing side, it defines the maximum period in which new samples should be received. For publishers and subscribers to match, the offered deadline period must be less than or equal to the requested deadline period, otherwise the entities are considered to be incompatible.

C++
publisher_attr.qos.m_deadline.period = 1;

subscriber_attr.qos.m_deadline.period = 1;
XML
<publisher profile_name="publisher_xml_conf_deadline_profile">
    <qos>
        <deadline>
            <period>
                <sec>1</sec>
            </period>
        </deadline>
    </qos>
</publisher>

<subscriber profile_name="subscriber_xml_conf_deadline_profile">
    <qos>
        <deadline>
            <period>
                <sec>1</sec>
            </period>
        </deadline>
    </qos>
</subscriber>

Lifespan

Specifies the maximum duration of validity of the data written by the publisher. When the lifespan period expires, data is removed from the history.

C++
publisher_attr.qos.m_lifespan.duration = 1;

subscriber_attr.qos.m_lifespan.duration = 1;
XML
<publisher profile_name="publisher_xml_conf_lifespan_profile">
    <qos>
        <lifespan>
            <duration>
                <sec>1</sec>
            </duration>
        </lifespan>
    </qos>
</publisher>

<subscriber profile_name="subscriber_xml_conf_lifespan_profile">
    <qos>
        <lifespan>
            <duration>
                <sec>1</sec>
            </duration>
        </lifespan>
    </qos>
</subscriber>

Resource limits

Allow controlling the maximum size of the History and other resources.

C++
publisher_attr.topic.resourceLimitsQos.max_samples = 200;

subscriber_attr.topic.resourceLimitsQos.max_samples = 200;
XML
<publisher profile_name="publisher_xml_conf_resource_limits_profile">
    <topic>
        <resourceLimitsQos>
            <max_samples>200</max_samples>
        </resourceLimitsQos>
    </topic>
</publisher>

<subscriber profile_name="subscriber_xml_conf_resource_limits_profile">
    <topic>
        <resourceLimitsQos>
            <max_samples>200</max_samples>
        </resourceLimitsQos>
    </topic>
</subscriber>

Disable positive acks

This is an additional QoS that allows reducing network traffic when strict reliable communication is not required and bandwidth is limited. It consists in changing the default behavior by which positive acks are sent from readers to writers. Instead, only negative acks will be sent when a reader is missing a sample, but writers will keep data for a sufficient keep duration before considering it as acknowledged. A writer and a reader are incompatible (i.e. they will not match) if the latter is using this QoS but the former is not.

C++
publisher_attr.qos.m_disablePositiveACKs.enabled = true;
publisher_attr.qos.m_disablePositiveACKs.duration = 1;

subscriber_attr.qos.m_disablePositiveACKs.enabled = true;
XML
<publisher profile_name="publisher_xml_conf_disable_positive_acks_profile">
    <qos>
        <disablePositiveAcks>
            <enabled>true</enabled>
            <duration>
                <sec>1</sec>
            </duration>
        </disablePositiveAcks>
    </qos>
</publisher>

<subscriber profile_name="subscriber_xml_conf_disable_positive_acks_profile">
    <qos>
        <disablePositiveAcks>
            <enabled>true</enabled>
        </disablePositiveAcks>
    </qos>
</subscriber>

Unicast locators

They are network endpoints where the entity will receive data. For more information about the network, see Transports. Publishers and subscribers inherit unicast locators from the participant. You can set a different set of locators through this attribute.

C++
Locator_t new_locator;
new_locator.port = 7800;

subscriber_attr.unicastLocatorList.push_back(new_locator);

publisher_attr.unicastLocatorList.push_back(new_locator);
XML
<publisher profile_name="publisher_xml_conf_unicast_locators_profile">
    <unicastLocatorList>
        <locator>
            <udpv4>
                <port>7800</port>
            </udpv4>
        </locator>
    </unicastLocatorList>
</publisher>

<subscriber profile_name="subscriber_xml_conf_unicast_locators_profile">
    <unicastLocatorList>
        <locator>
            <udpv4>
                <port>7800</port>
            </udpv4>
        </locator>
    </unicastLocatorList>
</subscriber>

Multicast locators

They are network endpoints where the entity will receive data. For more information about network configuration, see Transports. By default publishers and subscribers don’t use any multicast locator. This attribute is useful when you have a lot of entities and you want to reduce the network usage.

C++
Locator_t new_locator;

IPLocator::setIPv4(new_locator, "239.255.0.4");
new_locator.port = 7900;

subscriber_attr.multicastLocatorList.push_back(new_locator);

publisher_attr.multicastLocatorList.push_back(new_locator);
XML
<publisher profile_name="publisher_xml_conf_multicast_locators_profile">
    <multicastLocatorList>
        <locator>
            <udpv4>
                <address>239.255.0.4</address>
                <port>7900</port>
            </udpv4>
        </locator>
    </multicastLocatorList>
</publisher>

<subscriber profile_name="subscriber_xml_conf_multicast_locators_profile">
    <multicastLocatorList>
        <locator>
            <udpv4>
                <address>239.255.0.4</address>
                <port>7900</port>
            </udpv4>
        </locator>
    </multicastLocatorList>
</subscriber>

Additional Concepts

Using message meta-data

When a message is taken from the Subscriber, an auxiliary SampleInfo_t structure instance is also returned.

Static types
HelloWorld sample;
SampleInfo_t sample_info;
subscriber->takeNextData((void*)&sample, &sample_info);
Dynamic types
// input_type is an instance of DynamicPubSubType of out current dynamic type
DynamicPubSubType *pst = dynamic_cast<DynamicPubSubType*>(input_type);
DynamicData *sample = DynamicDataFactory::get_instance()->create_data(pst->GetDynamicType());
subscriber->takeNextData(sample, &sample_info);

This SampleInfo_t structure contains meta-data on the incoming message:

  • sampleKind: type of the sample, as defined by the RTPS Standard. Healthy messages from a topic are always ALIVE.
  • WriterGUID: Signature of the sender (Publisher) the message comes from.
  • OwnershipStrength: When several senders are writing the same data, this field can be used to determine which data is more reliable.
  • SourceTimestamp: A timestamp on the sender side that indicates the moment the sample was encapsulated and sent.

This meta-data can be used to implement filters:

if( (sample_info.sampleKind == ALIVE) & (sample_info.ownershipStrength > 25) )
{
    //Process data
}

Defining callbacks

As we saw in the example, both the Publisher and Subscriber have a set of callbacks you can use in your application. These callbacks are to be implemented within classes that derive from SubscriberListener or PublisherListener. The following table gathers information about the possible callbacks that can be implemented in both cases:

Callback Publisher Subscriber
onNewDataMessage N Y
onSubscriptionMatched N Y
onPublicationMatched Y N
on_offered_deadline_missed Y N
on_requested_deadline_missed N Y