Introduction to OpenDDS
from http://www.opendds.org/Article-Intro.html
Don Busch, Principal Software Engineer and Partner
Object Computing, Inc. (OCI)
Introduction
Distributed real-time applications are sometimes more data-centric than service-centric, meaning that the primary objective of the participants in the distributed system is the dissemination of application data rather than access to shared services. The set of suppliers and/or consumers of application data may not be known at design time, and may change through the execution lifetime of the application. Typically, the data-centric paradigm is most efficiently realized by a publish/subscribe communication model rather than a request/response model.
The OMG Data Distribution Service (DDS) for Real-Time Systems addresses the performance requirements and hard real-time requirements of distributed data-centric applications. DDS increases the range of publish/subscribe options available to developers of distributed real-time systems. For convenience, the DDS interfaces are defined using OMG Interface Definition Language (IDL). However, most details are left to the implementation, most significantly how data transfer occurs between the publishers and subscribers. A DDS implementer decides on the underlying communication mechanism that moves data from a publisher to a subscriber -- via TCP, UDP, UDP multicast, shared memory, etc. An implementation of the DDS specification is not required to use CORBA or the IIOP protocol to transfer data from a publisher to a subscriber.
OpenDDS is an open source C++ implementation of the OMG Data Distribution Service specification. Recent versions of OpenDDS, including OpenDDS 1.0, include a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings. The complete set of configuration settings is described in the OpenDDS chapter of the TAO Developer's Guide.
In this article, we cover the following topics:
The OpenDDS Implementation of OMG DDS
DDS Architecture
Stock Quoter Example
IDL Types
Publisher
Subscriber
Subscriber's Listeners
Building the Publisher and Subscriber
Configuring the Stock Quoter
Running the Stock Quoter over a TCP Transport
Running the Stock Quoter over a UDP Transport
Summary
References
The OpenDDS Implementation of OMG DDS
OpenDDS leverages a pluggable transport architecture, enabling data delivery through transport and marshaling implementations of the application developer's choosing. Conceptually, the architecture borrows from TAO's Pluggable Protocols Framework. OpenDDS currently supports TCP and UDP point-to-point transports as well as unreliable and reliable multicast, and uses a high-performance marshaling implementation.
This pluggable transport architecture permits a DDS user to optimize a DDS installation based on the desired transport and the homogeneous or heterogeneous nature of the application's deployment. These choices can be made without affecting the application code itself.
Marshaling code is generated by a specialized OpenDDS IDL compiler. A single, separate DCPS Information Repository (DCPSInfoRepo) process acts as a central clearinghouse, associating publishers and subscribers. Under the covers, OpenDDS uses CORBA to communicate with the DCPSInfoRepo process to associate publishers and subscribers. Data transfer between publishers and subscribers is direct between the publishing and subscribing processes. OpenDDS creates its own threads for the ORB and for the non-CORBA I/O that takes place when sending or receiving DDS data.
DDS Architecture
The OMG Data Distribution Service specification separates DDS into two separate architectural layers. The lower layer is the Data-Centric Publish and Subscribe (DCPS) layer, containing type-safe interfaces to a publish/subscribe communication mechanism. The upper layer is the Data Local Reconstruction Layer (DLRL), which enables an application developer to construct a local object model on top of the DCPS layer, shielding the application from DCPS knowledge. Each layer has its own set of concepts and usage patterns, and thus the concepts and terminology of the two layers can be discussed separately.
Data-Centric Publish and Subscribe - DCPS
The DCPS layer is responsible for efficiently disseminating data from publishers to interested subscribers. It is implemented using the concepts of publisher and data writer on the sending side and subscriber and data reader on the receiving side. The DCPS layer consists of one or more data domains, each of which contains a set of participants (publishers and subscribers) that communicate via DDS. Each entity (i.e. publisher or subscriber) belongs to a domain. Each process has one domain participant for each data domain of which it is a member.
Within any data domain, data is identified by a topic, which is a type-specific domain segment that allows publishers and subscribers to refer to data unambiguously. Within a domain, a topic associates a unique topic name, data type, and a set of Quality of Service (QoS) policies with the data itself. Each topic is associated with only one data type, although many different topics can publish the same data type. Behavior of publishers is determined by the QoS policies associated with the publisher, data writer, and topic elements for a particular data source. Likewise, behavior of subscribers is determined by the QoS policies associated with the subscriber, data reader, and topic elements for a particular data sink.
For more information on DCPS terminology, please see the OpenDDS Chapter of the TAO Developer's Guide.
The DDS specification defines a number of Quality of Service (QoS) policies that applications use to specify their reliability, resource usage, fault tolerance, and other requirements to the service. Participants specify the behavior that they require from the service; the service decides how to achieve these behaviors. These policies may be applied to the various DCPS entities (Topic, Data Writer, Data Reader, Publisher, Subscriber, and Domain Participant) although not all policies are valid for all types of entities.
Subscribers and publishers collaborate to specify QoS policies through an offer-request paradigm. A publisher offers a set of QoS policies to all subscribers; a subscriber requests the set of QoS policies that it requires. The DDS implementation then attempts to match the requested policies with the offered policies. If the policies are consistent, then the publication and the subscription are matched.
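The offer-request matching described above can be sketched in a few lines of C++. This is a minimal illustration, not OpenDDS code: the OfferedQos and RequestedQos structs and the is_compatible function are hypothetical names, and only two policies are modeled. It assumes the usual DDS convention that an offer must be at least as strong as the request.

```cpp
#include <cassert>

// Hypothetical, simplified QoS model for illustration; these are not OpenDDS types.
enum class Reliability { BestEffort = 0, Reliable = 1 };

struct OfferedQos {
  Reliability reliability;  // strongest delivery guarantee the writer provides
  int liveliness_lease_ms;  // writer promises to signal liveliness within this period
};

struct RequestedQos {
  Reliability reliability;  // weakest delivery guarantee the reader accepts
  int liveliness_lease_ms;  // longest silence the reader is willing to tolerate
};

// A publication and a subscription match only if every offered policy
// is at least as strong as the corresponding requested policy.
bool is_compatible(const OfferedQos& offered, const RequestedQos& requested) {
  assert(offered.liveliness_lease_ms >= 0 && requested.liveliness_lease_ms >= 0);
  const bool reliability_ok =
      static_cast<int>(offered.reliability) >= static_cast<int>(requested.reliability);
  const bool liveliness_ok =
      offered.liveliness_lease_ms <= requested.liveliness_lease_ms;
  return reliability_ok && liveliness_ok;
}
```

In this sketch a reliable publisher satisfies a best-effort subscriber, but a best-effort publisher does not satisfy a subscriber that requests reliable delivery.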
OpenDDS supports the following DCPS Quality-of-Service (QoS) Policies:
QoS Policy: Description
Liveliness: Controls liveliness checks to make sure expected entities in the system are still alive
Reliability: Determines whether the service is allowed to drop samples
History: Controls what happens to an instance whose value changes before it is communicated to all Subscribers
Resource Limits: Controls resources that the service can use to meet other QoS requirements
See the Object Management Group's Introduction to DDS Whitepaper, Appendix A, for a more complete list of Quality-of-Service policies and more detailed Quality-of-Service definitions.
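As an illustration of the History policy listed above, the following sketch shows one possible behavior: keeping only the most recent N samples of an instance that have not yet been delivered. KeepLastHistory is a hypothetical toy class, not part of OpenDDS, and the depth parameter is an assumption made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical toy buffer illustrating a "keep the last N samples" history;
// it is not an OpenDDS class.
template <typename Sample>
class KeepLastHistory {
public:
  explicit KeepLastHistory(std::size_t depth) : depth_(depth) {
    assert(depth > 0);
  }

  // Stores a new sample, discarding the oldest one once the depth is reached.
  void add(const Sample& sample) {
    if (samples_.size() == depth_) {
      samples_.pop_front();
    }
    samples_.push_back(sample);
  }

  // Samples still held for delivery, oldest first.
  const std::deque<Sample>& samples() const { return samples_; }

private:
  std::size_t depth_;
  std::deque<Sample> samples_;
};
```

With a depth of 2, writing three values in quick succession leaves only the last two available to a slow subscriber.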
Data-Local Reconstruction Layer - DLRL
The Data-Local Reconstruction Layer (DLRL) is an object-oriented layer on top of DCPS. A DLRL object is a native-language (i.e. C++) object with one or more shared attributes. Each DLRL class is mapped to one or more DCPS topics; each shared attribute value is mapped to a field in a topic's data type, and its value is distributed across the application via DCPS. A DLRL participant communicates data to the rest of the application by modifying a DLRL object, resulting in the publication of a data sample on the associated topic. A DLRL shared attribute may be a simple value or structure, a reference to another DLRL object, or a collection (list, map) of those. DLRL supports complex object graphs and complex relationships between DLRL objects.
The developer is responsible for deciding how DCPS entities are mapped to DLRL objects. The model is specified in OMG Interface Definition Language (IDL) using IDL valuetypes. The mapping is conceptually similar to an object-relational database mapping, which maps an object model onto relational database tables. We think of each DCPS topic as analogous to a relational database table, and each sample as a row in that table. The DDS specification has a default mapping from DCPS to DLRL. Alternatively, a developer can specify a custom mapping via an XML mapping file.
OpenDDS 1.0 does not implement the DLRL.
OpenDDS Stock Quoter Example
Our example illustrates publication of and subscription to data samples through the DDS DCPS layer. The example contains two DCPS topics, both related to the stock market.
A stock quote publisher publishes stock quote samples to interested subscribers; each quote contains the ticker symbol of a security, its value, and a time stamp. Quotes are published periodically throughout the trading day, as buy and sell transactions affect the underlying value of the security. In addition, a stock exchange event publisher publishes important events relating to a stock exchange, namely when the exchange opens, closes, and when trading is suspended or resumed.
Our subscriber subscribes to both stock quotes and stock exchange events. The subscriber prints the ticker symbol and value of each quote it sees. When the subscriber receives an event indicating that the stock exchange has closed for the day, it gracefully shuts down. Thus, the receipt of a "closed" stock exchange event is the subscriber's signal to stop expecting stock quote samples.
We will demonstrate how to use the same publisher and subscriber code to communicate via the TCP and UDP transports. The transport configuration is isolated in a set of configuration files, allowing us to switch transports without making any code changes.
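The idea of selecting a transport from configuration rather than from code can be sketched as follows. This is a simplified illustration, not the OpenDDS transport framework: the Transport classes, the TransportConfig map, and create_transport are all hypothetical names, and the map merely stands in for a parsed configuration file.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical transport hierarchy; the names are illustrative, not OpenDDS classes.
class Transport {
public:
  virtual ~Transport() {}
  virtual std::string name() const = 0;
};

class TcpTransport : public Transport {
public:
  std::string name() const override { return "tcp"; }
};

class UdpTransport : public Transport {
public:
  std::string name() const override { return "udp"; }
};

// Stand-in for a parsed configuration file: transport id -> transport type.
typedef std::map<int, std::string> TransportConfig;

// The application asks for a transport by id only; the configuration decides
// which concrete transport that id refers to. Returns null for unknown entries.
std::unique_ptr<Transport> create_transport(const TransportConfig& config, int id) {
  TransportConfig::const_iterator entry = config.find(id);
  if (entry == config.end()) return std::unique_ptr<Transport>();
  if (entry->second == "tcp") return std::unique_ptr<Transport>(new TcpTransport());
  if (entry->second == "udp") return std::unique_ptr<Transport>(new UdpTransport());
  return std::unique_ptr<Transport>();
}
```

Because the calling code refers only to the id, changing the entry for id 1 from "tcp" to "udp" switches transports without touching the application.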
IDL Types
First, we define our published DDS data types in IDL:
#include "orbsvcs/TimeBase.idl"
module StockQuoter
{
#pragma DCPS_DATA_TYPE "StockQuoter::Quote"
#pragma DCPS_DATA_KEY "StockQuoter::Quote ticker"
struct Quote {
string ticker;
string exchange;
string full_name;
double value;
TimeBase::TimeT timestamp;
};
#pragma DCPS_DATA_TYPE "StockQuoter::ExchangeEvent"
#pragma DCPS_DATA_KEY "StockQuoter::ExchangeEvent exchange"
enum ExchangeEventType { TRADING_OPENED,
TRADING_CLOSED,
TRADING_SUSPENDED,
TRADING_RESUMED };
struct ExchangeEvent {
string exchange;
ExchangeEventType event;
TimeBase::TimeT timestamp;
};
};
We publish two data types: a Quote type for each stock quote, and an ExchangeEvent type to indicate when the stock exchange is opened, closed, and when trading is suspended or resumed. The DCPS_DATA_TYPE pragma marks a type for use with DDS. The DCPS_DATA_KEY defined for each type is a unique identifier for each instance of the data type. Our Quote type's key is the ticker symbol of the stock. Throughout the day, we would expect to publish many values, or samples, for each ticker symbol. The set of published samples for each ticker symbol belongs to the same instance. In our example, we'll publish two ticker symbols, and thus two instances: SPY (S&P Depository Receipts, i.e. the S&P 500) and MDY (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400).
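The relationship between a key, an instance, and its samples can be sketched in plain C++. This is a conceptual model only: the Quote struct below is a cut-down stand-in for the IDL type (an assumption made to keep the example short), and InstanceTable is a hypothetical class, not something OpenDDS provides.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for the IDL Quote struct; only two fields are kept.
struct Quote {
  std::string ticker;  // plays the role of the DCPS_DATA_KEY field
  double value;
};

// Hypothetical table that groups samples into instances by key value.
class InstanceTable {
public:
  // Every sample with the same ticker lands in the same instance.
  void write(const Quote& quote) {
    instances_[quote.ticker].push_back(quote.value);
  }

  std::size_t instance_count() const { return instances_.size(); }

  std::size_t sample_count(const std::string& ticker) const {
    std::map<std::string, std::vector<double> >::const_iterator it =
        instances_.find(ticker);
    return it == instances_.end() ? 0 : it->second.size();
  }

private:
  std::map<std::string, std::vector<double> > instances_;
};
```

Writing three SPY quotes and one MDY quote yields two instances, with three samples in the SPY instance and one in the MDY instance.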
Next, we compile the IDL with OpenDDS's dcps_ts.pl script to generate type support code. The type support code consists of generated DCPS data writer and data reader C++ classes and additional IDL code. DDS uses type-safe interfaces for publication and subscription. Type-safe interfaces have several advantages: first, programming errors are more likely to be caught at compile time; second, generated marshaling code can be made very efficient when the marshaled data type is known at compile time; and third, we can avoid the use of inefficient types such as the CORBA any in data transfer.
The command to generate type support code for the Stock Quoter's IDL types is as follows:
$DDS_ROOT/bin/dcps_ts.pl StockQuoter.idl
This command generates the following files:
QuoteTypeSupport.idl
QuoteTypeSupportImpl.h
QuoteTypeSupportImpl.cpp
ExchangeEventTypeSupport.idl
ExchangeEventTypeSupportImpl.h
ExchangeEventTypeSupportImpl.cpp
Note that dcps_ts.pl generates a set of type support files for each DDS type in the StockQuoter.idl file.
However, we don't need to run the dcps_ts.pl script manually. Later, we'll use a Make Project Creator (MPC) project to automate the build steps for us.
Next, we use TAO's IDL compiler to compile all three IDL files -- the StockQuoter.idl file we wrote manually, plus the two type support files generated by dcps_ts.pl.
tao_idl -Gdcps -I $DDS_ROOT -I$TAO_ROOT/orbsvcs StockQuoter.idl
tao_idl -Gdcps -I $DDS_ROOT -I$TAO_ROOT/orbsvcs QuoteTypeSupport.idl
tao_idl -Gdcps -I $DDS_ROOT -I$TAO_ROOT/orbsvcs ExchangeEventTypeSupport.idl
The -Gdcps command-line argument causes the TAO IDL compiler to generate code supporting DCPS type definitions. Again, we'll automate this later with MPC.
Publisher
Next, we write a publisher to publish stock quotes and stock exchange events via DDS. First, we include the two type support header files generated by the dcps_ts.pl script.
#include "QuoteTypeSupportImpl.h"
#include "ExchangeEventTypeSupportImpl.h"
We also include DCPS publisher, service participant, and QoS header files.
#include "dds/DCPS/Service_Participant.h"
#include "dds/DCPS/Marked_Default_Qos.h"
#include "dds/DCPS/PublisherImpl.h"
#include "dds/DCPS/transport/framework/TheTransportFactory.h"
#include "ace/streams.h"
#include "orbsvcs/Time_Utilities.h"
We will configure the publisher's transport via a configuration file. However, we need an identifier to tie a transport entry in the configuration file to the transport configured in the code. This transport id value must match the integer identifier defined later in the transport configuration file. Each topic -- in fact, each publisher within a topic -- can, in theory, have its own transport.
// constant used by this publisher for transport; the
// TRANSPORT_IMPL_ID must match the value in the
// configuration file.
const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
The following constants are used for our domain, type names and topic names. Each type is published on a separate topic. The subscriber must use the same values for its domain, type names and topic names.
// constants for Stock Quoter domain Id, types, and topic
DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
const char* QUOTER_QUOTE_TYPE = "Quote Type";
const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
const char* QUOTER_EXCHANGE_EVENT_TYPE =
"Exchange Event Type";
const char* QUOTER_EXCHANGE_EVENT_TOPIC =
"Stock Exchange Events";
When a stock exchange event -- i.e. opened, closed, suspended, or resumed -- is published, we also publish the name of the stock exchange to which the event applies.
const char* STOCK_EXCHANGE_NAME = "Test Stock Exchange";
This is a simple helper method to get the current date and time.
TimeBase::TimeT get_timestamp()
{
TimeBase::TimeT retval;
ACE_hrtime_t t = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (retval, t );
return retval;
}
The remainder of the publisher's source code file contains its main(). We enter the publisher's main().
int main (int argc, char *argv[])
{
DDS::DomainParticipantFactory_var dpf =
DDS::DomainParticipantFactory::_nil();
DDS::DomainParticipant_var participant =
DDS::DomainParticipant::_nil();
try
{
First, we create a domain participant. A DDS publisher may publish on many independent domains, but our example only publishes on one domain. We use the TheParticipantFactoryWithArgs macro to pass command-line arguments into DCPS and get the singleton domain participant factory. We create one domain participant, for the "Quote" domain, using the default Quality-of-Service policies for a domain participant. The value of QUOTER_DOMAIN_ID passed into the factory must be identical in the publisher and the subscriber.
// Initialize, and create a DomainParticipant
dpf = TheParticipantFactoryWithArgs(argc, argv);
participant = dpf->create_participant(
QUOTER_DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil());
if (CORBA::is_nil (participant.in ()))
{
cerr << "create_participant failed." << endl;
ACE_OS::exit(1);
}
Then, we create a publisher through the domain participant with default Quality-of-Service values. We can attach a PublisherListener that is called by DCPS when certain publication-related events happen. However, we don't care about those events, so we attach a nil listener.
// Create a publisher for the two topics
// (PUBLISHER_QOS_DEFAULT is defined in
// Marked_Default_Qos.h)
DDS::Publisher_var pub =
participant->create_publisher(
PUBLISHER_QOS_DEFAULT,
DDS::PublisherListener::_nil());
if (CORBA::is_nil (pub.in ()))
{
cerr << "create_publisher failed." << endl;
ACE_OS::exit(1);
}
Next, we create a transport implementation to attach to the publisher. We get the transport implementation from the singleton transport factory, called TheTransportFactory. The value of the TRANSPORT_IMPL_ID identifier must match the transport id value in our configuration file (more on that later). The OpenDDS::DCPS::AUTO_CONFIG argument indicates that we're using a configuration file to configure the transport implementation. Note that the code itself does not need to know any details about the transport implementation -- whether it uses TCP, UDP, what its endpoints are, etc.
// Initialize the transport; the TRANSPORT_IMPL_ID
// must match the value in the configuration file.
OpenDDS::DCPS::TransportImpl_rch trans_impl =
TheTransportFactory->create_transport_impl (
TRANSPORT_IMPL_ID,
OpenDDS::DCPS::AUTO_CONFIG);
We get the publisher's implementation object and attach the transport to it. Each publisher may have a different transport implementation, or several publishers may share the same transport implementation. In our case, we have one publisher, and thus one transport implementation.
// Attach the publisher to the configured transport.
OpenDDS::DCPS::PublisherImpl* pub_impl =
OpenDDS::DCPS::reference_to_servant<
OpenDDS::DCPS::PublisherImpl>(pub.in ());
if (0 == pub_impl)
{
cerr << "Failed to obtain publisher servant" << endl;
ACE_OS::exit(1);
}
OpenDDS::DCPS::AttachStatus status =
pub_impl->attach_transport(trans_impl.in());
if (status != OpenDDS::DCPS::ATTACH_OK)
{
cerr << "Failed to attach to the transport. "
<< "Status == "
<< status << endl;
ACE_OS::exit(1);
}
There are three steps involved in publishing through DCPS. First, we register each type for the published data samples. Our example publishes samples of two IDL types, Quote and ExchangeEvent. Second, we create one or more topics upon which we publish. Each topic can only be bound to one type; thus we create a topic for each of our two types. Third, we create a data writer for each topic, and publish samples through the data writer.
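The ordering constraints among these three steps can be sketched with a toy model. ToyParticipant below is hypothetical and far simpler than the real DDS::DomainParticipant interface; it only illustrates that a type must be registered before a topic can use it, that a topic is bound to exactly one type, and that a writer needs an existing topic.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical toy participant; the real DDS::DomainParticipant API differs.
class ToyParticipant {
public:
  // Step 1: make a type name known to the participant.
  void register_type(const std::string& type_name) {
    types_.insert(type_name);
  }

  // Step 2: bind a topic name to a registered type. Fails if the type is
  // unknown or if the topic is already bound to a different type.
  bool create_topic(const std::string& topic, const std::string& type_name) {
    if (types_.count(type_name) == 0) return false;
    std::map<std::string, std::string>::const_iterator it = topics_.find(topic);
    if (it != topics_.end()) return it->second == type_name;
    topics_[topic] = type_name;
    return true;
  }

  // Step 3: a data writer can only be created for an existing topic.
  bool can_create_writer(const std::string& topic) const {
    return topics_.count(topic) != 0;
  }

private:
  std::set<std::string> types_;
  std::map<std::string, std::string> topics_;
};
```

In this model, creating the "Stock Quotes" topic before registering "Quote Type" fails, and a second attempt to bind "Stock Quotes" to a different type is rejected.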
We first register the IDL Quote type with the domain participant, passing an instance of the generated QuoteTypeSupportImpl class for the Quote type. The name that we use for the Quote type, which is stored in the constant value QUOTER_QUOTE_TYPE, must match the name used on the subscriber. When we create a topic, we specify this type name, enabling DCPS to later create the appropriate type of data writer for the topic.
// Register the Quote type
StockQuoter::QuoteTypeSupport_var quote_servant
= new StockQuoter::QuoteTypeSupportImpl();
if (DDS::RETCODE_OK !=
quote_servant->register_type(participant.in (),
QUOTER_QUOTE_TYPE))
{
cerr << "register_type for " << QUOTER_QUOTE_TYPE
<< " failed." << endl;
ACE_OS::exit(1);
}
We then register the IDL ExchangeEvent type with the domain participant in the same way, using the generated ExchangeEventTypeSupportImpl class. Our DCPS domain participant is able to publish on topics for Quote or ExchangeEvent types.
// Register the ExchangeEvent type
StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant
= new StockQuoter::ExchangeEventTypeSupportImpl();
if (DDS::RETCODE_OK !=
exchange_evt_servant->register_type(
participant.in (),
QUOTER_EXCHANGE_EVENT_TYPE))
{
cerr << "register_type for "
<< QUOTER_EXCHANGE_EVENT_TYPE
<< " failed." << endl;
ACE_OS::exit(1);
}
We create a topic for our Quote samples, indicating the topic name and the registered name of the Quote data type and using the default Quality-of-Service settings. Again, the Quote topic and type names must match on the publisher and the subscriber.
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type...
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << "create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed." << endl;
ACE_OS::exit(1);
}
Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic and type names must match on the publisher and on the subscriber.
// .. and another topic for the Exchange Event type
DDS::Topic_var exchange_evt_topic =
participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
QUOTER_EXCHANGE_EVENT_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (exchange_evt_topic.in ()))
{
cerr << "create_topic for "
<< QUOTER_EXCHANGE_EVENT_TOPIC
<< " failed."
<< endl;
ACE_OS::exit(1);
}
We create two data writers, one for each topic. We pass in the topic created above; the topic knows its type. Each data writer is associated with exactly one publisher and publishes on one topic. Later, our publisher publishes on each topic by writing data samples to each data writer. The following code creates a data writer for the "Stock Quotes" topic.
// Get QoS to use for our two DataWriters
// Could also use DATAWRITER_QOS_DEFAULT
DDS::DataWriterQos dw_default_qos;
pub->get_default_datawriter_qos (dw_default_qos);
// Create a DataWriter for the Quote topic
DDS::DataWriter_var quote_base_dw =
pub->create_datawriter(quote_topic.in (),
dw_default_qos,
DDS::DataWriterListener::_nil());
if (CORBA::is_nil (quote_base_dw.in ()))
{
cerr << "create_datawriter for "
<< QUOTER_QUOTE_TOPIC
<< " failed." << endl;
ACE_OS::exit(1);
}
StockQuoter::QuoteDataWriter_var quote_dw
= StockQuoter::QuoteDataWriter::_narrow(quote_base_dw.in());
if (CORBA::is_nil (quote_dw.in ()))
{
cerr << "QuoteDataWriter could not be narrowed"
<< endl;
ACE_OS::exit(1);
}
We then create a data writer for the "Stock Exchange Events" topic. Again, we pass in the topic created above, and the topic knows its type.
// Create a DataWriter for the Exchange Event topic
DDS::DataWriter_var exchange_evt_base_dw =
pub->create_datawriter(exchange_evt_topic.in (),
dw_default_qos,
DDS::DataWriterListener::_nil());
if (CORBA::is_nil (exchange_evt_base_dw.in ()))
{
cerr << "create_datawriter for "
<< QUOTER_EXCHANGE_EVENT_TOPIC
<< " failed." << endl;
ACE_OS::exit(1);
}
StockQuoter::ExchangeEventDataWriter_var exchange_evt_dw =
StockQuoter::ExchangeEventDataWriter::_narrow(
exchange_evt_base_dw.in());
if (CORBA::is_nil (exchange_evt_dw.in ()))
{
cerr << "ExchangeEventDataWriter could not "
<< "be narrowed"<< endl;
ACE_OS::exit(1);
}
We may choose to register each data instance. Registering each data instance will slightly improve latency while writing samples of that instance.
A publisher may publish many data samples on each data instance. A data instance is identified by a unique key. For the Quote type, we identified ticker as the key field in its IDL type definition. Each Quote data sample with the same key value is considered part of the same data instance. In other words, each Quote sample published on the ticker symbol "SPY" is part of the same instance.
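One plausible reason registration helps latency is that the key lookup happens once, at registration time, instead of on every write. The sketch below illustrates that idea with a hypothetical ToyQuoteWriter and a plain integer Handle; neither reflects how OpenDDS implements instance handles internally.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

typedef std::size_t Handle;  // hypothetical stand-in for an instance handle

// Hypothetical writer: registering an instance resolves its key once and
// returns a handle that later writes can use to skip the key lookup.
class ToyQuoteWriter {
public:
  // Looks up or creates the instance for this key and returns its handle.
  Handle register_instance(const std::string& ticker) {
    std::map<std::string, Handle>::const_iterator it = by_key_.find(ticker);
    if (it != by_key_.end()) return it->second;
    const Handle handle = values_.size();
    values_.push_back(std::vector<double>());
    by_key_[ticker] = handle;
    return handle;
  }

  // Writes through the handle: a direct index, with no key comparison.
  void write(Handle handle, double value) {
    assert(handle < values_.size());
    values_[handle].push_back(value);
  }

  std::size_t sample_count(Handle handle) const {
    return values_.at(handle).size();
  }

private:
  std::map<std::string, Handle> by_key_;
  std::vector<std::vector<double> > values_;
};
```

Registering the same ticker twice returns the same handle, so all samples written through it belong to one instance.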
We have two Quote instances, for ticker symbols "SPY" (S&P Depository Receipts, i.e. the S&P 500) and "MDY" (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400), and one ExchangeEvent instance, for the "Test Stock Exchange". We register each instance with the appropriate data writer. The registration method is actually called _cxx_register because register is a reserved word in C++.
// Register the Exchange Event and the two
// Quoted securities (SPY and MDY) with the
// appropriate data writer
StockQuoter::Quote spy;
spy.ticker = CORBA::string_dup("SPY");
DDS::InstanceHandle_t spy_handle =
quote_dw->_cxx_register(spy);
StockQuoter::Quote mdy;
mdy.ticker = CORBA::string_dup("MDY");
DDS::InstanceHandle_t mdy_handle =
quote_dw->_cxx_register(mdy);
StockQuoter::ExchangeEvent ex_evt;
ex_evt.exchange = STOCK_EXCHANGE_NAME;
DDS::InstanceHandle_t exchange_handle =
exchange_evt_dw->_cxx_register(ex_evt);
Finally, we publish. First, we publish a TRADING_OPENED event on the "Stock Exchange Events" topic.
// Publish...
StockQuoter::ExchangeEvent opened;
opened.exchange = STOCK_EXCHANGE_NAME;
opened.event = StockQuoter::TRADING_OPENED;
opened.timestamp = get_timestamp();
cout << "Publishing TRADING_OPENED" << endl;
DDS::ReturnCode_t ret =
exchange_evt_dw->write(opened, exchange_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: OPEN write returned %d.\n"),
ret));
}
Then, we publish several stock quote data samples for the "SPY" and "MDY" instances on the "Stock Quotes" topic. We simply loop, increasing the quoted value for each ticker symbol a bit each time to simulate active trading on a really good day.
ACE_Time_Value quarterSecond( 0, 250000 );
for ( int i = 0; i < 20; ++i )
{
//
// SPY
//
StockQuoter::Quote spy_quote;
spy_quote.exchange = STOCK_EXCHANGE_NAME;
spy_quote.ticker = CORBA::string_dup("SPY");
spy_quote.full_name =
CORBA::string_dup("S&P Depository Receipts");
spy_quote.value = 1200.0 + 10.0*i;
spy_quote.timestamp = get_timestamp();
cout << "Publishing SPY Quote: "
<< spy_quote.value << endl;
ret = quote_dw->write(spy_quote, spy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: SPY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
//
// MDY
//
StockQuoter::Quote mdy_quote;
mdy_quote.exchange = STOCK_EXCHANGE_NAME;
mdy_quote.ticker = CORBA::string_dup("MDY");
mdy_quote.full_name =
CORBA::string_dup("S&P Midcap Depository Receipts");
mdy_quote.value = 1400.0 + 10.0*i;
mdy_quote.timestamp = get_timestamp();
cout << "Publishing MDY Quote: "
<< mdy_quote.value << endl;
ret = quote_dw->write(mdy_quote, mdy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: MDY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
}
Last, we publish a TRADING_CLOSED event on the "Stock Exchange Events" topic to indicate that the stock exchange is closed for the day.
StockQuoter::ExchangeEvent closed;
closed.exchange = STOCK_EXCHANGE_NAME;
closed.event = StockQuoter::TRADING_CLOSED;
closed.timestamp = get_timestamp();
cout << "Publishing TRADING_CLOSED" << endl;
ret = exchange_evt_dw->write(closed, exchange_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: CLOSED write returned %d.\n"),
ret));
}
cout << "Exiting..." << endl;
} catch (CORBA::Exception& e) {
cerr << "Exception caught in main.cpp:" << endl
<< e << endl;
ACE_OS::exit(1);
}
Finally, we clean up after ourselves before exiting.
// Cleanup
try {
if (!CORBA::is_nil (participant.in ())) {
participant->delete_contained_entities();
}
if (!CORBA::is_nil (dpf.in ())) {
dpf->delete_participant(participant.in ());
}
} catch (CORBA::Exception& e) {
cerr << "Exception caught in cleanup."
<< endl
<< e << endl;
ACE_OS::exit(1);
}
TheTransportFactory->release();
TheServiceParticipant->shutdown ();
return 0;
}
This completes the C++ code for the publisher.
Subscriber
Our subscriber subscribes to the stock quotes and stock exchange events, receiving data samples from the publisher. We use the publisher's TRADING_CLOSED event to indicate that trading has finished for the day, triggering the subscriber's graceful shutdown.
Much of the code in the subscriber is similar to that in the publisher. We get a domain participant, register types, etc. in the same way we did in the publisher. The main difference is that the subscriber is passive, waiting to receive samples, while the publisher is active. The subscriber uses listener objects to receive samples from the publisher.
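The passive, callback-driven style of the subscriber can be sketched with a small listener interface. This is a simplified illustration: QuoteListener, ToyReader, and RecordingListener are hypothetical names, the Quote struct is a cut-down stand-in for the IDL type, and the callback here receives the sample directly, which is an assumption made to keep the example short.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-in for the IDL Quote struct.
struct Quote {
  std::string ticker;
  double value;
};

// Hypothetical listener interface; real DDS listeners have more callbacks.
class QuoteListener {
public:
  virtual ~QuoteListener() {}
  virtual void on_data_available(const Quote& quote) = 0;
};

// Hypothetical reader: the middleware side calls deliver() when a sample
// arrives, and the reader forwards it to the application's listener.
class ToyReader {
public:
  explicit ToyReader(QuoteListener& listener) : listener_(listener) {}
  void deliver(const Quote& quote) { listener_.on_data_available(quote); }

private:
  QuoteListener& listener_;
};

// Application-side listener that simply records the tickers it sees.
class RecordingListener : public QuoteListener {
public:
  void on_data_available(const Quote& quote) override {
    seen.push_back(quote.ticker);
  }
  std::vector<std::string> seen;
};
```

The application never polls for data in this model; it supplies a listener once and reacts whenever the reader invokes it.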
First, we include the two type support header files generated by the dcps_ts.pl script. We also included these files in the publisher. However, we also include two listener header files, one for each published type. A listener is called by DDS when a data sample is published on the associated topic.
#include "QuoteTypeSupportImpl.h"
#include "QuoteDataReaderListenerImpl.h"
#include "ExchangeEventTypeSupportImpl.h"
#include "ExchangeEventDataReaderListenerImpl.h"
We also include DCPS subscriber, service participant, and QoS header files.
#include "dds/DCPS/Service_Participant.h"
#include "dds/DCPS/Marked_Default_Qos.h"
#include "dds/DCPS/SubscriberImpl.h"
#include "dds/DCPS/transport/framework/TheTransportFactory.h"
#include "dds/DCPS/BuiltinTopicUtils.h"
#include "ace/streams.h"
#include "orbsvcs/Time_Utilities.h"
As in the publisher, we will configure the subscriber's transport via a configuration file. Again, we need an identifier to tie a transport entry in the configuration file to the transport configured in the code. This number must match the value in the subscriber's configuration file; it does not need to match the number used by the publisher. We'll use it to create the transport implementation that we attach to our subscriber.
// constants used by this subscriber for transport; the
// TRANSPORT_IMPL_ID must match the value in the
// configuration file.
const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
The following constants are used for our domain, type names and topic names. These must match the domain, type names, and topic names used by the publisher.
// constants for Stock Quoter domain Id, types, and topic
// (same as publisher)
DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
const char* QUOTER_QUOTE_TYPE = "Quote Type";
const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
const char* QUOTER_EXCHANGE_EVENT_TYPE =
"Exchange Event Type";
const char* QUOTER_EXCHANGE_EVENT_TOPIC =
"Stock Exchange Events";
The remainder of the subscriber's source code file contains its main(). We enter the subscriber's main().
int main (int argc, char *argv[])
{
DDS::DomainParticipantFactory_var dpf =
DDS::DomainParticipantFactory::_nil();
DDS::DomainParticipant_var participant =
DDS::DomainParticipant::_nil();
try {
We create a domain participant, just as we did in the publisher. The specified domain matches the publisher's domain.
// Initialize, and create a DomainParticipant
// (same code as publisher)
dpf = TheParticipantFactoryWithArgs(argc, argv);
participant = dpf->create_participant(
QUOTER_DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil());
if (CORBA::is_nil (participant.in ()))
{
cerr << "create_participant failed." << endl;
ACE_OS::exit(1);
}
Then, we create a subscriber through the domain participant with default Quality-of-Service policy values. We can attach a SubscriberListener that is called by DCPS when certain subscription-related events happen. However, we don't care about those events, so we attach a nil listener. This is almost the same as what we did in the publisher when we called create_publisher.
// Create a subscriber for the two topics
// (SUBSCRIBER_QOS_DEFAULT is defined
// in Marked_Default_Qos.h)
DDS::Subscriber_var sub =
participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil());
if (CORBA::is_nil (sub.in ()))
{
cerr << "create_subscriber failed." << endl;
ACE_OS::exit(1);
}
As in the publisher, we create a transport implementation. We get the transport implementation from the singleton transport factory, called TheTransportFactory. Again, the value of the TRANSPORT_IMPL_ID identifier must match the transport id value in our configuration file (more on that later). The OpenDDS::DCPS::AUTO_CONFIG argument indicates that we're using a configuration file to configure the transport implementation.
// Initialize the transport; the TRANSPORT_IMPL_ID
// must match the value in the configuration file.
OpenDDS::DCPS::TransportImpl_rch trans_impl =
TheTransportFactory->create_transport_impl (
TRANSPORT_IMPL_ID,
OpenDDS::DCPS::AUTO_CONFIG);
We get the subscriber's implementation object and attach the transport to it. Each subscriber may have a different transport, or several subscribers may share the same transport. In our case, we have one subscriber, and thus one transport implementation.
// Attach the subscriber to the TCP transport.
// (almost identical to the publisher)
OpenDDS::DCPS::SubscriberImpl* sub_impl =
OpenDDS::DCPS::reference_to_servant<
OpenDDS::DCPS::SubscriberImpl>(sub.in ());
if (0 == sub_impl)
{
cerr << "Failed to obtain subscriber servant" << endl;
ACE_OS::exit(1);
}
OpenDDS::DCPS::AttachStatus status =
sub_impl->attach_transport(trans_impl.in());
if (status != OpenDDS::DCPS::ATTACH_OK)
{
cerr << "Failed to attach to the transport. "
<< "Status == "
<< status << endl;
ACE_OS::exit(1);
}
As in the publisher, we must register the IDL Quote and ExchangeEvent types with the domain participant to subscribe to topics on those types.
// Register the Quote type
// (same code as publisher)
StockQuoter::QuoteTypeSupport_var quote_servant
= new StockQuoter::QuoteTypeSupportImpl();
if (DDS::RETCODE_OK !=
quote_servant->register_type(participant.in (),
QUOTER_QUOTE_TYPE))
{
cerr << "register_type for " << QUOTER_QUOTE_TYPE
<< " failed." << endl;
ACE_OS::exit(1);
}
// Register the ExchangeEvent type
// (same code as publisher)
StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant
= new StockQuoter::ExchangeEventTypeSupportImpl();
if (DDS::RETCODE_OK !=
exchange_evt_servant->register_type(
participant.in (),
QUOTER_EXCHANGE_EVENT_TYPE))
{
cerr << "register_type for "
<< QUOTER_EXCHANGE_EVENT_TYPE
<< " failed." << endl;
ACE_OS::exit(1);
}
As in the publisher, we create a topic for our stock quotes, indicating the topic name and the registered name of the Quote type and using the default Quality-of-Service settings. Again, the stock quote topic name must match on the publisher and the subscriber.
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
// (same code as publisher)
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type...
// (same code as publisher)
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << "create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed." << endl;
ACE_OS::exit(1);
}
Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic name must match on the publisher and the subscriber.
// .. and another topic for the Exchange Event type
// (same code as publisher)
DDS::Topic_var exchange_evt_topic =
participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
QUOTER_EXCHANGE_EVENT_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (exchange_evt_topic.in ()))
{
cerr << "create_topic for "
<< QUOTER_EXCHANGE_EVENT_TOPIC
<< " failed."
<< endl;
ACE_OS::exit(1);
}
On the publisher, we created two data writers, one for each topic. On the subscriber, we'll create two data readers, one for each topic. Each data reader has exactly one subscriber and subscribes to one topic. We also attach a listener to each data reader to receive notification of published data samples. This is where the publisher and subscriber code diverges.
The following code creates a listener for the "Stock Quotes" topic. The listener is a local CORBA object, implementing the DDS::DataReaderListener IDL interface. We use OpenDDS's convenient servant_to_reference function template to obtain a reference of the interface type.
// Create DataReaders and DataReaderListeners for the
// Quote and ExchangeEvent
// Create a Quote listener
QuoteDataReaderListenerImpl quote_listener_servant;
DDS::DataReaderListener_var quote_listener =
::OpenDDS::DCPS::servant_to_reference(&quote_listener_servant);
if (CORBA::is_nil (quote_listener.in ()))
{
cerr << "Quote listener is nil." << endl;
ACE_OS::exit(1);
}
We create a second listener for the "Stock Exchange Events" topic.
// Create an ExchangeEvent listener
ExchangeEventDataReaderListenerImpl exchange_evt_listener_servant;
DDS::DataReaderListener_var exchange_evt_listener =
::OpenDDS::DCPS::servant_to_reference(&exchange_evt_listener_servant);
if (CORBA::is_nil (exchange_evt_listener.in ()))
{
cerr << "ExchangeEvent listener is nil." << endl;
ACE_OS::exit(1);
}
Finally, we create a data reader for each of the two topics. First, we create a data reader for the "Stock Quotes" topic, attaching the relevant listener we created above.
// Create the Quote DataReader
// Get the default QoS
// Could also use DATAREADER_QOS_DEFAULT
DDS::DataReaderQos dr_default_qos;
sub->get_default_datareader_qos (dr_default_qos);
DDS::DataReader_var quote_dr =
sub->create_datareader(quote_topic.in (),
dr_default_qos,
quote_listener.in ());
Then, we create a data reader for the "Stock Exchange Events" topic, attaching the other listener we created above.
// Create the ExchangeEvent DataReader
DDS::DataReader_var exchange_evt_dr =
sub->create_datareader(exchange_evt_topic.in (),
dr_default_qos,
exchange_evt_listener.in ());
OpenDDS spawns its own threads to handle incoming events from the publisher. Thus, there is no event loop code in the subscriber. However, we must be sure not to allow the main thread to exit before we're ready to shut down the entire subscriber process. So, we loop, processing stock quotes and stock exchange events until the TRADING_CLOSED event is received on the "Stock Exchange Events" topic. Essentially, we want to receive published data samples until the stock exchange tells us that it has closed. The sleep call makes the main thread check the flag only once per second, so the loop does not consume too much CPU.
// Wait for events from the Publisher; shut
// down when "close" received
cout << "Subscriber: waiting for events" << endl;
while ( ! exchange_evt_listener_servant.
is_exchange_closed_received() )
{
ACE_OS::sleep(1);
}
When we have received the TRADING_CLOSED event, we gracefully exit the loop.
cout << "Received CLOSED event from publisher; "
<< " exiting..."
<< endl;
} catch (CORBA::Exception& e) {
cerr << "Exception caught in main.cpp:" << endl
<< e << endl;
ACE_OS::exit(1);
}
Finally, we clean up after ourselves before exiting.
// Cleanup
try {
if (!CORBA::is_nil (participant.in ())) {
participant->delete_contained_entities();
}
if (!CORBA::is_nil (dpf.in ())) {
dpf->delete_participant(participant.in ());
}
} catch (CORBA::Exception& e) {
cerr << "Exception caught in cleanup."
<< endl
<< e << endl;
ACE_OS::exit(1);
}
TheTransportFactory->release();
TheServiceParticipant->shutdown ();
return 0;
}
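The wait loop in main() polls a flag once per second. As a point of comparison, the same "block the main thread until another thread reports the close" idea can be sketched with a condition variable, which wakes the main thread as soon as the flag is set. This is not what the subscriber above does; it is a standalone sketch using only the C++ standard library, and the names ClosedSignal, notify_closed, wait_until_closed, and run_until_closed are hypothetical.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the listener's "exchange closed" flag.
class ClosedSignal {
public:
  // Called from the receiving thread when the close event arrives.
  void notify_closed() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      closed_ = true;
    }
    cond_.notify_all();
  }

  // Called from the main thread: blocks until notify_closed() has run.
  void wait_until_closed() {
    std::unique_lock<std::mutex> guard(lock_);
    cond_.wait(guard, [this] { return closed_; });
  }

  bool is_closed() {
    std::lock_guard<std::mutex> guard(lock_);
    return closed_;
  }

private:
  std::mutex lock_;
  std::condition_variable cond_;
  bool closed_ = false;
};

// Simulates a receive thread that reports the close after a short delay
// while the calling thread blocks instead of polling.
bool run_until_closed() {
  ClosedSignal signal;
  std::thread receiver([&signal] {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    signal.notify_closed();
  });
  signal.wait_until_closed();
  receiver.join();
  return signal.is_closed();
}
```

The polling loop in the article is simpler and is perfectly adequate for a process that shuts down once a day; the condition variable mainly removes the shutdown delay of up to one second.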
Subscriber's "Stock Quote" and "Stock Exchange Event" Listeners
The "Stock Quote" data reader and the "Stock Exchange Event" data reader each have a listener attached. These listeners are called by the DDS framework each time a data sample is received from a publisher. We have decided that each of the two data readers shall have its own listener, although we could use a single listener for both data readers if we code the listener to handle both data types.
Each listener implements the DDS::DataReaderListener IDL interface. We used both a QuoteDataReaderListenerImpl and an ExchangeEventDataReaderListenerImpl in the subscriber code above, but we have not yet defined those classes. We will do that now.
First, we write a listener for the Quote type's data reader. This listener class implements the DDS::DataReaderListener IDL interface, overriding seven pure virtual methods from the interface. It is a CORBA local object implementation of an IDL interface, inheriting from the generated class for the IDL interface.
The listener class must override all seven methods, including methods for which the listener's implementation is empty. However, for simplicity, we'll show only the on_data_available method, which is called when a new Quote data sample is available. The other six methods have empty implementations. We'll also use a default constructor and destructor.
#include "QuoteTypeSupportC.h"
#include "QuoteTypeSupportImpl.h"
#include "dds/DCPS/Service_Participant.h"
#include "dds/DdsDcpsSubscriptionS.h"
#include "ace/streams.h"
class QuoteDataReaderListenerImpl
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
{
public:
// DDS calls on_data_available on the listener for each
// received Quote sample.
virtual void on_data_available(DDS::DataReader_ptr reader)
throw (CORBA::SystemException)
{
try
{
We first narrow the value of the data reader parameter to the appropriate type for a Quote sample.
StockQuoter::QuoteDataReader_var quote_dr =
StockQuoter::QuoteDataReader::_narrow(reader);
if (CORBA::is_nil (quote_dr.in ()))
{
cerr << "QuoteDataReaderListenerImpl:: "
<< "on_data_available:"
<< " _narrow failed." << endl;
ACE_OS::exit(1);
}
Then, we take the next Quote sample from the data reader. Note the type safety of the QuoteDataReader interface.
StockQuoter::Quote quote;
DDS::SampleInfo si;
DDS::ReturnCode_t status =
quote_dr->take_next_sample(quote, si) ;
Once we have received the Quote sample, we simply print out its contents.
if (status == DDS::RETCODE_OK) {
cout << "Quote: ticker = " << quote.ticker.in()
<< endl
<< " exchange= " << quote.exchange.in()
<< endl
<< " full name = " << quote.full_name.in()
<< endl
<< " value = " << quote.value
<< endl
<< " timestamp = " << quote.timestamp
<< endl;
cout << "SampleInfo.sample_rank = "
<< si.sample_rank << endl;
}
else if (status == DDS::RETCODE_NO_DATA)
{
cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!"
<< endl;
}
else
{
cerr << "ERROR: read Quote: Error: "
<<status << endl;
}
The Quote sample is a stack variable, so its memory is released when it goes out of scope.
} catch (CORBA::Exception& e) {
cerr << "Exception caught in read:"
<< endl << e << endl;
ACE_OS::exit(1);
}
}
We have not shown implementations of the other methods in the DDS::DataReaderListener interface, but we must override them as well, even if their implementations are empty.
// must also override:
// on_requested_deadline_missed
// on_requested_incompatible_qos
// on_liveliness_changed
// on_subscription_match
// on_sample_rejected
// on_sample_lost
};
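The listener above calls take_next_sample once per callback. In DDS, "take" hands a sample to the application and removes it from the data reader, whereas "read" leaves it in place. The following standalone sketch illustrates only that take behavior with a toy in-memory reader. ToyQuoteReader, deliver, pending, and drain are hypothetical names, the types are stand-ins rather than the generated OpenDDS classes, and the draining loop is a variation for illustration, not the article's code.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Stand-in types for illustration only; not the OpenDDS classes.
enum ReturnCode { RETCODE_OK, RETCODE_NO_DATA };

struct Quote {
  std::string ticker;
  double value;
};

// A toy reader that queues samples the way a data reader buffers them.
class ToyQuoteReader {
public:
  // Simulates the middleware delivering a sample to the reader.
  void deliver(const Quote& q) { samples_.push_back(q); }

  // "Take" hands back the oldest sample and removes it from the reader.
  ReturnCode take_next_sample(Quote& out) {
    if (samples_.empty()) {
      return RETCODE_NO_DATA;
    }
    out = samples_.front();
    samples_.pop_front();
    return RETCODE_OK;
  }

  std::size_t pending() const { return samples_.size(); }

private:
  std::deque<Quote> samples_;
};

// Takes samples until the reader reports NO_DATA; returns the count taken.
int drain(ToyQuoteReader& reader) {
  int taken = 0;
  Quote q;
  while (reader.take_next_sample(q) == RETCODE_OK) {
    ++taken;
  }
  return taken;
}
```

In this toy model, NO_DATA is simply the signal that the queue is empty, which is why the loop treats it as a normal exit condition rather than as an error.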
Next, we write a listener for the ExchangeEvent type's data reader. The basic structure is the same as that of the QuoteDataReaderListenerImpl.
#include "ExchangeEventDataReaderListenerImpl.h"
#include "ExchangeEventTypeSupportC.h"
#include "ExchangeEventTypeSupportImpl.h"
#include "dds/DCPS/Service_Participant.h"
#include "dds/DdsDcpsSubscriptionS.h"
#include "ace/streams.h"
#include "ace/Synch.h"
class ExchangeEventDataReaderListenerImpl
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
{
public:
We add the is_exchange_closed_received method to the listener so the subscriber's main program can find out when the TRADING_CLOSED stock exchange event has been received. This method checks a boolean value under the protection of a mutex lock. The boolean value is set by the listener's on_data_available method when a TRADING_CLOSED stock exchange event is received.
// app-specific
CORBA::Boolean is_exchange_closed_received()
{
ACE_Guard<ACE_Mutex> guard(this->lock_);
return this->is_exchange_closed_received_;
}
DDS calls on_data_available on the listener for each received ExchangeEvent sample.
virtual void on_data_available(DDS::DataReader_ptr reader)
throw (CORBA::SystemException)
{
try
{
As in the QuoteDataReaderListenerImpl, we first narrow the value of the data reader parameter to the appropriate type, which in this case is an ExchangeEventDataReader.
StockQuoter::ExchangeEventDataReader_var exchange_evt_dr =
StockQuoter::ExchangeEventDataReader::_narrow(reader);
if (CORBA::is_nil (exchange_evt_dr.in ())) {
cerr << "ExchangeEventDataReaderListenerImpl:: "
<< "on_data_available:"
<< " _narrow failed."
<< endl;
ACE_OS::exit(1);
}
Then, we take the next ExchangeEvent sample from the data reader. Note the type safety.
StockQuoter::ExchangeEvent exchange_evt;
DDS::SampleInfo si;
DDS::ReturnCode_t status =
exchange_evt_dr->take_next_sample(exchange_evt, si) ;
Once we have received the ExchangeEvent sample, we simply print out its contents.
if (status == DDS::RETCODE_OK) {
cout << "ExchangeEvent: exchange= "
<< exchange_evt.exchange.in() << endl;
switch ( exchange_evt.event ) {
case StockQuoter::TRADING_OPENED:
cout << "TRADING_OPENED" << endl;
break;
When we receive a TRADING_CLOSED event, we set a flag indicating that the stock exchange has been closed for the day.
case StockQuoter::TRADING_CLOSED: {
cout << "TRADING_CLOSED" << endl;
ACE_Guard<ACE_Mutex> guard(this->lock_);
this->is_exchange_closed_received_ = 1;
break;
}
case StockQuoter::TRADING_SUSPENDED:
cout << "TRADING_SUSPENDED" << endl;
break;
case StockQuoter::TRADING_RESUMED:
cout << "TRADING_RESUMED" << endl;
break;
default:
cerr << "ERROR: reader received unknown "
<< "ExchangeEvent: "
<< exchange_evt.event
<< endl;
}
cout << "timestamp = "
<< exchange_evt.timestamp
<< endl;
cout << "SampleInfo.sample_rank = "
<< si.sample_rank
<< endl;
}
else if (status == DDS::RETCODE_NO_DATA)
{
cerr << "ERROR: reader received "
<< "DDS::RETCODE_NO_DATA!"
<< endl;
}
else
{
cerr << "ERROR: read ExchangeEvent: Error: "
<<status
<< endl;
}
The ExchangeEvent sample is a stack variable, so its memory is released when it goes out of scope.
} catch (CORBA::Exception& e) {
cerr << "Exception caught in read:" << endl
<< e << endl;
ACE_OS::exit(1);
}
}
// must also override:
// on_requested_deadline_missed
// on_requested_incompatible_qos
// on_liveliness_changed
// on_subscription_match
// on_sample_rejected
// on_sample_lost
We have added two private class attributes to keep track of the TRADING_CLOSED event and protect that value with a lock.
private:
CORBA::Boolean is_exchange_closed_received_;
ACE_Mutex lock_;
};
This completes the C++ code for the subscriber.
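The ExchangeEvent listener listing does not show a constructor. Because a plain boolean member has an indeterminate value until it is assigned, the flag presumably needs to be initialized to false before main() starts polling it. The following standalone sketch shows only that flag handling. It uses std::mutex in place of ACE_Mutex, and ToyExchangeEventListener, on_event, and the ExchangeEventType enum are hypothetical stand-ins for the article's classes and the IDL-generated enum.

```cpp
#include <mutex>

// Stand-in for the IDL-generated event enum; illustrative only.
enum ExchangeEventType {
  TRADING_OPENED,
  TRADING_CLOSED,
  TRADING_SUSPENDED,
  TRADING_RESUMED
};

// Toy listener: only the flag handling, no DDS types.
class ToyExchangeEventListener {
public:
  // The flag starts out false so the main loop does not exit early.
  ToyExchangeEventListener() : is_exchange_closed_received_(false) {}

  // Called by the main thread to poll the flag.
  bool is_exchange_closed_received() {
    std::lock_guard<std::mutex> guard(lock_);
    return is_exchange_closed_received_;
  }

  // Called for each received event (the role of on_data_available).
  void on_event(ExchangeEventType event) {
    if (event == TRADING_CLOSED) {
      std::lock_guard<std::mutex> guard(lock_);
      is_exchange_closed_received_ = true;
    }
  }

private:
  bool is_exchange_closed_received_;
  std::mutex lock_;
};
```

Both the reader of the flag and its writer take the same lock, which mirrors the ACE_Guard usage in the listing above.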
Building the Publisher and Subscriber
We use MPC, the Make Project Creator, to generate build files for the publisher and subscriber. MPC provides a simple syntax and is capable of generating build files for GNU Make, Visual C++, and many other build systems. For more information on MPC, please see OCI's MPC page at http://www.ociweb.com/products/mpc and the MPC chapter of the TAO Developer's Guide at http://downloads.ociweb.com/MPC/MakeProjectCreator.pdf.
We create two files to build our Stock Quoter, a workspace file and a project file. Our workspace file simply tells MPC where to find the MPC dcps and dcpsexe base project files that we'll use later.
//
// file StockQuoter.mwc
//
workspace {
cmdline += -relative DDS_ROOT=$DDS_ROOT
cmdline += -include $DDS_ROOT/MPC/config
}
Next, we create an MPC file containing three projects: a Common project containing IDL and TypeSupport files, a Publisher, and a Subscriber. Each of the three projects inherits from either the dcps or dcpsexe base project, which are located in $DDS_ROOT. First, we create a library called StockQuoterCommon to hold the code generated by the IDL and dcps_ts.pl compilers.
//
// file StockQuoter.mpc
//
project(*Common) : dcps {
sharedname = StockQuoterCommon
libout = .
includes += $(TAO_ROOT)/orbsvcs
idlflags += -I$(TAO_ROOT)/orbsvcs
idlflags += -Wb,export_macro=StockQuoterCommon_Export
idlflags += -Wb,export_include=StockQuoterCommon_Export.h
dcps_ts_flags += --export=StockQuoterCommon_Export
dynamicflags = STOCKQUOTERCOMMON_BUILD_DLL
A dcps project has a new section, TypeSupport_Files. This section executes the dcps_ts.pl script to generate TypeSupport files from our DDS data types. Here, we indicate the IDL file that contains our DDS data types, and also indicate the TypeSupport files generated from it. Specifying the TypeSupport's .idl, .h, and .cpp dependencies like this forces the dcps_ts.pl compiler to be run before the IDL compiler and enables the build system's clean targets to work properly.
TypeSupport_Files {
StockQuoter.idl >> QuoteTypeSupport.idl QuoteTypeSupportImpl.h QuoteTypeSupportImpl.cpp \
ExchangeEventTypeSupport.idl ExchangeEventTypeSupportImpl.h ExchangeEventTypeSupportImpl.cpp
}
Our IDL_Files section contains our original IDL files plus the TypeSupport IDL files generated by the previous section.
IDL_Files {
ExchangeEventTypeSupport.idl
QuoteTypeSupport.idl
StockQuoter.idl
}
The Header_Files and Source_Files sections contain the dcps_ts.pl-generated TypeSupport implementation files. MPC automatically adds the generated IDL stubs and skeletons, so we don't need to add those manually.
Header_Files {
ExchangeEventTypeSupportImpl.h
QuoteTypeSupportImpl.h
}
Source_Files {
ExchangeEventTypeSupportImpl.cpp
QuoteTypeSupportImpl.cpp
}
}
Our publisher uses the StockQuoterCommon library from above, and adds a publisher.cpp source file containing the publisher's main().
project(*Publisher) : dcpsexe, svc_utils {
after += *Common
exename = publisher
includes += $(TAO_ROOT)/orbsvcs
libs += StockQuoterCommon
dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
TypeSupport_Files {
}
IDL_Files {
}
Header_Files {
}
Source_Files {
publisher.cpp
}
Documentation_Files {
README.txt
domain_ids
}
}
Our subscriber also uses the StockQuoterCommon library, and adds a subscriber.cpp source file containing the subscriber's main() along with the source files for its two listeners.
project(*Subscriber) : dcpsexe {
after += *Common
exename = subscriber
includes += $(TAO_ROOT)/orbsvcs
libs += StockQuoterCommon
dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
TypeSupport_Files {
}
IDL_Files {
}
Header_Files {
ExchangeEventDataReaderListenerImpl.h
QuoteDataReaderListenerImpl.h
}
Source_Files {
ExchangeEventDataReaderListenerImpl.cpp
QuoteDataReaderListenerImpl.cpp
subscriber.cpp
}
Documentation_Files {
README.txt
domain_ids
}
}
We use this MPC file to generate build files for our build system. For example, to generate GNU Makefiles, we execute
$ACE_ROOT/bin/mwc.pl -type gnuace StockQuoter.mwc
To generate Visual C++ 7.1 solution files, we execute
perl %ACE_ROOT%/bin/mwc.pl -type vc71 StockQuoter.mwc
We then build the project.
Configuring the Stock Quoter
OpenDDS 1.0 includes a file-based configuration mechanism. With it, an OpenDDS user may configure a publisher's or subscriber's transport, the location of the DCPSInfoRepo process, and many other settings. The syntax of the configuration file is similar to the syntax of a Windows INI file. It contains several sections, which in turn contain property-like entries. The basic syntax is as follows:
[section1]
Attribute1=value1
Attribute2=value2
[section2]
Attribute1=value1
Attribute2=value2
The complete set of configuration settings is described in the OpenDDS chapter of the TAO Developer's Guide.
Our TCP-based example uses one configuration file, dds_tcp_conf.ini, for both the publisher and the subscriber:
#
# dds_tcp_conf.ini
#
[common]
# Debug level
DCPSDebugLevel=0
# IOR of DCPSInfoRepo process.
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
[transport_impl_1]
# the two supported transports are SimpleTcp and SimpleUdp
transport_type=SimpleTcp
Please note that there are two sections, [common] and [transport_impl_1]. The [common] section contains configuration values that apply to the entire process; in this configuration file, we specify a debug level and an object reference for the DCPSInfoRepo process. Here, our DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to only be available to DDS processes running on the same host. To make it available across the network, use an IP address or a network hostname instead of localhost.
The [transport_impl_1] section contains configuration values for the transport with the id of "1". Recall that in both the publisher and subscriber C++ code, we defined
const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
and configured the transport via
// Initialize the transport; the TRANSPORT_IMPL_ID must match the
// value in the configuration file.
OpenDDS::DCPS::TransportImpl_rch trans_impl =
TheTransportFactory->create_transport_impl (
TRANSPORT_IMPL_ID,
OpenDDS::DCPS::AUTO_CONFIG);
The "1" in the transport configuration file matches the "1" defined in code as a transport id. Naturally, a publisher or subscriber process may contain more than one transport, each configured differently. Once configured, we can attach the transport to a DCPS publisher or subscriber entity as we do in the C++ code above.
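To make the link between the transport id in code and the configuration file concrete, the following standalone sketch parses an INI-style text and looks up the section for a given transport id. It is only an illustration of the file layout, not how OpenDDS itself reads its configuration. It assumes the section headers are written as [common] and [transport_impl_<id>], and the names IniFile, parse_ini, trim, and transport_section are hypothetical.

```cpp
#include <map>
#include <sstream>
#include <string>

typedef std::map<std::string, std::string> Section;
typedef std::map<std::string, Section> IniFile;

// Removes leading and trailing spaces, tabs, and carriage returns.
std::string trim(const std::string& s) {
  const char* ws = " \t\r";
  std::string::size_type first = s.find_first_not_of(ws);
  if (first == std::string::npos) return "";
  std::string::size_type last = s.find_last_not_of(ws);
  return s.substr(first, last - first + 1);
}

// Parses "[section]" headers, "key=value" entries, and "#" comments.
IniFile parse_ini(const std::string& text) {
  IniFile ini;
  std::string current;
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    line = trim(line);
    if (line.empty() || line[0] == '#') continue;
    if (line[0] == '[' && line[line.size() - 1] == ']') {
      current = trim(line.substr(1, line.size() - 2));
      continue;
    }
    std::string::size_type eq = line.find('=');
    if (eq == std::string::npos) continue;  // ignore malformed lines
    ini[current][trim(line.substr(0, eq))] = trim(line.substr(eq + 1));
  }
  return ini;
}

// Builds the assumed section name for a transport id,
// for example 1 -> "transport_impl_1".
std::string transport_section(unsigned int id) {
  std::ostringstream name;
  name << "transport_impl_" << id;
  return name.str();
}
```

With this sketch, parse_ini(text)[transport_section(1)]["transport_type"] yields the transport type configured for id 1, which is the same association that TRANSPORT_IMPL_ID expresses in the subscriber and publisher code.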
Running the Stock Quoter over a TCP Transport
To run the example, we must start a DCPSInfoRepo process, and start at least one publisher and one subscriber. The DCPSInfoRepo process needs to know the domain ids of the DCPS domains that intend to use it as a broker. Our domain is 1066, as we specified when we created our DomainParticipant object in both the publisher and subscriber. We'll store that domain id in a file.
// in file "domain_ids"
1066
Then we pass that file to the DCPSInfoRepo process on the command line:
$DDS_ROOT/bin/DCPSInfoRepo \
-ORBListenEndpoints iiop://localhost:12345 -d domain_ids
Our DCPSInfoRepo process listens on port 12345. That port matches the port that we specified in the DCPSInfoRepo object reference in the transport configuration file above. This DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to only be available to DDS processes running on the same host. Again, to make it available across the network, use an IP address or a network hostname instead of localhost.
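Because the host and port appear in two places (the corbaloc reference in the configuration file and the -ORBListenEndpoints argument), a mismatch is an easy mistake to make. The following standalone sketch compares the two strings. It handles only the simple forms used in this article, and the helper names endpoint_from_corbaloc, endpoint_from_iiop, and endpoints_match are hypothetical, not part of OpenDDS or TAO.

```cpp
#include <string>

// Extracts the "host:port" part of "corbaloc::host:port/ObjectKey".
// Returns an empty string if the text does not have that shape.
std::string endpoint_from_corbaloc(const std::string& ref) {
  const std::string prefix = "corbaloc::";
  if (ref.compare(0, prefix.size(), prefix) != 0) return "";
  std::string::size_type slash = ref.find('/', prefix.size());
  if (slash == std::string::npos) return "";
  return ref.substr(prefix.size(), slash - prefix.size());
}

// Extracts the "host:port" part of "iiop://host:port".
// Returns an empty string if the prefix is missing.
std::string endpoint_from_iiop(const std::string& endpoint) {
  const std::string prefix = "iiop://";
  if (endpoint.compare(0, prefix.size(), prefix) != 0) return "";
  return endpoint.substr(prefix.size());
}

// True when both strings name the same host and port.
bool endpoints_match(const std::string& corbaloc, const std::string& iiop) {
  const std::string a = endpoint_from_corbaloc(corbaloc);
  return !a.empty() && a == endpoint_from_iiop(iiop);
}
```

The comparison is purely textual, so "localhost" and "127.0.0.1" would be reported as different even though they usually refer to the same interface.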
We run two subscribers and one publisher:
subscriber -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
subscriber -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
publisher -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
We use the -DCPSConfigFile command-line argument to indicate the name of the configuration file we created above. Note that each subscriber and publisher uses the same transport configuration file.
We also use the "-ORBSvcConf tcp.conf" command-line argument to have the ACE Service Configurator dynamically link the library containing the SimpleTcp transport into the process. The tcp.conf file has the following contents:
dynamic DCPS_SimpleTcpLoader
Service_Object * SimpleTcp:_make_DCPS_SimpleTcpLoader()
"-type SimpleTcp"
The above command lines run the DCPSInfoRepo, publisher, and subscriber with built-in topics turned on, which is the default. We can also run these processes with built-in topics turned off. The -NOBITS option turns off built-in topics in the DCPSInfoRepo, and "-DCPSBit 0" does the same in other DDS applications. The command lines are as follows:
$DDS_ROOT/bin/DCPSInfoRepo \
-NOBITS -ORBListenEndpoints iiop://localhost:12345 -d domain_ids
subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
publisher -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
The publisher publishes 20 stock quotes for each of the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it publishes a "TRADING_CLOSED" message, which causes the subscribers to exit.
Running the Stock Quoter over a UDP Transport
We can use the same code base to run the example over a UDP transport by creating three new transport configuration files. For the UDP transport, we need a configuration file for the publisher and for each subscriber because each process needs its own UDP endpoint.
For the publisher, we create a pub_udp_conf.ini file that contains the following:
[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
[transport_impl_1]
transport_type=SimpleUdp
local_address=localhost:4444
For the first subscriber, we create a sub_udp_conf.ini file that contains the following:
[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
[transport_impl_1]
transport_type=SimpleUdp
local_address=localhost:4443
For the second subscriber, we create a sub2_udp_conf.ini file that contains the following:
[common]
DCPSDebugLevel=0
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
[transport_impl_1]
transport_type=SimpleUdp
local_address=localhost:4442
We then start the DCPSInfoRepo process as before:
$DDS_ROOT/bin/DCPSInfoRepo \
-ORBListenEndpoints iiop://localhost:12345 -d domain_ids
We start the two subscribers and the publisher, using our new transport configuration files:
subscriber -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
-DCPSConfigFile sub_udp_conf.ini
subscriber -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
-DCPSConfigFile sub2_udp_conf.ini
publisher -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
-DCPSConfigFile pub_udp_conf.ini
The above command lines are used when built-in topics are turned on. Note that two *.conf files are provided. The tcp.conf file is required if a DDS process runs with built-in topics on, since the built-in topics use the SimpleTcp transport in OpenDDS 1.0. The udp.conf file is used to dynamically link the library containing the SimpleUdp transport into the process. The udp.conf file has the following contents:
dynamic OPENDDS_DCPS_SimpleUnreliableDgramLoader
Service_Object * SimpleUnreliableDgram:_make_OPENDDS_DCPS_SimpleUnreliableDgramLoader()
"-type SimpleUdp"
We can also run each process with built-in topics off. The command lines are as follows:
$DDS_ROOT/bin/DCPSInfoRepo -NOBITS \
-ORBListenEndpoints iiop://localhost:12345 -d domain_ids
subscriber -ORBSvcConf udp.conf -DCPSBit 0 \
-DCPSConfigFile sub_udp_conf.ini
subscriber -ORBSvcConf udp.conf -DCPSBit 0 \
-DCPSConfigFile sub2_udp_conf.ini
publisher -ORBSvcConf udp.conf -DCPSBit 0 \
-DCPSConfigFile pub_udp_conf.ini
As before, the publisher publishes 20 stock quotes for each of the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it again publishes a "TRADING_CLOSED" message, which causes the subscribers to exit. The only difference is that we have substituted a UDP transport for a TCP transport; the change in transport required no code changes.
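The reason no code changes are needed is that the application names its transport only by an id, while the concrete transport type comes from the configuration at run time. The general idea can be sketched as a small registry that maps a type string to a creator function. This is a toy model of the pattern, not the OpenDDS transport factory; ToyTransport, ToyTcp, ToyUdp, and create_transport are hypothetical names.

```cpp
#include <map>
#include <memory>
#include <string>

// Stand-in transport interface; not the OpenDDS TransportImpl class.
class ToyTransport {
public:
  virtual ~ToyTransport() {}
  virtual std::string name() const = 0;
};

class ToyTcp : public ToyTransport {
public:
  std::string name() const override { return "tcp"; }
};

class ToyUdp : public ToyTransport {
public:
  std::string name() const override { return "udp"; }
};

typedef std::unique_ptr<ToyTransport> (*Creator)();

std::unique_ptr<ToyTransport> make_tcp() {
  return std::unique_ptr<ToyTransport>(new ToyTcp);
}

std::unique_ptr<ToyTransport> make_udp() {
  return std::unique_ptr<ToyTransport>(new ToyUdp);
}

// Maps the transport_type string from the configuration to a creator.
// Returns a null pointer for an unknown type.
std::unique_ptr<ToyTransport> create_transport(const std::string& type) {
  static const std::map<std::string, Creator> registry = {
    {"SimpleTcp", &make_tcp},
    {"SimpleUdp", &make_udp}
  };
  std::map<std::string, Creator>::const_iterator it = registry.find(type);
  if (it == registry.end()) {
    return std::unique_ptr<ToyTransport>();
  }
  return it->second();
}
```

Code that holds only a ToyTransport pointer behaves the same whichever string was configured, which is the property the Stock Quoter relies on when it switches from SimpleTcp to SimpleUdp.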
Summary
The OMG Data Distribution Service (DDS) for Real-Time Systems is a specification for a high-performance, type-safe, publish-and-subscribe communication middleware. DDS addresses data-centric applications, i.e. those for which dissemination of application data is a significant requirement.
OpenDDS is an open source implementation of the OMG Data Distribution Service specification, providing users with an efficient publish-and-subscribe framework with the advantages of the open-source software development model.
Recent versions of OpenDDS, including 1.0, include a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings. We have shown in our example that an OpenDDS application's underlying transport can be swapped out without making any code changes.
References
Sample Code (http://www.opendds.org/Article-Intro-code.zip)
OMG Data Distribution Service (DDS) for Real-Time Systems (http://www.omg.org/docs/formal/07-05-02)
OMG "Introduction to DDS" Whitepaper (http://www.omg.org/news/whitepapers/Intro_To_DDS.pdf)
OpenDDS Home Page (http://www.opendds.org)
TAO Developer's Guide Home Page (http://www.theaceorb.com/product/index.html and http://www.theaceorb.com/purchase/index.html)
OpenDDS Chapter of the TAO Developer's Guide (http://download.ociweb.com/OpenDDS/OpenDDS-latest.pdf)
MPC Chapter of the TAO Developer's Guide (http://download.ociweb.com/TAO-1.4a/TAO1.4aMakeProjectCreator.pdf)