Designing Observers in C++11

Designing Observers in C++11

By Alan Griffiths

Overload, 22(124):4-5, December 2014


The observer pattern is over two decades old. Alan Griffiths fits a venerable design pattern into a contemporary context.

Two decades ago the ‘Gang of Four’ popularised a pattern form and described twenty one patterns (and one anti-pattern). One of these is the Observer Pattern and the subject of this article.

I’ve been working on an open source project for the past couple of years and like many projects we found use for Observers . In order for you to appreciate the problems we encountered I first need to explain a little about the project. (I will be brief.)

The project is a library that allows the code that uses it to handle graphics and input devices in a manner that is portable across the Linux device drivers found on desktop and android devices. One area in which we used Observer was to monitor changes to the ‘surfaces’ representing things that appear on the screen(s). These need to be monitored by other components such as the ones that composite these surfaces onto the screens and the one that routes input.

There are a number of threads running in the application as this makes it easy to partition work between applications updating their surfaces, input events and interacting with display devices such as monitors. As I will describe, this leads to a need to address some synchronization issues.

It is hopefully evident that the ‘consumer’ of observations (e.g. one representing a monitor) generally outlive the surfaces being observed. Mostly the consumer is part of the application infrastructure and not part of the dynamic state of applications being launched, opening and closing windows and exiting.

For this case it is simplest to create an ‘observer’ object (that calls notification methods on the consumer) and pass its ownership to the ‘subject’. The consumer can then forget about everything except handling the notifications.

To summarise the differences from the classical Observer Pattern context :

  1. We’ve split the Observer role into Consumer and Observer
  2. We’re not using garbage collection and so need to explicitly address the lifetime of the objects
  3. We have multiple active threads

The subject maintains a collection of listeners and the naïve approach to synchronization is for the collection to be locked when being updated and when sending notifications. (In fact that is how we first implemented it.)

This works well until we hit one of two cases:

  1. The consumer takes some action that generates a new notification
  2. The consumer is destroyed (e.g. a monitor is unplugged) and needs to prevent further notifications to a dead object.

In case 1, if we have hold exclusive lock during notifications then we’ll get a deadlock.

In case 2, we expect the consumer to remove the observer from the subject’s collection after which notifications cease. And to ensure ‘after’ needs an ordering on removal and notifications we need something akin to the lock that causes deadlock in case 1.

One failed solution that we tried is to copy the collection before propagating a notification and release the lock before calling each of the observers. That doesn’t work with the above solution to case 2: after releasing the lock nothing prevents listeners being ‘removed’ on another thread before being notified through the copy.

Another solution we rejected was for the subject’s collection to be formed of weak_ptr<> s and the consumer to manage a collection of shared_ptr<> s to the observers it creates. Having an additional collection to manage in the consumer didn’t lead to any simplification. This might be different in other contexts but many of our consumers only need to register an observer and handle a few events without tracking either the subject or the observer.

Another (working) solution we tried was to hold a recursive lock during notifications and updates to the collection. That allowed other notifications to take place on the same thread and changes to the collection. Because the collection could change we took a copy of the collection and traversed that (to avoid iterators invalidating), but before invoking them also would check that objects exist in the ‘true’ collection. The disadvantage of this approach is that copying the collection is a lot of work to send each notification for collections that very rarely changed (your application might be updating the ‘surface’ at 60fps but adding and removing monitors might happen once a month).

You might think that we could avoid this trouble by sharing ownership of the consumer with the observer (so that the consumer could not ‘die’ while the observer exists). After all, that is exactly what would happen ‘automatically’ in a garbage collected language. The trouble is that it becomes extremely difficult to ensure that the consumer dies when one needs it to.

Eventually we came up with an ‘observer collection’ implementation that works for our specific circumstances. If you have massively parallel code or rapidly changing collections this is probably not going to work for you.

The code makes use of a RecursiveReadWriteMutex and associated RecursiveReadLock and RecursiveWriteLock . These work pretty much as one might expect – a read lock can be acquired provided there are no write locks and a write lock can be acquired unless another thread has a lock. (This isn’t a component of C++11, so we rolled our own – one day we will have shared and exclusive locks available to us in the standard library.)

Information about the observers is held in a singly linked list with atomic forward pointers that allow lock free traversal and expansion (Listing 1). (We don’t need contraction for our use case – we might end up with a few ‘free’ items in the list but not enough to be of concern.)

struct ListItem
{
  ListItem() {}
  RecursiveReadWriteMutex mutex;
  shared_ptr<Observer> observer;
  atomic<ListItem*> next{nullptr};
  ~ListItem() { delete next.load(); }
};
			
Listing 1

There are three member functions to go along with this. The easy one is for_each() which is used to traverse the collection and send notifications. Each node is read locked in turn, and the supplied functor invoked. Note that we need to lock the node until we complete the notification to prevent a race with another thread removing the observer and deleting the consumer. No lock is needed for the traversal itself as we are assuming that no node is ever removed from the list. (Listing 2)

template<class Observer>
void BasicObservers<Observer>::for_each(
  function<void(shared_ptr<Observer> 
    const& observer)> const& f)
{
  ListItem* current_item = &head;
  while (current_item)
  {
    RecursiveReadLock lock{current_item->mutex};
     // We need to take a copy in case we recursively
    // remove during call
    if (auto const copy_of_observer =
      current_item->observer)
            f(copy_of_observer);
      current_item = current_item->next;
  }
}
			
Listing 2

The second function is to add an item. This searches for a free node. If it finds one it tries to upgrade to a write lock and, if it is still free, uses it to store the supplied observer. Otherwise a new node is added to the end of the list using the C++11 atomic ‘compare exchange’. This code also assumes that the list never shrinks as current_item has to remain valid. (Listing 3) One ‘gotcha’ here (spotted by the Overload review team) is that compare_exchange_weak() can ‘fail spuriously’ 1 , so it is necessary to test that expected has changed before assigning it to current_item .

template<class Observer>
void BasicObservers<Observer>::add(
  shared_ptr<Observer> const& observer)
{
  ListItem* current_item = &head;

  do
  {
    // Note: we release the read lock to avoid two
    // threads calling add at the same time 
    // mutually blocking the other's upgrade to
    // write lock.
    {
      RecursiveReadLock lock{current_item->mutex};
      if (current_item->observer) continue;
    }
    RecursiveWriteLock lock{current_item->mutex};

    if (!current_item->observer)
    {
       current_item->observer = observer;
       return;
    }
  }
  while (current_item->next && 
    (current_item = current_item->next));
  // No empty Items so append a new one
  auto new_item = new ListItem;
    new_item->observer = observer;

  for (ListItem* expected{nullptr};
  !current_item->next.compare_exchange_weak
    (expected, new_item);
  expected = nullptr)
  {
    if (expected)
    current_item = expected;
  }
}
			
Listing 3

The final member function removes an observer by searching the list. The logic is very similar to that in add() . (Listing 4)

template<class Observer>
void BasicObservers<Observer>::remove
  (shared_ptr<Observer> const& observer)
{
  ListItem* current_item = &head;

  do
  {
    {
      RecursiveReadLock lock{current_item->mutex};
      if (current_item->observer != observer)
        continue;
    }
    RecursiveWriteLock lock{current_item->mutex};

    if (current_item->observer == observer)
    {
      current_item->observer.reset();
      return;
    }
  }
  while ((current_item = current_item->next));
}
			
Listing 4

It is a deceptively simple solution to a problem that for a while seemed intractable. I hope you enjoy it.

References

The project is called ‘Mir’ and can be found (including the full version of the code) at http://unity.ubuntu.com/mir/

Acknowledgements

The Mir team ( https://launchpad.net/~mir-team/+members ) especially Alberto Aguirre and Robert Carr for encountering this design context and working through a series of proposed resolutions.

The Overload team for seeing the code and text presented here with a fresh eye and spotting a few problems with correctness and clarity that had been overlooked.

  • http://www.cplusplus.com/reference/atomic/atomic/compare_exchange_weak/ : “Unlike compare_exchange_strong , this weak version is allowed to fail spuriously by returning false even when expected indeed compares equal to the contained object . This may be acceptable behavior for certain looping algorithms, and may lead to significantly better performance on some platforms. On these spurious failures , the function returns false while not modifying expected .”





Your Privacy

By clicking "Accept Non-Essential Cookies" you agree ACCU can store non-essential cookies on your device and disclose information in accordance with our Privacy Policy and Cookie Policy.

Current Setting: Non-Essential Cookies REJECTED


By clicking "Include Third Party Content" you agree ACCU can forward your IP address to third-party sites (such as YouTube) to enhance the information presented on this site, and that third-party sites may store cookies on your device.

Current Setting: Third Party Content EXCLUDED



Settings can be changed at any time from the Cookie Policy page.