找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 14411|回复: 0

Introduction to OpenDDS

[复制链接]
发表于 2008-6-23 13:32:17 | 显示全部楼层 |阅读模式
from http://www.opendds.org/Article-Intro.html
  1. Don Busch, Principal Software Engineer and Partner
  2. Object Computing, Inc. (OCI)
  3. Introduction
  4. Distributed real-time applications are sometimes more data-centric than service-centric, meaning that the primary objective of the participants in the distributed system is the dissemination of application data rather than access to shared services. The set of suppliers and/or consumers of application data may not be known at design time, and may change through the execution lifetime of the application. Typically, the data-centric paradigm is most efficiently realized by a publish/subscribe communication model rather than a request/response model.
  5. The OMG Data Distribution Service (DDS) for Real-Time Systems addresses the performance requirements and hard real-time requirements of distributed data-centric applications. DDS increases the range of publish/subscribe options available to developers of distributed real-time systems. For convenience, the DDS interfaces are defined using OMG Interface Definition Language (IDL). However, most details are left to the implementation, most significantly how data transfer occurs between the publishers and subscribers. A DDS implementer decides on the underlying communication mechanism that moves data from a publisher to a subscriber -- via TCP, UDP, UDP multicast, shared memory, etc. An implementation of the DDS specification is not required to use CORBA or the IIOP protocol to transfer data from a publisher to a subscriber.
  6. OpenDDS is an open source, C++ implementation of the OMG Data Distribution Service specification. Recent versions of OpenDDS, including OpenDDS1.0, include a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings. The complete set of configuration settings is described in the OpenDDS chapter of the TAO Developer's Guide.
  7. In this article, we cover the following topics:
  8. The OpenDDS Implementation of OMG DDS
  9. DDS Architecture
  10. Stock Quoter Example
  11. IDL Types
  12. Publisher
  13. Subscriber
  14. Subscriber's Listeners
  15. Building the Publisher and Subscriber
  16. Configuring the Stock Quoter
  17. Running the Stock Quoter over a TCP Transport
  18. Running the Stock Quoter over a UDP Transport
  19. Summary
  20. References
  21. The OpenDDS Implementation of OMG DDS
  22. OpenDDS leverages a pluggable transport architecture, enabling data delivery through transport and marshaling implementations of the application developer's choosing. Conceptually, the architecture borrows from TAO's Pluggable Protocols Framework. OpenDDS currently supports TCP and UDP point-to-point transports as well as unreliable and reliable multicast, and uses a high-performance marshaling implementation.
  23. This pluggable transport architecture permits a DDS user to optimize a DDS installation based on the desired transport and the homogeneous or heterogeneous nature of the application's deployment. These choices can be made without affecting the application code itself.
  24. Marshaling code is generated by a specialized OpenDDS IDL compiler. A single, separate DCPS Information Repository (DCPSInfoRepo) process acts as a central clearinghouse, associating publishers and subscribers. Under the covers, OpenDDS uses CORBA to communicate with the DCPSInfoRepo process to associate publishers and subscribers. Data transfer between publishers and subscribers is direct between the publishing and subscribing processes. OpenDDS creates its own threads for the RB and for the non-CORBA I/O that takes place when sending or receiving DDS data.
  25. DDS Architecture
  26. The OMG Data Distribution Service specification separates DDS into two separate architectural layers. The lower layer is the Data-Centric Publish and Subscribe (DCPS) layer, containing type-safe interfaces to a publish/subscribe communication mechanism. The upper layer is the Data Local Reconstruction Layer (DLRL), which enables an application developer to construct a local object model on top of the DCPS layer, shielding the application from DCPS knowledge. Each layer has its own set of concepts and usage patterns, and thus the concepts and terminology of the two layers can be discussed separately.
  27. Data-Centric Publish and Subscribe - DCPS
  28. The DCPS layer is responsible for efficiently disseminating data from publishers to interested subscribers. It is implemented using the concepts of publisher and data writer on the sending side and subscriber and data reader on the receiving side. The DCPS layer consists of one or more data domains, each of which contains a set of participants (publishers and subscribers) that communicate via DDS. Each entity (i.e. publisher or subscriber) belongs to a domain. Each process has one domain participant for each data domain of which it is a member.
  29. Within any data domain, data is identified by a topic, which is a type-specific domain segment that allows publishers and subscribers to refer to data unambiguously. Within a domain, a topic associates a unique topic name, data type, and a set of Quality of Service (QoS) policies with the data itself. Each topic is associated with only one data type, although many different topics can publish the same data type. Behavior of publishers is determined by the QoS policies associated with the publisher, data writer, and topic elements for a particular data source. Likewise, behavior of subscribers is determined by the QoS policies associated with the subscriber, data reader, and topic elements for a particular data sink.
  30. For more information on DCPS terminology, please see the OpenDDS Chapter of the TAO Developer's Guide.
  31. The DDS specification defines a number of Quality of Service (QoS) policies that applications use to specify their reliability, resource usage, fault tolerance, and other requirements to the service. Participants specify the behavior that they require from the service; the service decides how to achieve these behaviors. These policies may be applied to the various DCPS entities (Topic, Data Writer, Data Reader, Publisher, Subscriber, and Domain Participant) although not all policies are valid for all types of entities.
  32. Subscribers and publishers collaborate to specify QoS policies through an offer-request paradigm. A publisher offers a set of QoS policies to all subscribers; a subscriber requests the set of QoS policies that it requires. The DDS implementation then attempts to match the requested policies with the offered policies. If the policies are consistent, then the publication and the subscription are matched.
  33. OpenDDS supports the following DCPS Quality-of-Service (QoS) Policies:
  34. QoS Policy
  35. Description
  36. Liveliness
  37. Controls liveliness checks to make sure expected entities in the system are still alive
  38. Reliability
  39. Determines whether the service is allowed to drop samples
  40. History
  41. Controls what happens to an instance whose value changes before it is communicated to all Subscribers
  42. Resource Limits
  43. Controls resources that the service can use to meet other QoS requirements
  44. See the Object Management Group's Introduction to DDS Whitepaper, Appendix A, for a more complete list of Quality-of-Service policies and more detailed Quality-of-Service definitions.
  45. Data-Local Reconstruction Layer - DLRL
  46. The Data-Local Reconstruction Layer (DLRL) is an object-oriented layer on top of DCPS. A DLRL object is a native-language (i.e. C++) object with one or more shared attributes. Each DLRL class is mapped to one or more DCPS topics; each shared attribute value is mapped to a field in a topic's data type, and its value is distributed across the application via DCPS. A DLRL participant communicates data to the rest of the application by modifying a DLRL object, resulting in the the publication of a data sample on the associated topic. A DLRL shared attribute may be a simple value or structure, a reference to another DLRL object, or a collection (list, map) of those. DLRL supports complex object graphs and complex relationships between DLRL objects.
  47. The developer is responsible for deciding how DCPS entities are mapped to DLRL objects. The model is specified in OMG Interface Definition Language (IDL) using IDL valuetypes. The mapping is conceptually similar to an object-relational database mapping, which maps an object model onto relational database tables. We think of each DCPS topic as analogous to a relational database table, and each sample as a row in that table. The DDS specification has a default mapping from DCPS to DLRL. Or, a developer can choose to specify his own custom mapping via an XML mapping file.
  48. OpenDDS 1.0 does not implement the DLRL.
  49. Table of Contents
  50. OpenDDS Stock Quoter Example
  51. Our example illustrates publication of and subscription to data samples though the DDS DCPS layer. The example contains two DCPS topics, both related to the stock market.
  52. A stock quote publisher publishes stock quote samples to interested subscribers; each quote contains the ticker symbol of a security, its value, and a time stamp. Quotes are published periodically throughout the trading day, as buy and sell transactions affect the underlying value of the security. In addition, a stock exchange event publisher publishes important events relating to a stock exchange, namely when the exchange opens, closes, and when trading is suspended or resumed.
  53. Our subscriber subscribes to both stock quotes and stock exchange events. The subscriber prints the ticker symbol and value of each quote it sees. When the subscriber receives an event indicating that the stock exchange has closed for the day, it gracefully shuts down. Thus, the receipt of a "closed" stock exchange event is the subscriber's signal to stop expecting stock quote samples.
  54. We will demonstrate how to use the same publisher and subscriber code to communicate via the TCP and UDP transports. The transport configuration is isolated in a set of configuration files, allowing us to switch transports without making any code changes.
  55. Table of Contents
  56. IDL Types
  57. First, we define our published DDS data types in IDL:     #include "orbsvcs/TimeBase.idl"
  58.     module StockQuoter
  59.     {
  60.     #pragma DCPS_DATA_TYPE "StockQuoter::Quote"
  61.     #pragma DCPS_DATA_KEY "StockQuoter::Quote ticker"
  62.       struct Quote {
  63.         string ticker;
  64.         string exchange;
  65.         string full_name;
  66.         double value;
  67.         TimeBase::TimeT timestamp;
  68.       };
  69.     #pragma DCPS_DATA_TYPE "StockQuoter::ExchangeEvent"
  70.     #pragma DCPS_DATA_KEY "StockQuoter::ExchangeEvent exchange"
  71.       enum ExchangeEventType { TRADING_OPENED,
  72.                                TRADING_CLOSED,
  73.                                TRADING_SUSPENDED,
  74.                                TRADING_RESUMED };
  75.       struct ExchangeEvent {
  76.         string exchange;
  77.         ExchangeEventType event;
  78.         TimeBase::TimeT timestamp;
  79.       };
  80.     };
  81. We publish two data types: a Quote type for each stock quote, and an ExchangeEvent type to indicate when the stock exchange is opened, closed, and when trading is suspended or resumed. The DCPS_DATA_TYPE pragma marks a type for use with DDS. The DCPS_DATA_KEY defined for each type is a unique identifier for each instance of the data type. Our Quote type's key is the ticker symbol of the stock. Throughout the day, we would expect to publish many values, or samples, for each ticker symbol. The set of published samples for each ticker symbol belongs to the same instance. In our example, we'll publish two ticker symbols, and thus two instances: SPY (S&P Depository Receipts, i.e. the S&P 500) and MDY (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400).
  82. Next, we compile the IDL with OpenDDS's dcps_ts.pl script to generate type support code. The type support code consists of generated DCPS data writer and data reader C++ classes and additional IDL code. DDS uses type-safe interfaces for publication and subscription. Type-safe interfaces have several advantages: first, programming errors are more likely to be caught at compile time; second, generated marshaling code can be made very efficient when the marshaled data type is known at compile time; and third, we can avoid the use of inefficient types such as the CORBA any in data transfer.
  83. The command to generate type support code for the Stock Quoter's IDL types is as follows:    $DDS_ROOT/bin/dcps_ts.pl StockQuoter.idl
  84. This command generates the following files:     QuoteTypeSupport.idl
  85.     QuoteTypeSupportImpl.h
  86.     QuoteTypeSupportImpl.cpp
  87.     ExchangeEventTypeSupport.idl
  88.     ExchangeEventTypeSupportImpl.h
  89.     ExchangeEventTypeSupportImpl.cpp
  90. Note that dcps_ts.pl generates a set of type support files for each DDS type in the StockQuoter.idl file.
  91. However, we don't need to run the dcps_ts.pl script manually. Later, we'll use a Make Project Creator (MPC) project to automate the build steps for us.
  92. Next, we use TAO's IDL compiler to compile all three IDL files -- the StockQuoter.idl file we wrote manually, plus the two type support files generated by dcps_ts.pl.    tao_idl -Gdcps -I $DDS_ROOT -I$(TAO_ROOT)/orbsvcs StockQuoter.idl
  93.     tao_idl -Gdcps -I $DDS_ROOT -I$(TAO_ROOT)/orbsvcs StockQuoteTypeSupport.idl
  94.     tao_idl -Gdcps -I $DDS_ROOT -I$(TAO_ROOT)/orbsvcs ExchnageEventTypeSupport.idl
  95. The -Gdcps command-line argument causes the TAO IDL compiler to generate code supporting DCPS type definitions. Again, we'll automate this later with MPC.
  96. Table of Contents
  97. Publisher
  98. Next, we write a publisher to publish stock quotes and stock exchange events via DDS. First, we include the two type support header files generated by the dcps_ts.pl script.     #include "QuoteTypeSupportImpl.h"
  99.     #include "ExchangeEventTypeSupportImpl.h"
  100. We also include DCPS publisher, service participant, and QoS header files.     #include "dds/DCPS/Service_Participant.h"
  101.     #include "dds/DCPS/Marked_Default_Qos.h"
  102.     #include "dds/DCPS/PublisherImpl.h"
  103.     #include "dds/DCPS/transport/framework/TheTransportFactory.h"
  104.     #include "ace/streams.h"
  105.     #include "orbsvcs/Time_Utilities.h"
  106. We will configure the publisher's transport via a configuration file. However, we need an identifier to tie a transport entry in the configuration file to the transport configured in the code. This transport id value must match the integer identifier defined later in the transport configuration file. Each topic -- in fact, each publisher within a topic -- can, in theory, have its own transport.     // constant used by this publisher for transport; the
  107.     // TRANSPORT_IMPL_ID must match the value in the
  108.     // configuration file.
  109.     const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
  110. The following constants are used for our domain, type names and topic names. Each type is published on a separate topic. The subscriber must use the same values for its domain, type names and topic names.     // constants for Stock Quoter domain Id, types, and topic
  111.     DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
  112.     const char* QUOTER_QUOTE_TYPE = "Quote Type";
  113.     const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
  114.     const char* QUOTER_EXCHANGE_EVENT_TYPE =
  115.       "Exchange Event Type";
  116.     const char* QUOTER_EXCHANGE_EVENT_TOPIC =
  117.       "Stock Exchange Events";
  118. When a stock exchange event - i.e. opened, closed, suspended, or resumed -- is published, we also publish the name of the stock exchange to which the event applies.     const char* STOCK_EXCHANGE_NAME = "Test Stock Exchange";
  119. This is a simple helper method to get the current date and time.     TimeBase::TimeT get_timestamp()
  120.     {
  121.       TimeBase::TimeT retval;
  122.       ACE_hrtime_t t = ACE_OS::gethrtime ();
  123.       ORBSVCS_Time::hrtime_to_TimeT (retval, t );
  124.       return retval;
  125.     }
  126. The remainder of the publisher's source code file contains its main(). We enter the publisher's main()     int main (int argc, char *argv[])
  127.     {
  128.       DDS::DomainParticipantFactory_var dpf =
  129.         DDS::DomainParticipantFactory::_nil();
  130.       DDS::DomainParticipant_var participant =
  131.         DDS::DomainParticipant::_nil();
  132.       try
  133.       {
  134. First, we create a domain participant. A DDS publisher may publish on many independent domains, but our example only publishes on one domain. We use the TheDomainParticipantFactoryWithArgs macro to pass command-line arguments into DCPS and get the singleton domain participant factory. We create one domain participant, for the "Quote" domain, using the default Quality-of-Service policies for a domain participant. The value of QUOTER_DOMAIN_ID passed into the factory must be identical in the publisher and the subscriber.         // Initialize, and create a DomainParticipant
  135.         dpf = TheParticipantFactoryWithArgs(argc, argv);
  136.         participant = dpf->create_participant(
  137.           QUOTER_DOMAIN_ID,
  138.           PARTICIPANT_QOS_DEFAULT,
  139.           DDS::DomainParticipantListener::_nil());
  140.    
  141.         if (CORBA::is_nil (participant.in ()))
  142.         {
  143.           cerr << "create_participant failed." << endl;
  144.           ACE_OS::exit(1);
  145.         }
  146. Then, we create a publisher through the domain participant with default Quality-of-Service values. We can attach a PublisherListener that is called by DCPS when certain publication-related events happen. However, we don't care about those events, so we attach a nil listener.         // Create a publisher for the two topics
  147.         // (PUBLISHER_QOS_DEFAULT is defined in
  148.         // Marked_Default_Qos.h)
  149.         DDS::Publisher_var pub =
  150.           participant->create_publisher(
  151.             PUBLISHER_QOS_DEFAULT,
  152.             DDS::PublisherListener::_nil());
  153.    
  154.         if (CORBA::is_nil (pub.in ()))
  155.         {
  156.           cerr << "create_publisher failed." << endl;
  157.           ACE_OS::exit(1);
  158.         }
  159. Next, we create a transport implementation to attach to the publisher. We get the transport implementation from the singleton transport factory, called TheTransportFactory. The value of the TRANSPORT_IMPL_ID identifier must match the transport id value in our configuration file (more on that later). The OpenDDS::DCPS::AUTO_CONFIG argument indicates that we're using a configuration file to configure the transport implementation. Note that the code itself does not need to know any details about the transport implementation -- whether it uses TCP, UDP, what its endpoints are, etc.         // Initialize the transport; the TRANSPORT_IMPL_ID
  160.         // must match the value in the configuration file.
  161.         OpenDDS::DCPS::TransportImpl_rch trans_impl =
  162.           TheTransportFactory->create_transport_impl (
  163.             TRANSPORT_IMPL_ID,
  164.             OpenDDS::DCPS::AUTO_CONFIG);
  165. We get the publisher's implementation object and attach the transport to it. Each publisher may have a different transport implementation, or several publishers may share the same transport implementation. In our case, we have one publisher, and thus one transport implementation.         // Attach the publisher to the TCP transport.
  166.         OpenDDS::DCPS::PublisherImpl* pub_impl =
  167.           OpenDDS::DCPS::reference_to_servant<
  168.             OpenDDS::DCPS::PublisherImpl>(pub.in ());
  169.    
  170.         if (0 == pub_impl)
  171.         {
  172.           cerr << "Failed to obtain publisher servant" << endl;
  173.           ACE_OS::exit(1);
  174.         }
  175.    
  176.         OpenDDS::DCPS::AttachStatus status =
  177.           pub_impl->attach_transport(trans_impl.in());
  178.    
  179.         if (status != OpenDDS::DCPS::ATTACH_OK)
  180.         {
  181.           cerr << "Failed to attach to the transport. "
  182.                << "Status == "
  183.                << status << endl;
  184.           ACE_OS::exit(1);
  185.         }
  186. There are three steps involved in publishing through DCPS. First, we register each type for the published data samples. Our example publishes samples of two IDL types, Quote and ExchangeEvent. Second, we create one or more topics upon which we publish. Each topic can only be bound to one type; thus we create a topic for each of our two types. Third, we create a data writer for each topic, and publish samples through the data writer.
  187. We first register the IDL Quote type with the domain participant, passing an instance of the generated QuoteTypeSupportImpl class for the Quote type. The name that we use for the Quote type, which is stored in the constant value QUOTER_QUOTE_TYPE, must match the name used on the subscriber. When we create a topic, we specify this type name, enabling DCPS to later create the appropriate type of data writer for the topic.         // Register the Quote type
  188.         StockQuoter::QuoteTypeSupport_var quote_servant
  189.           = new StockQuoter::QuoteTypeSupportImpl();
  190.    
  191.         if (DDS::RETCODE_OK !=
  192.               quote_servant->register_type(participant.in (),
  193.                                            QUOTER_QUOTE_TYPE))
  194.                  {
  195.           cerr << "register_type for " << QUOTER_QUOTE_TYPE
  196.                << " failed." << endl;
  197.           ACE_OS::exit(1);
  198.         }
  199. We then register the IDL ExchangeEvent type with the domain participant in the same way, using the generated ExchangeEventTypeSupportImpl class. Our DCPS domain participant is able to publish on topics for Quote or ExchangeEvent types.         // Register the ExchangeEvent type
  200.         StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant
  201.           = new StockQuoter::ExchangeEventTypeSupportImpl();
  202.         if (DDS::RETCODE_OK !=
  203.               exchange_evt_servant->register_type(
  204.                 participant.in (),
  205.                 QUOTER_EXCHANGE_EVENT_TYPE))
  206.         {
  207.           cerr << "register_type for "
  208.                << QUOTER_EXCHANGE_EVENT_TYPE
  209.                << " failed." << endl;
  210.           ACE_OS::exit(1);
  211.         }
  212. We create a topic for our Quote samples, indicating the topic name and the registered name of the Quote data type and using the default Quality-of-Service settings. Again, the Quote topic and type names must match on the publisher and the subscriber.         // Get QoS to use for our two topics
  213.         // Could also use TOPIC_QOS_DEFAULT instead
  214.         DDS::TopicQos default_topic_qos;
  215.         participant->get_default_topic_qos(default_topic_qos);
  216.         // Create a topic for the Quote type...
  217.         DDS::Topic_var quote_topic =
  218.           participant->create_topic (QUOTER_QUOTE_TOPIC,
  219.                                      QUOTER_QUOTE_TYPE,
  220.                                      default_topic_qos,
  221.                                      DDS::TopicListener::_nil());
  222.         if (CORBA::is_nil (quote_topic.in ()))
  223.         {
  224.           cerr << "create_topic for "
  225.                << QUOTER_QUOTE_TOPIC
  226.                << " failed." << endl;
  227.           ACE_OS::exit(1);
  228.         }
  229. Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic and type names must match on the publisher and on the subscriber.         // .. and another topic for the Exchange Event type
  230.         DDS::Topic_var exchange_evt_topic =
  231.           participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
  232.                                      QUOTER_EXCHANGE_EVENT_TYPE,
  233.                                      default_topic_qos,
  234.                                      DDS::TopicListener::_nil());
  235.    
  236.         if (CORBA::is_nil (exchange_evt_topic.in ()))
  237.         {
  238.           cerr << "create_topic for "
  239.                << QUOTER_EXCHANGE_EVENT_TOPIC
  240.                << " failed."
  241.                << endl;
  242.           ACE_OS::exit(1);
  243.         }
  244. We create two data writers, one for each topic. We pass in the topic created above; the topic knows its type. Each data writer is associated to exactly one publisher and publishes on one topic. Later, our publisher publishes on each topic by writing data samples to each data writer. The following code creates a data writer for the "Stock Quotes" topic.         // Get QoS to use for our two DataWriters
  245.         // Could also use DATAWRITER_QOS_DEFAULT
  246.         DDS::DataWriterQos dw_default_qos;
  247.         pub->get_default_datawriter_qos (dw_default_qos);
  248.         // Create a DataWriter for the Quote topic
  249.         DDS::DataWriter_var quote_base_dw =
  250.           pub->create_datawriter(quote_topic.in (),
  251.                                  dw_default_qos,
  252.                                  DDS::DataWriterListener::_nil());
  253.    
  254.         if (CORBA::is_nil (quote_base_dw.in ()))
  255.         {
  256.           cerr << "create_datawriter for "
  257.                << QUOTER_QUOTE_TOPIC
  258.                << " failed." << endl;
  259.           ACE_OS::exit(1);
  260.         }
  261.         StockQuoter::QuoteDataWriter_var quote_dw
  262.           = StockQuoter::QuoteDataWriter::_narrow(quote_base_dw.in());
  263.         if (CORBA::is_nil (quote_dw.in ()))
  264.         {
  265.           cerr << "QuoteDataWriter could not be narrowed"
  266.                << endl;
  267.           ACE_OS::exit(1);
  268.         }
  269. We then create a data writer for the "Stock Exchange Event" topic. Again, we pass in the topic created above, and the topic knows its type.         // Create a DataWriter for the Exchange Event topic
  270.         DDS::DataWriter_var exchange_evt_base_dw =
  271.           pub->create_datawriter(exchange_evt_topic.in (),
  272.                                  dw_default_qos,
  273.                                  DDS::DataWriterListener::_nil());
  274.    
  275.         if (CORBA::is_nil (exchange_evt_base_dw.in ()))
  276.         {
  277.           cerr << "create_datawriter for "
  278.                << QUOTER_EXCHANGE_EVENT_TOPIC
  279.                << " failed." << endl;
  280.           ACE_OS::exit(1);
  281.         }
  282.         StockQuoter::ExchangeEventDataWriter_var exchange_evt_dw =
  283.           StockQuoter::ExchangeEventDataWriter::_narrow(
  284.             exchange_evt_base_dw.in());
  285.    
  286.         if (CORBA::is_nil (exchange_evt_dw.in ()))
  287.         {
  288.           cerr << "ExchangeEventDataWriter could not "
  289.                << "be narrowed"<< endl;
  290.           ACE_OS::exit(1);
  291.         }
  292. We may choose to register each data instance. Registering each data instance will slightly improve latency while writing samples of that instance.
  293. A publisher may publish many data samples on each data instance. A data instance is identified by a unique key. For the Quote type, we identified ticker as the key field in its IDL type definition. Each Quote data sample with the same key value is considered part of the same data instance. In other words, each Quote sample published on the ticker symbol "SPY" is part of the same instance.
  294. We have two Quote instances, for tickers symbols "SPY" (S&P Depository Receipts, i.e. the S&P 500) and "MDY" (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400), and one ExchangeEvent instance, for the "Test Stock Exchange". We register each instance with the appropriate data writer. The registration method is actually called _cxx_register because register is a reserved word in C++.         // Register the Exchange Event and the two
  295.         // Quoted securities (SPY and MDY) with the
  296.         // appropriate data writer
  297.         StockQuoter::Quote spy;
  298.         spy.ticker = CORBA::string_dup("SPY");
  299.         DDS::InstanceHandle_t spy_handle =
  300.           quote_dw->_cxx_register(spy);
  301.         StockQuoter::Quote mdy;
  302.         mdy.ticker = CORBA::string_dup("MDY");
  303.         DDS::InstanceHandle_t mdy_handle =
  304.           quote_dw->_cxx_register(mdy);
  305.         StockQuoter::ExchangeEvent ex_evt;
  306.         ex_evt.exchange = STOCK_EXCHANGE_NAME;
  307.         DDS::InstanceHandle_t exchange_handle =
  308.           exchange_evt_dw->_cxx_register(ex_evt);
  309. Finally, we publish. First, we publish a TRADING_OPENED event on the "Stock Exchange Event" topic.         // Publish...
  310.    
  311.         StockQuoter::ExchangeEvent opened;
  312.         opened.exchange = STOCK_EXCHANGE_NAME;
  313.         opened.event = StockQuoter::TRADING_OPENED;
  314.         opened.timestamp = get_timestamp();
  315.         cout << "Publishing TRADING_OPENED" << endl;
  316.         DDS::ReturnCode_t ret =
  317.           exchange_evt_dw->write(opened, exchange_handle);
  318.         if (ret != DDS::RETCODE_OK)
  319.         {
  320.           ACE_ERROR ((
  321.             LM_ERROR,
  322.             ACE_TEXT("(%P|%t)ERROR: OPEN write returned %d.\n"),
  323.             ret));
  324.         }
  325. Then, we publish several stock quote data samples for the "SPY" and "MDY" instances on the "Stock Quote" topic. We simply loop, increasing the quoted value for each ticker symbol a bit each time to simulate active trading on a really good day.         ACE_Time_Value quarterSecond( 0, 250000 );
  326.    
  327.         for ( int i = 0; i < 20; ++i )
  328.         {
  329.           //
  330.           // SPY
  331.           //
  332.           StockQuoter::Quote spy_quote;
  333.           spy_quote.exchange = STOCK_EXCHANGE_NAME;
  334.           spy_quote.ticker = CORBA::string_dup("SPY");
  335.           spy_quote.full_name =
  336.             CORBA::string_dup("S&P Depository Receipts");
  337.           spy_quote.value = 1200.0 + 10.0*i;
  338.           spy_quote.timestamp = get_timestamp();
  339.           cout << "Publishing SPY Quote: "
  340.                << spy_quote.value << endl;
  341.           ret = quote_dw->write(spy_quote, spy_handle);
  342.           if (ret != DDS::RETCODE_OK)
  343.           {
  344.             ACE_ERROR ((
  345.               LM_ERROR,
  346.               ACE_TEXT("(%P|%t)ERROR: SPY write returned %d.\n"),
  347.               ret));
  348.           }
  349.           ACE_OS::sleep( quarterSecond );
  350.           //
  351.           // MDY
  352.           //
  353.           StockQuoter::Quote mdy_quote;
  354.           mdy_quote.exchange = STOCK_EXCHANGE_NAME;
  355.           mdy_quote.ticker = CORBA::string_dup("MDY");
  356.           mdy_quote.full_name =
  357.             CORBA::string_dup("S&P Midcap Depository Receipts");
  358.           mdy_quote.value = 1400.0 + 10.0*i;
  359.           mdy_quote.timestamp = get_timestamp();
  360.           cout << "Publishing MDY Quote: "
  361.                << mdy_quote.value << endl;
  362.           ret = quote_dw->write(mdy_quote, mdy_handle);
  363.           if (ret != DDS::RETCODE_OK)
  364.           {
  365.             ACE_ERROR ((
  366.               LM_ERROR,
  367.               ACE_TEXT("(%P|%t)ERROR: MDY write returned %d.\n"),
  368.               ret));
  369.           }
  370.           ACE_OS::sleep( quarterSecond );
  371.         }
  372. Last, we publish a TRADING_CLOSED event on the "Stock Exchange Event" topic to indicate that the stock exchange is closed for the day.         StockQuoter::ExchangeEvent closed;
  373.         closed.exchange = STOCK_EXCHANGE_NAME;
  374.         closed.event = StockQuoter::TRADING_CLOSED;
  375.         closed.timestamp = get_timestamp();
  376.         cout << "Publishing TRADING_CLOSED" << endl;
  377.         ret = exchange_evt_dw->write(closed, exchange_handle);
  378.         if (ret != DDS::RETCODE_OK)
  379.         {
  380.           ACE_ERROR ((
  381.             LM_ERROR,
  382.             ACE_TEXT("(%P|%t)ERROR: CLOSED write returned %d.\n"),
  383.             ret));
  384.         }
  385.         cout << "Exiting..." << endl;
  386.       } catch (CORBA::Exception& e) {
  387.         cerr << "Exception caught in main.cpp:" << endl
  388.              << e << endl;
  389.         ACE_OS::exit(1);
  390.       }
  391. Finally, we clean up after ourselves before exiting.       // Cleanup
  392.       try {
  393.         if (!CORBA::is_nil (participant.in ())) {
  394.           participant->delete_contained_entities();
  395.         }
  396.         if (!CORBA::is_nil (dpf.in ())) {
  397.           dpf->delete_participant(participant.in ());
  398.         }
  399.       } catch (CORBA::Exception& e) {
  400.         cerr << "Exception caught in cleanup."
  401.              << endl
  402.              << e << endl;
  403.         ACE_OS::exit(1);
  404.       }
  405.       TheTransportFactory->release();
  406.       TheServiceParticipant->shutdown ();
  407.       return 0;
  408.     }
  409. This completes the C++ code for the publisher.
  410. Table of Contents
  411. Subscriber
  412. Our subscriber subscribes to the stock quotes and stock exchange events, receiving data samples from the publisher. We use the publisher's TRADING_CLOSED event to indicate that trading has finished for the day, triggering the subscriber's graceful shutdown.
  413. Much of the code in the subscriber is similar to that in the publisher. We get a domain participant, register types, etc. in the same way we did in the publisher. The main difference is that the subscriber is passive, waiting to receive samples, while the publisher is active. The subscriber uses listener objects to receive samples from the publisher.
  414. First, we include the two type support header files generated by the dcps_ts.pl script. We also included these files in the publisher. However, we also include two listener header files, one for each published type. A listener is called by DDS when a data sample is published on the associated topic.     #include "QuoteTypeSupportImpl.h"
  415.     #include "QuoteDataReaderListenerImpl.h"
  416.     #include "ExchangeEventTypeSupportImpl.h"
  417.     #include "ExchangeEventDataReaderListenerImpl.h"
  418. We also include DCPS subscriber, service participant, and QoS header files.     #include "dds/DCPS/Service_Participant.h"
  419.     #include "dds/DCPS/Marked_Default_Qos.h"
  420.     #include "dds/DCPS/SubscriberImpl.h"
  421.     #include "dds/DCPS/transport/framework/TheTransportFactory.h"
  422.     #include "dds/DCPS/BuiltinTopicUtils.h"
  423.     #include "ace/streams.h"
  424.     #include "orbsvcs/Time_Utilities.h"
  425. As in the publisher, we will configure the subscriber's transport via a configuration file. Again, we need an identifier to tie a transport entry in the configuration file to the transport configured in the code. It is important that this number matches the value in the subscriber's configuration file; it doesn't matter if it matches the same number in the publisher. We'll use it to create the transport implementation that we attach to our subscriber.     // constants used by this publisher for transport; the
  426.     // TRANSPORT_IMPL_ID must match the value in the
  427.     // configuration file.
  428.     const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
  429. The following constants are used for our domain, type names and topic names. These must match the domain, type names, and topic names used by the publisher.     // constants for Stock Quoter domain Id, types, and topic
  430.     // (same as publisher)
  431.     DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
  432.     const char* QUOTER_QUOTE_TYPE = "Quote Type";
  433.     const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
  434.     const char* QUOTER_EXCHANGE_EVENT_TYPE =
  435.       "Exchange Event Type";
  436.     const char* QUOTER_EXCHANGE_EVENT_TOPIC =
  437.       "Stock Exchange Events";
  438. The remainder of the subscriber's source code file contains its main(). We enter the subscriber's main().     int main (int argc, char *argv[])
  439.     {
  440.       DDS::DomainParticipantFactory_var dpf =
  441.         DDS::DomainParticipantFactory::_nil();
  442.       DDS::DomainParticipant_var participant =
  443.         DDS::DomainParticipant::_nil();
  444.       try {
  445. We create a domain participant, just as we did in the publisher. The specified domain matches the publisher's domain.         // Initialize, and create a DomainParticipant
  446.         // (same code as publisher)
  447.         dpf = TheParticipantFactoryWithArgs(argc, argv);
  448.         participant = dpf->create_participant(
  449.           QUOTER_DOMAIN_ID,
  450.           PARTICIPANT_QOS_DEFAULT,
  451.           DDS::DomainParticipantListener::_nil());
  452.         if (CORBA::is_nil (participant.in ()))
  453.         {
  454.           cerr << "create_participant failed." << endl;
  455.           ACE_OS::exit(1);
  456.         }
  457. Then, we create a subscriber through the domain participant with default Quality-of-Service policy values. We can attach a SubscriberListener that is called by DCPS when certain subscription-related events happen. However, we don't care about those events, so we attach a nil. This is almost the same as what we did in the publisher when we called create_publisher.         // Create a subscriber for the two topics
  458.         // (SUBSCRIBER_QOS_DEFAULT is defined
  459.         // in Marked_Default_Qos.h)
  460.         DDS::Subscriber_var sub =
  461.           participant->create_subscriber(
  462.             SUBSCRIBER_QOS_DEFAULT,
  463.             DDS::SubscriberListener::_nil());
  464.         if (CORBA::is_nil (sub.in ()))
  465.         {
  466.           cerr << "create_subscriber failed." << endl;
  467.           ACE_OS::exit(1);
  468.         }
  469. As in the publisher, we create a transport implementation. We get the transport implementation from the singleton transport factory, called TheTransportFactory. Again, the value of the TRANSPORT_IMPL_ID identifier must match the transport id value in our configuration file (more on that later). The OpenDDS::DCPS::AUTO_CONFIG argument indicates that we're using a configuration file to configure the transport implementation.         // Initialize the transport; the TRANSPORT_IMPL_ID
  470.         // must match the value in the configuration file.
  471.         OpenDDS::DCPS::TransportImpl_rch trans_impl =
  472.           TheTransportFactory->create_transport_impl (
  473.             TRANSPORT_IMPL_ID,
  474.             OpenDDS::DCPS::AUTO_CONFIG);
  475. We get the subscriber's implementation object and attach the transport to it. Each subscriber may have a different transport, or several subscribers may share the same transport. In our case, we have one subscriber, and thus one transport implementation.         // Attach the subscriber to the TCP transport.
  476.         // (almost identical to the publisher)
  477.         OpenDDS::DCPS::SubscriberImpl* sub_impl =
  478.           OpenDDS::DCPS::reference_to_servant<
  479.             OpenDDS::DCPS::SubscriberImpl>(sub.in ());
  480.         if (0 == sub_impl)
  481.         {
  482.           cerr << "Failed to obtain subscriber servant" << endl;
  483.           ACE_OS::exit(1);
  484.         }
  485.    
  486.         OpenDDS::DCPS::AttachStatus status =
  487.           sub_impl->attach_transport(trans_impl.in());
  488.         if (status != OpenDDS::DCPS::ATTACH_OK)
  489.         {
  490.           cerr << "Failed to attach to the transport. "
  491.                << "Status == "
  492.                << status << endl;
  493.           ACE_OS::exit(1);
  494.         }
  495. As in the publisher, we must register the IDL Quote and ExchangeEvent types with the domain participant to subscribe to topics on those types.         // Register the Quote type
  496.         // (same code as publisher)
  497.         StockQuoter::QuoteTypeSupport_var quote_servant
  498.           = new StockQuoter::QuoteTypeSupportImpl();
  499.         if (DDS::RETCODE_OK !=
  500.               quote_servant->register_type(participant.in (),
  501.                                            QUOTER_QUOTE_TYPE))
  502.         {
  503.           cerr << "register_type for " << QUOTER_QUOTE_TYPE
  504.                << " failed." << endl;
  505.           ACE_OS::exit(1);
  506.         }
  507.         // Register the ExchangeEvent type
  508.         // (same code as publisher)
  509.         StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant  
  510.           = new StockQuoter::ExchangeEventTypeSupportImpl();
  511.         if (DDS::RETCODE_OK !=
  512.               exchange_evt_servant->register_type(
  513.                 participant.in (),
  514.                 QUOTER_EXCHANGE_EVENT_TYPE))
  515.         {
  516.           cerr << "register_type for "
  517.                << QUOTER_EXCHANGE_EVENT_TYPE
  518.                << " failed." << endl;
  519.           ACE_OS::exit(1);
  520.         }
  521. As in the publisher, we create a topic for our stock quotes, indicating the topic name and the registered name of the Quote type and using the default Quality-of-Service settings. Again, the stock quote topic name must match on the publisher and the subscriber.         // Get QoS to use for our two topics
  522.         // Could also use TOPIC_QOS_DEFAULT instead
  523.         // (same code as publisher)
  524.         DDS::TopicQos default_topic_qos;
  525.         participant->get_default_topic_qos(default_topic_qos);
  526.         // Create a topic for the Quote type...
  527.         // (same code as publisher)
  528.         DDS::Topic_var quote_topic =
  529.           participant->create_topic (QUOTER_QUOTE_TOPIC,
  530.                                      QUOTER_QUOTE_TYPE,
  531.                                      default_topic_qos,
  532.                                      DDS::TopicListener::_nil());
  533.    
  534.         if (CORBA::is_nil (quote_topic.in ()))
  535.         {
  536.           cerr << "create_topic for "
  537.                << QUOTER_QUOTE_TOPIC
  538.                << " failed." << endl;
  539.           ACE_OS::exit(1);
  540.         }
  541. Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic name must match on the publisher and the subscriber.         // .. and another topic for the Exchange Event type
  542.         // (same code as publisher)
  543.         DDS::Topic_var exchange_evt_topic =
  544.           participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
  545.                                      QUOTER_EXCHANGE_EVENT_TYPE,
  546.                                      default_topic_qos,
  547.                                      DDS::TopicListener::_nil());
  548.         if (CORBA::is_nil (exchange_evt_topic.in ()))
  549.         {
  550.           cerr << "create_topic for "
  551.                << QUOTER_EXCHANGE_EVENT_TOPIC
  552.                << " failed."
  553.                << endl;
  554.           ACE_OS::exit(1);
  555.         }
  556. On the publisher, we created two data writers, one for each topic. On the subscriber, we'll create two data readers, one for each topic. Each data reader has exactly one subscriber and subscribes to one topic. We also attach a listener to each data reader to receive notification of published data samples. This is where the publisher and subscriber code diverges.
  557. The following code creates a listener for the "Stock Quotes" topic. The listener is a local CORBA object, implementing the DDS::DataReaderListener IDL interface. We use OpenDDS's convenient servant_to_reference function template to obtain a reference of the interface type.         // Create DataReaders and DataReaderListeners for the
  558.         // Quote and ExchangeEvent
  559.         // Create a Quote listener
  560.         QuoteDataReaderListenerImpl quote_listener_servant;   
  561.         DDS::DataReaderListener_var quote_listener =
  562.           ::OpenDDS::DCPS::servant_to_reference("e_listener_servant);
  563.         if (CORBA::is_nil (quote_listener.in ()))
  564.         {
  565.           cerr << "Quote listener is nil." << endl;
  566.           ACE_OS::exit(1);
  567.         }
  568. We create a second listener for the "Stock Exchange Event" topic.         // Create an ExchangeEvent listener
  569.         ExchangeEventDataReaderListenerImpl exchange_evt_listener_servant;
  570.    
  571.         DDS::DataReaderListener_var exchange_evt_listener =
  572.           ::OpenDDS::DCPS::servant_to_reference(&exchange_evt_listener_servant);
  573.         if (CORBA::is_nil (exchange_evt_listener.in ()))
  574.         {
  575.           cerr << "ExchangeEvent listener is nil." << endl;
  576.           ACE_OS::exit(1);
  577.         }
  578. Finally, we create a data reader for each of the two topics. First, we create a data reader for the "Stock Quotes" topic, attaching the relevant listener we created above.         // Create the Quote DataReader
  579.    
  580.         // Get the default QoS
  581.         // Could also use DATAREADER_QOS_DEFAULT
  582.         DDS::DataReaderQos dr_default_qos;
  583.         sub->get_default_datareader_qos (dr_default_qos);
  584.    
  585.         DDS::DataReader_var quote_dr =
  586.           sub->create_datareader(quote_topic.in (),
  587.                                  dr_default_qos,
  588.                                  quote_listener.in ());
  589. Then, we create a data reader for the "Stock Exchange Events" topic, attaching the other listener we created above.         // Create the ExchangeEvent DataReader
  590.    
  591.         DDS::DataReader_var exchange_evt_dr =
  592.           sub->create_datareader(exchange_evt_topic.in (),
  593.                                  dr_default_qos,
  594.                                  exchange_evt_listener.in ());
  595. OpenDDS spawns it own threads to handle incoming events from the publisher. Thus, there is no event loop code in the subscriber. However, we must be sure not to allow the main thread to exit before we're ready to shut down the entire subscriber process. So, we loop, processing stock quotes and stock exchange events until the TRADING_CLOSED event is received on the "Stock Exchange Events" topic. Essentially, we want to receive published data samples until the stock exchange tells us that it has closed. The sleep call causes us to check this once per second to avoid consuming too much of the CPU.         // Wait for events from the Publisher; shut
  596.         // down when "close" received
  597.         cout << "Subscriber: waiting for events" << endl;
  598.         while ( ! exchange_evt_listener_servant.
  599.                     is_exchange_closed_received() )
  600.         {
  601.           ACE_OS::sleep(1);
  602.         }
  603. When we have received the TRADING_CLOSED event, we gracefully exit the loop.         cout << "Received CLOSED event from publisher; "
  604.              << " exiting..."
  605.              << endl;
  606.    
  607.       } catch (CORBA::Exception& e) {
  608.         cerr << "Exception caught in main.cpp:" << endl
  609.              << e << endl;
  610.         ACE_OS::exit(1);
  611.       }
  612. Finally, we clean up after ourselves before exiting.       // Cleanup
  613.       try {
  614.         if (!CORBA::is_nil (participant.in ())) {
  615.           participant->delete_contained_entities();
  616.         }
  617.         if (!CORBA::is_nil (dpf.in ())) {
  618.           dpf->delete_participant(participant.in ());
  619.         }
  620.       } catch (CORBA::Exception& e) {
  621.         cerr << "Exception caught in cleanup."
  622.              << endl
  623.              << e << endl;
  624.         ACE_OS::exit(1);
  625.       }
  626.       TheTransportFactory->release();
  627.       TheServiceParticipant->shutdown ();
  628.       return 0;
  629.     }
  630. Table of Contents
  631. Subscriber's "Stock Quote" and "Stock Exchange Event" Listeners
  632. The "Stock Quote" data reader and the "Stock Exchange Event" data reader each has a listener attached. These listeners are called by the DDS framework each time a data sample is received from a publisher. We have decided that each of the two data readers shall have its own listener, although we could use a single listener for both data readers if we code the listener to handle both data types.
  633. Each listener implements the DDS::DataReaderListener IDL interface. We used both a QuoteDataReaderListenerImpl and an ExchangeEventDataReaderListenerImpl in the subscriber code above, but we have not yet defined those classes. We will do that now.
  634. First, we write a listener for the Quote type's data reader. This listener class implements the DDS::DataReaderListener IDL interface, overriding seven pure virtual methods from the interface. It is a CORBA local object implementation of an IDL interface, inheriting from the generated class for the IDL interface.
  635. The listener class must override all seven methods, including methods for which the listener's implementation is empty. However, for simplicity, we'll show only the on_data_available method, which is called when a new Quote data sample is available. The other six methods have empty implementations. We'll also use a default constructor and destructor.     #include "QuoteTypeSupportC.h"
  636.     #include "QuoteTypeSupportImpl.h"
  637.     #include "dds/DCPS/Service_Participant.h"
  638.     #include "dds/DdsDcpsSubscriptionS.h"
  639.     #include "ace/streams.h"
  640.     class QuoteDataReaderListenerImpl
  641.       : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
  642.     {
  643.     public:
  644.       // DDS calls on_data_available on the listener for each
  645.       // received Quote sample.
  646.       virtual void on_data_available(DDS::DataReader_ptr reader)
  647.         throw (CORBA::SystemException)
  648.       {
  649.         try
  650.         {
  651. We first narrow the value of the data reader parameter to the appropriate type for a Quote sample.           StockQuoter::QuoteDataReader_var quote_dr =
  652.             StockQuoter::QuoteDataReader::_narrow(reader);
  653.    
  654.           if (CORBA::is_nil (quote_dr.in ()))
  655.           {
  656.             cerr << "QuoteDataReaderListenerImpl:: "
  657.                  << "on_data_available:"
  658.                  << " _narrow failed." << endl;
  659.             ACE_OS::exit(1);
  660.           }
  661. Then, we take the next Quote sample from the data reader. Note the type safety of the QuoteDataReader interface.           StockQuoter::Quote quote;
  662.           DDS::SampleInfo si;
  663.           DDS::ReturnCode_t status =
  664.             quote_dr->take_next_sample(quote, si) ;
  665. Once we have received the Quote sample, we simply print out its contents.           if (status == DDS::RETCODE_OK) {
  666.             cout << "Quote: ticker    = " << quote.ticker.in()   
  667.                  << endl
  668.                  << "       exchange  = " << quote.exchange.in()  
  669.                  << endl
  670.                  << "       full name = " << quote.full_name.in()
  671.                  << endl
  672.                  << "       value     = " << quote.value         
  673.                  << endl
  674.                  << "       timestamp = " << quote.timestamp      
  675.                  << endl;
  676.      
  677.             cout << "SampleInfo.sample_rank = "
  678.                  << si.sample_rank << endl;
  679.           }
  680.           else if (status == DDS::RETCODE_NO_DATA)
  681.           {
  682.             cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!"
  683.                  << endl;
  684.           }
  685.           else
  686.           {
  687.             cerr << "ERROR: read Quote: Error: "
  688.                  <<  status << endl;
  689.           }
  690. The Quote sample's memory is cleaned up by the stack when it goes out of scope.         } catch (CORBA::Exception& e) {
  691.           cerr << "Exception caught in read:"
  692.                << endl << e << endl;
  693.           ACE_OS::exit(1);
  694.         }
  695.       }
  696. We have not shown implementations of the other methods in the DDS::DataReaderListener interface, but we must override them as well, even if their implementations are empty.       // must also override:
  697.       //    on_requested_deadline_missed
  698.       //    on_requested_incompatible_qos (
  699.       //    on_liveliness_changed
  700.       //    on_subscription_match
  701.       //    on_sample_rejected
  702.       //    on_sample_lost
  703.     };
  704. Next, we write a listener for the ExchangeEvent type's data reader. The basic structure is the same as that of the QuoteDataReaderListenerImpl.     #include "ExchangeEventDataReaderListenerImpl.h"
  705.     #include "ExchangeEventTypeSupportC.h"
  706.     #include "ExchangeEventTypeSupportImpl.h"
  707.     #include "dds/DCPS/Service_Participant.h"
  708.     #include "dds/DdsDcpsSubscriptionS.h"
  709.     #include "ace/streams.h"
  710.     #include "ace/Synch.h"
  711.     class ExchangeEventDataReaderListenerImpl
  712.       : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
  713.     {
  714.     public:
  715. We add the is_exchange_closed_received method to the data reader so the subscriber's main program can find out when the TRADING_CLOSED stock exchange event has been received. This method checks a boolean value under the protection of a mutex lock. The boolean value is set by the listener's on_data_available method when a TRADING_CLOSED stock exchange event is received.       // app-specific
  716.       CORBA::Boolean is_exchange_closed_received()
  717.       {
  718.         ACE_Guard<ACE_Mutex> guard(this->lock_);
  719.         return this->is_exchange_closed_received_;
  720.       }
  721. DDS calls on_data_available on the listener for each received ExchangeEvent sample.       virtual void on_data_available(DDS::DataReader_ptr reader)
  722.         throw (CORBA::SystemException)
  723.       {
  724.         try
  725.         {
  726. As in the QuoteDataReaderListenerImpl, we first narrow the value of the data reader parameter to the appropriate type, which in this case is an ExchangeEventDataReader.           StockQuoter::ExchangeEventDataReader_var exchange_evt_dr =
  727.             StockQuoter::ExchangeEventDataReader::_narrow(reader);
  728.      
  729.           if (CORBA::is_nil (exchange_evt_dr.in ())) {
  730.             cerr << "ExchangeEventDataReaderListenerImpl:: "
  731.                  << "on_data_available:"
  732.                  << " _narrow failed."
  733.                  << endl;
  734.             ACE_OS::exit(1);
  735.           }
  736. Then, we take the next ExchangeEvent sample from the data reader. Note the type safety.           StockQuoter::ExchangeEvent exchange_evt;
  737.           DDS::SampleInfo si;
  738.           DDS::ReturnCode_t status =
  739.             exchange_evt_dr->take_next_sample(exchange_evt, si) ;
  740. Once we have received the ExchangeEvent sample, we simply print out its contents.           if (status == DDS::RETCODE_OK) {
  741.             cout << "ExchangeEvent: exchange  = "
  742.                  << exchange_evt.exchange.in() << endl;
  743.    
  744.             switch ( exchange_evt.event ) {
  745.               case StockQuoter::TRADING_OPENED:
  746.                 cout << "TRADING_OPENED" << endl;
  747.                 break;
  748. When we receive a TRADING_CLOSED event, we set a flag indicating that the stock exchange has been closed for the day.               case StockQuoter::TRADING_CLOSED: {
  749.                 cout << "TRADING_CLOSED" << endl;
  750.                 ACE_Guard<ACE_Mutex> guard(this->lock_);
  751.                 this->is_exchange_closed_received_ = 1;
  752.                 break;
  753.               }
  754.               case StockQuoter::TRADING_SUSPENDED:
  755.                 cout << "TRADING_SUSPENDED" << endl;
  756.                 break;
  757.               case StockQuoter::TRADING_RESUMED:
  758.                 cout << "TRADING_RESUMED" << endl;
  759.                 break;
  760.               default:
  761.                 cerr << "ERROR: reader received unknown "
  762.                      << "ExchangeEvent: "
  763.                      << exchange_evt.event
  764.                      << endl;
  765.             }
  766.             cout << "timestamp = "
  767.                  << exchange_evt.timestamp
  768.                  << endl;
  769.             cout << "SampleInfo.sample_rank = "
  770.                  << si.sample_rank
  771.                  << endl;
  772.           }
  773.           else if (status == DDS::RETCODE_NO_DATA)
  774.           {
  775.             cerr << "ERROR: reader received "
  776.                  << "DDS::RETCODE_NO_DATA!"
  777.                  << endl;
  778.           }
  779.           else
  780.           {
  781.             cerr << "ERROR: read ExchangeEvent: Error: "
  782.                  <<  status
  783.                  << endl;
  784.           }
  785. The ExchangeEvent sample is cleaned up by the stack when it goes out of scope.     } catch (CORBA::Exception& e) {
  786.       cerr << "Exception caught in read:" << endl
  787.            << e << endl;
  788.       ACE_OS::exit(1);   
  789.     }
  790.   }
  791.   // must also override:
  792.   //    on_requested_deadline_missed
  793.   //    on_requested_incompatible_qos (
  794.   //    on_liveliness_changed
  795.   //    on_subscription_match
  796.   //    on_sample_rejected
  797.   //    on_sample_lost
  798. We have added two private class attributes to keep track of the TRADING_CLOSED event and protect that value with a lock.     private:
  799.       CORBA::Boolean is_exchange_closed_received_;
  800.       ACE_Mutex lock_;
  801.     };
  802. This completes the C++ code for the subscriber.
  803. Table of Contents
  804. Building the Publisher and Subscriber
  805. We use MPC, the Make Project Creator, to generate build files for the publisher and subscriber. MPC provides a simple syntax and is capable of generating build files for GNU Make, Visual C++, and many other build systems. For more information on MPC, please see OCI's MPC page at http://www.ociweb.com/products/mpc and the MPC chapter of the TAO Developer's Guide at http://downloads.ociweb.com/MPC/MakeProjectCreator.pdf.
  806. We create two files to build our Stock Quoter, a workspace file and a project file. Our workspace file simply tells MPC where to find the MPC dcps and dcpsexe base project files that we'll use later.     //
  807.     // file StockQuoter.mwc
  808.     //
  809.    
  810.     workspace {
  811.       cmdline += -relative DDS_ROOT=$DDS_ROOT
  812.       cmdline += -include $DDS_ROOT/MPC/config
  813.     }
  814. Next, we create an MPC file containing three projects - a Common project containing IDL and TypeSupport files, a Publisher, an a Subscriber. Each of the three projects inherits from either the dcps or dcpsexe base project, which are located in $DDS_ROOT. First, we create a library called StockQuoterCommon to hold the code generated by the IDL and dcps_ts.pl compilers.     //
  815.     // file StockQuoter.mpc
  816.     //
  817.    
  818.     project(*Common) : dcps {
  819.       sharedname = StockQuoterCommon
  820.       libout = .
  821.       includes += $(TAO_ROOT)/orbsvcs
  822.       idlflags += -I$(TAO_ROOT)/orbsvcs
  823.       idlflags += -Wb,export_macro=StockQuoterCommon_Export
  824.       idlflags += -Wb,export_include=StockQuoterCommon_Export.h
  825.       dcps_ts_flags += --export=StockQuoterCommon_Export
  826.       dynamicflags = STOCKQUOTERCOMMON_BUILD_DLL
  827. A dcps project has a new section, TypeSupport_Files. This section executes the dcps_ts.pl script to generate TypeSupport files from our DDS data types. Here, we indicate the IDL file that contains our DDS data types, and also indicate the TypeSupport files generated from it. Specifying the TypeSupport's .idl, .h, and .cpp dependencies like this forces the dcps_ts.pl compiler to be run before the IDL compiler and enables the build system's clean targets to work properly.       TypeSupport_Files {
  828.         StockQuoter.idl >> QuoteTypeSupport.idl QuoteTypeSupportImpl.h QuoteTypeSupportImpl.cpp \
  829.                            ExchangeEventTypeSupport.idl ExchangeEventTypeSupportImpl.h ExchangeEventTypeSupportImpl.cpp
  830.       }
  831. Our IDL_Files section contains our original IDL files plus the TypeSupport IDL files generated by the previous section.       IDL_Files {
  832.         ExchangeEventTypeSupport.idl
  833.         QuoteTypeSupport.idl
  834.         StockQuoter.idl
  835.       }
  836. The Header_Files and Source_Files sections contain the dcps_ts.pl-generated TypeSupport implementation files. MPC automatically adds the generated IDL stubs and skeletons, so we don't need to add those manually.       Header_Files {
  837.         ExchangeEventTypeSupportImpl.h
  838.         QuoteTypeSupportImpl.h
  839.       }
  840.    
  841.       Source_Files {
  842.         ExchangeEventTypeSupportImpl.cpp
  843.         QuoteTypeSupportImpl.cpp
  844.       }
  845.     }
  846. Our publisher uses the StockQuoterCommon library from above, and adds a publisher.cpp source file containing the publisher's main().     project(*Publisher) : dcpsexe, svc_utils {
  847.       after += *Common
  848.       exename   = publisher
  849.       includes += $(TAO_ROOT)/orbsvcs
  850.       libs += StockQuoterCommon
  851.       dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
  852.       TypeSupport_Files {
  853.       }
  854.    
  855.       IDL_Files {
  856.       }
  857.    
  858.       Header_Files {
  859.       }
  860.    
  861.       Source_Files {
  862.         publisher.cpp
  863.       }
  864.    
  865.       Documentation_Files {
  866.         README.txt
  867.         domain_ids
  868.       }
  869.     }
  870. Our subscriber also uses the StockQuoterCommon library, adds a subscriber.cpp source file containing the subscriber's main(), and its two listeners.     project(*Subscriber) : dcpsexe {
  871.       after += *Common
  872.       exename   = subscriber
  873.    
  874.       includes += $(TAO_ROOT)/orbsvcs
  875.       libs += StockQuoterCommon
  876.       dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
  877.    
  878.       TypeSupport_Files {
  879.       }
  880.    
  881.       IDL_Files {
  882.       }
  883.    
  884.       Header_Files {
  885.         ExchangeEventDataReaderListenerImpl.h
  886.         QuoteDataReaderListenerImpl.h
  887.       }
  888.    
  889.       Source_Files {
  890.         ExchangeEventDataReaderListenerImpl.cpp
  891.         QuoteDataReaderListenerImpl.cpp
  892.         subscriber.cpp
  893.       }
  894.    
  895.       Documentation_Files {
  896.         README.txt
  897.         domain_ids
  898.       }
  899.     }
  900. We use this MPC file to generate build files for our build system. For example, to generate GNU Makefiles, we execute     $ACE_ROOT/bin/mwc.pl -type gnuace StockQuoter.mwc
  901. To generate Visual C++ 7.1 solution files, we execute     perl %ACE_ROOT%/bin/mwc.pl -type vc71 StockQuoter.mwc
  902. We then build the project.
  903. Table of Contents
  904. Configuring the Stock Quoter
  905. OpenDDS 1.0 includes a file-based configuration mechanism. With it, an OpenDDS user may configure a publisher's or subscriber's transport, the location of the DCPSInfoRepo process, and many other settings. The syntax of the configuration file is similar to the syntax of a Windows INI file. It contains several sections, which in turn contain property-like entries. The basic syntax is as follows:     [section1-name]
  906.     Attribute1=value1
  907.     Attribute2=value2
  908.     [section2-name]
  909.     Attribute1=value1
  910.     Attribute2=value2
  911. The complete set of configuration settings is described in the OpenDDS chapter of the TAO Developer's Guide.
  912. Our TCP-based example uses one configuration file, dds_tcp_conf.ini, for both the publisher and the subscriber:     #
  913.     # dds_tcp_conf.ini
  914.     #
  915.     [common]
  916.     # Debug level
  917.     DCPSDebugLevel=0
  918.     # IOR of DCPSInfoRepo process.
  919.     DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
  920.     [transport_impl_1]
  921.     # the two supported transports are SimpleTcp and SimpleUdp
  922.     transport_type=SimpleTcp
  923. Please note that there are two sections, [common] and [transport_impl_1]. The [common] section contains configuration values that apply to the entire process; in this configuration file, we specify a debug level and an object reference for the DCPSInfoRepo process. Here, our DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to only be available to DDS processes running on the same host. To make it available across the network, use an IP address or a network hostname instead of localhost.
  924. The [transport_impl_1] section contains configuration values for the transport with the id of "1". Recall that in both the publisher and subscriber C++ code, we defined     const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
  925. and configured the transport via     // Initialize the transport; the TRANSPORT_IMPL_ID must match the
  926.     // value in the configuration file.
  927.     OpenDDS::DCPS::TransportImpl_rch trans_impl =
  928.       TheTransportFactory->create_transport_impl (
  929.         TRANSPORT_IMPL_ID,
  930.         OpenDDS::DCPS::AUTO_CONFIG);
  931. The "1" in the transport configuration file matches the "1" defined in code as a transport id. Naturally, a publisher or subscriber process may contain more than one transport, each configured differently. Once configured, we can attach the transport to a DCPS publisher or subscriber entity as we do in the C++ code above.
  932. Table of Contents
  933. Running the Stock Quoter over a TCP Transport
  934. To run the example, we must start a DCPSInfoRepo process, and start at least one publisher and one subscriber. The DCPSInfoRepo process needs to know the domain ids of the DCPS domains that intend to use it as a broker. Our domain is 1066, as we specified when we created our DomainParticipant object in both the publisher and subscriber. We'll store that domain id in a file.     // in file "domain_ids"
  935.     1066
  936. Then we pass that file to the DCPSInfoRepo process on the command line:     $DDS_ROOT/bin/DCPSInfoRepo \
  937.       -ORBListenEndpoints iiop://localhost:12345 -d domain_ids
  938. Our DCPSInfoRepo process listens on port 12345. That port matches the port that we specified in the DCPSInfoRepo object reference in the transport configuration file above. This DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to only be available to DDS processes running on the same host. Again, to make it available across the network, use an IP address or a network hostname instead of localhost.
  939. We run two subscribers and one publisher:     subscriber -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  940.     subscriber -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  941.     publisher -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  942. We use the -DCPSConfigFile command-line argument to indicate the name of the configuration file we created above. Note that each subscriber and publisher uses the same transport configuration file.
  943. We also use the "-ORBSvcConf tcp.conf" command-line argument to use the ACE Service Configurator dynamically link the library containing the SimpleTcp transport into the process. The tcp.conf file has the following contents:     dynamic DCPS_SimpleTcpLoader
  944.       Service_Object * SimpleTcp:_make_DCPS_SimpleTcpLoader()
  945.       "-type SimpleTcp"
  946. The above command lines are used to run the DCPSInfoRepo, publisher and subscriber with built-in-topic on which is the default case. We can also run these processes with the built-in-topic off. The -NOBITS is used by DCPSInfoRepo to turn off built-in-topic and the "-DCPSBit 0" is used by other DDS applications. The command lines are as follows:     $DDS_ROOT/bin/DCPSInfoRepo \
  947.       -NOBITS -ORBListenEndpoints iiop://localhost:12345 -d domain_ids
  948.     subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  949.     subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  950.     publisher -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini -ORBSvcConf tcp.conf
  951. The publisher publishes 20 stock quotes for each the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it publishes a "TRADING_CLOSED" message, which causes the subscribers to exit.
  952. Table of Contents
  953. Running the Stock Quoter over a UDP Transport
  954. We can use the same code base to run the example over a UDP transport by creating three new transport configuration files. For the UDP transport, we need a configuration file for the publisher and for each subscriber because each process needs its own UDP endpoint.
  955. For the publisher, we create a pub_udp_conf.ini file that contains the following (changes in bold):     [common]
  956.     DCPSDebugLevel=0
  957.     DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
  958.     [transport_impl_1]
  959.     transport_type=SimpleUdp
  960.     local_address=localhost:4444
  961. For the first subscriber, we create a sub_udp_conf.ini file that contains the following (changes in bold):     [common]
  962.     DCPSDebugLevel=0
  963.     DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
  964.     [transport_impl_1]
  965.     transport_type=SimpleUdp
  966.     local_address=localhost:4443
  967. For the second subscriber, we create a sub2_udp_conf.ini file that contains the following (changes in bold):     [common]
  968.     DCPSDebugLevel=0
  969.     DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
  970.     [transport_impl_1]
  971.     transport_type=SimpleUdp
  972.     local_address=localhost:4442
  973. We then start the DCPSInfoRepo process as before:     $DDS_ROOT/bin/DCPSInfoRepo \
  974.       -ORBListenEndpoints iiop://localhost:12345 -d domain_ids
  975. We start the two subscribers and the publisher, using our new transport configuration files:     subscriber -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
  976.                -DCPSConfigFile sub_udp_conf.ini
  977.    
  978.     subscriber -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
  979.                -DCPSConfigFile sub2_udp_conf.ini
  980.    
  981.     publisher -ORBSvcConf udp.conf -ORBSvcConf tcp.conf \
  982.               -DCPSConfigFile pub_udp_conf.ini
  983. The above command-lines are used when the built-in-topic is turned on. Note there are two *.conf files are provided. The tcp.conf is required if a DDS process runs with built-in-topic on since the built-in-topic uses the SimpleTcp transport in OpenDDS1.0. The udp.conf is used to dynamically link the library containing the SimpleUdp transport into the process. The udp.conf file has the following contents:     dynamic OPENDDS_DCPS_SimpleUnreliableDgramLoader
  984.       Service_Object * SimpleUnreliableDgram:_make_OPENDDS_DCPS_SimpleUnreliableDgramLoader()
  985.       "-type SimpleUdp"
  986. We can also run each process with the built-in-topic off. The command-lines are as follows:     $DDS_ROOT/bin/DCPSInfoRepo -NOBITS \
  987.       -ORBListenEndpoints iiop://localhost:12345 -d domain_ids
  988.     subscriber -ORBSvcConf udp.conf -DCPSBit 0 \
  989.                -DCPSConfigFile sub_udp_conf.ini
  990.    
  991.     subscriber -ORBSvcConf udp.conf -DCPSBit 0 \
  992.                -DCPSConfigFile sub2_udp_conf.ini
  993.    
  994.     publisher -ORBSvcConf udp.conf -DCPSBit 0 \
  995.               -DCPSConfigFile pub_udp_conf.ini
  996. As before, the publisher publishes 20 stock quotes for each the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it again publishes a "TRADING_CLOSED" message, which causes the subscribers to exit. The only difference is that we have substituted a UDP transport for a TCP transport; the change in transport required no code changes.
  997. Table of Contents
  998. Summary
  999. The OMG Data Distribution Service (DDS) for Real-Time Systems is a specification for a high-performance, type-safe, publish-and-subscribe communication middleware. DDS addresses data-centric applications, i.e. those for which dissemination of application data is a significant requirement.
  1000. OpenDDS is an open source implementation of the OMG Data Distribution Service specification, providing users with an efficient publish-and-subscribe framework with the advantages of the open-source software development model.
  1001. Recent versions of OpenDDS, including 1.0, include a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings. We have shown in our example that an OpenDDS application's underlying transport can be swapped out without making any code changes.
  1002. References
  1003. Sample Code (http://www.opendds.org/Article-Intro-code.zip)
  1004. OMG Data Distribution Service (DDS) for Real-Time Systems (http://www.omg.org/docs/formal/07-05-02)
  1005. OMG "Introduction to DDS" Whitepaper (http://www.omg.org/news/whitepapers/Intro_To_DDS.pdf)
  1006. OpenDDS Home Page (http://www.opendds.org)
  1007. TAO Developer's Guide Home Page (http://www.theaceorb.com/product/index.html and http://www.theaceorb.com/purchase/index.html)
  1008. OpenDDS Chapter of the TAO Developer's Guide (http://download.ociweb.com/OpenDDS/OpenDDS-latest.pdf)
  1009. MPC Chapter of the TAO Developer's Guide (http://download.ociweb.com/TAO-1.4a/TAO1.4aMakeProjectCreator.pdf)
复制代码

[ 本帖最后由 winston 于 2008-6-23 13:35 编辑 ]
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-12-22 16:24 , Processed in 0.024982 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表