OmniEvents
ProxyPushConsumer.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPushConsumer.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003,2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPushConsumer.h"
25 #include "ConsumerAdmin.h"
26 #include "Orb.h"
27 #include "omniEventsLog.h"
28 #include "PersistNode.h"
29 
30 #include <assert.h>
31 
32 namespace OmniEvents {
33 
35  CosEventComm::PushSupplier_ptr pushSupplier)
36 {
37  // pushSupplier is permitted to be nil.
38  if(CORBA::is_nil(pushSupplier))
39  return;
40 
41  string oidstr =currentObjectId();
42  Connections_t::iterator pos =_connections.find(oidstr);
43 
44  if(pos!=_connections.end())
45  throw CosEventChannelAdmin::AlreadyConnected();
46 
47  Connection* newConnection =
48  new Connection(
49  _channelName.in(),
50  oidstr,
51  CosEventComm::PushSupplier::_duplicate(pushSupplier)
52  );
53  _connections.insert( Connections_t::value_type(oidstr,newConnection) );
54 
55  // Test to see whether pushSupplier is a ProxyPushSupplier.
56  // If so, then we will aggressively try to reconnect, when we are reincarnated
57  CORBA::Request_var req =pushSupplier->_request("_is_a");
58  req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59  req->set_return_type(CORBA::_tc_boolean);
60  req->send_deferred();
61  Orb::inst().deferredRequest(req._retn(),newConnection); // Register callback
62 
64  {
65  WriteLock log;
66  newConnection->output(log.os);
67  }
68 }
69 
70 
72 {
73 #ifdef HAVE_OMNIORB4
74  DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
75  string oidstr =currentObjectId();
76  Connections_t::iterator pos =_connections.find(oidstr);
77 
78  if(pos!=_connections.end())
79  {
80  CORBA::Request_var req =
81  pos->second->_target->_request("disconnect_push_supplier");
82  pos->second->_remove_ref();
83  _connections.erase(pos);
84  // The following line could result in a reentrant callback, if this call was
85  // not made through the POA => must erase the connection BEFORE this point.
86  req->send_deferred();
87  Orb::inst().deferredRequest(req._retn());
89  {
90  // Erase this connection from the log file.
91  WriteLock log;
92  log.os<<"-ecf/"<<_channelName.in();
93  log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
94  }
95  }
96 #else /* Silently ignore disconnects with omniORB3 */
97  DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
98 #endif
99 }
100 
101 
102 void ProxyPushConsumer_i::push(const CORBA::Any& event)
103 {
104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
105  if(!_useLocalQueue)
106  {
107  _consumerAdmin.send(new CORBA::Any(event));
108  _useLocalQueue=true;
109  }
110  else
111 #endif
112  _queue.push_back(new CORBA::Any(event));
113 }
114 
115 
117  PortableServer::POA_ptr p,
118  list<CORBA::Any*>& q,
119  ConsumerAdmin_i& consumerAdmin
120 )
121 : Servant(PortableServer::POA::_nil()),
122  _connections(),
123  _channelName(p->the_name()),
124  _consumerAdmin(consumerAdmin),
125  _queue(q),
126  _useLocalQueue(false)
127 {
128  _consumerAdmin._add_ref();
129 
130  using namespace PortableServer;
131 
132  // POLICIES:
133  // Lifespan =PERSISTENT // we can persist
134  // Assignment =USER_ID // write our own oid
135  // Uniqueness =MULTIPLE_ID // only one servant
136  // ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
137  // RequestProcessing =USE_DEFAULT_SERVANT // only one servant
138  // ServantRetention =NON_RETAIN // stateless POA
139  // Thread =SINGLE_THREAD_MODEL // keep it simple
140 
141  CORBA::PolicyList policies;
142  policies.length(7);
143  policies[0]=p->create_lifespan_policy(PERSISTENT);
144  policies[1]=p->create_id_assignment_policy(USER_ID);
145  policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146  policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147  policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148  policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149  policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
150 
151  try
152  {
153  // Create a POA for this proxy type in this channel.
154  string poaName =string(_channelName.in())+".ProxyPushConsumer";
155  POAManager_var parentManager =p->the_POAManager();
156  _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
157  }
158  catch(POA::AdapterAlreadyExists&) // create_POA
159  {
160  DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161  "POA::AdapterAlreadyExists")
162  }
163  catch(POA::InvalidPolicy& ex) // create_POA
164  {
165  DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166  "POA::InvalidPolicy: "<<ex.index)
167  }
168 
169  // Destroy the policy objects (Not strictly necessary in omniORB)
170  for(CORBA::ULong i=0; i<policies.length(); ++i)
171  policies[i]->destroy();
172 
173  // This object is the POA's default servant.
174  _poa->set_servant(this);
175 }
176 
177 
179 {
180  DB(20,"~ProxyPushConsumer_i()")
181  for(Connections_t::iterator i =_connections.begin();
182  i!=_connections.end();
183  ++i)
184  {
185  i->second->_remove_ref();
186  }
187  _connections.clear();
188 
189  _consumerAdmin._remove_ref();
190 }
191 
192 
193 CosEventChannelAdmin::ProxyPushConsumer_ptr
195 {
196  return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
197  _poa.in(),
198  CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
199  );
200 }
201 
202 
204 {
205  // Note. We are (probably) in the EventChannel's thread.
206  Connections_t::iterator curr,next=_connections.begin();
207  while(next!=_connections.end())
208  {
209  curr=next++;
210  CORBA::Request_var req =
211  curr->second->_target->_request("disconnect_push_supplier");
212  curr->second->_remove_ref();
213  _connections.erase(curr);
214  // The following line could result in a reentrant callback
215  // => must erase the connection BEFORE this point.
216  req->send_deferred();
217  Orb::inst().deferredRequest(req._retn());
218  }
219 }
220 
221 
223 {
224  // Reincarnate all connections from node's children.
225  for(map<string,PersistNode*>::const_iterator i=node._child.begin();
226  i!=node._child.end();
227  ++i)
228  {
229  const char* oidstr =i->first.c_str();
230  string ior( i->second->attrString("IOR") );
231  bool isProxy( i->second->attrLong("proxy") );
232  assert(_connections.find(oidstr)==_connections.end());
233  try
234  {
235  using namespace CosEventComm;
236  using namespace CosEventChannelAdmin;
237 
238  PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
239  _connections.insert(Connections_t::value_type(
240  oidstr,
241  new Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
242  ));
243  DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
244 
245  // If supp is a ProxyPushSupplier, then try to reconnect.
246  if(isProxy)
247  {
248  DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
249  // This will only work if the proxy is implemented in the same way as
250  // omniEvents, so connect_() automatically creates a proxy.
251  ProxyPushSupplier_var proxySupp =
252  string_to_<ProxyPushSupplier>(ior.c_str());
253  PortableServer::ObjectId_var objectId =
254  PortableServer::string_to_ObjectId(oidstr);
255  CORBA::Object_var obj =
256  _poa->create_reference_with_id(
257  objectId.in(),
258  CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
259  );
260  PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261  proxySupp->connect_push_consumer(thisCons.in());
262  DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
263  }
264  }
265  catch(CORBA::BAD_PARAM&) {
266  // This will happen when IOR fails to narrow.
267  DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
268  }
269  catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
270  // The supplier doesn't need to be reconnected.
271  DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
272  }
273  catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
274  // Don't know what to make of this...
275  DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
276  }
277  catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
278  catch(CORBA::TRANSIENT& ) {} // object 'supp' not responding.
279  catch(CORBA::COMM_FAILURE& ) {} // object 'supp' not responding.
280  } // end loop for(i)
281 }
282 
283 
284 void ProxyPushConsumer_i::output(ostream& os) const
285 {
286  for(Connections_t::const_iterator i=_connections.begin();
287  i!=_connections.end();
288  ++i)
289  {
290  i->second->output(os);
291  }
292 }
293 
294 
296 {
297 #ifdef HAVE_OMNIORB4
298  try
299  {
300  using namespace PortableServer;
301  ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
302  CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303  return string(oidStr.in());
304  }
305  catch(PortableServer::Current::NoContext&) // get_object_id()
306  {
307  DB(0,"No context!!")
308  }
309  catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
310  {
311  // Should never get here in omniORB, because ObjectID is a char*.
312  assert(0);
313  }
314  return "ERROR";
315 #else
316  throw CORBA::NO_IMPLEMENT();
317 #endif
318 }
319 
320 
321 //
322 // ProxyPushConsumer_i::Connection
323 //
324 
325 #if OMNIEVENTS__DEBUG_SERVANT
326 int ProxyPushConsumer_i::Connection::_objectCount =0;
327 #endif
328 
330  const char* channelName,
331  const string& oidstr,
332  CosEventComm::PushSupplier_ptr pushSupplier,
333  bool isProxy
334 ):Callback(),
335  _channelName(channelName),
336  _oidstr(oidstr),
337  _target(pushSupplier),
338  _targetIsProxy(isProxy)
339 {
340 #if OMNIEVENTS__DEBUG_SERVANT
341  ++_objectCount;
342  DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
343 #endif
344 }
345 
347 {
348 #if OMNIEVENTS__DEBUG_SERVANT
349  --_objectCount;
350  DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
351 #else
352  DB(20,"ProxyPushConsumer_i::Connection::~Connection()")
353 #endif
354 }
355 
357 
358 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
359 {
360  bool save =_targetIsProxy;
361  if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362  {
364  {
365  WriteLock log;
366  output(log.os);
367  DB(15,"ProxyPushConsumer is federated.");
368  }
369  }
370  else
371  {
372  DB(2,"ProxyPushConsumer got unexpected callback.");
373  _targetIsProxy=save; // Reset it just to be sure.
374  }
375 }
376 
378 {
379  os<<"ecf/"<<_channelName;
380  os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
381 
382  if(!CORBA::is_nil(_target.in()))
383  {
384  CORBA::String_var iorstr;
385  iorstr = Orb::inst()._orb->object_to_string(_target.in());
386  os<<" IOR="<<iorstr.in();
387  if(_targetIsProxy)
388  os<<" proxy=1";
389  }
390  os<<" ;;\n";
391 }
392 
393 
394 }; // end namespace OmniEvents
bool _useLocalQueue
Switch between RT/chunked modes.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
void output(ostream &os) const
Save this object's state to a stream.
Interface for classes that wish to receive callbacks from deferred requests.
Definition: Callback.h:43
void output(ostream &os) const
Save this object's state to a stream.
#define DB(l, x)
Definition: Orb.h:49
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q, ConsumerAdmin_i &consumerAdmin)
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
map< string, PersistNode * > _child
Definition: PersistNode.h:71
bool _targetIsProxy
TRUE if _target is a ProxyPushSupplier.
CosEventComm::PushSupplier_var _target
static bool exists()
Library code may create Event Service objects without the need for persistency.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if it is.
PortableServer::POA_var _poa
Definition: Servant.h:131
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
static Orb & inst()
Definition: Orb.h:81
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
Obtains an output stream to the active persistency logfile, and locks it for exclusive access...
Base class for servants.
Definition: Servant.h:113
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
Default servant for ProxyPushConsumer objects.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
CORBA::ORB_var _orb
Definition: Orb.h:88