The dRuby Book

5.4 Passing Objects via Queue

So far, we’ve discussed various exclusive locking mechanisms to “protect something.” In this section, we’ll focus more on a messaging layer that passes objects among threads.

Using Queue

Queue provides first in, first out (FIFO) buffering (see Figure 28, How the queue mechanism works). Queue is one of the most common ways to implement a messaging layer.

images/d2queue.png

Figure 28. How the queue mechanism works

To put data into a Queue, use the enq method or its alias, push. A Queue has no upper limit. To take data out, use the deq method or its alias, pop. If you call deq when the Queue is empty, the calling thread blocks until another thread calls enq. Both enq and deq are atomic operations, so multiple threads never deq the same item.
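
The blocking and FIFO behavior described above can be observed inside a single Ruby process, before dRuby enters the picture. The following is a minimal sketch; the names received and consumer are illustrative and do not come from the book.

```ruby
require 'thread'

q = Queue.new
received = []

# The consumer's deq blocks whenever the Queue is empty.
consumer = Thread.new do
  3.times { received << q.deq }
end

# enq never blocks on a plain Queue because there is no upper limit.
[1, 2, 3].each { |n| q.enq(n) }

consumer.join
p received  # objects come out in the order they went in
```

Because deq blocks until data is available, the consumer needs no polling loop or explicit lock.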

Let’s open two terminals to experiment with Queue. To use Queue, require 'thread'. On the first terminal, create a Queue object, publish it via DRb, and then call the enq method to put the integer 1 into it.

  % irb --prompt simple -r drb/drb
  >> require 'thread'
  >> q = Queue.new
  >> DRb.start_service('druby://localhost:12345', q)
  >> q.enq(1)

On the second terminal, create a reference object to the published Queue object, and call deq. The first call to deq should return 1, because 1 was enqueued earlier. The second call to deq should block because the Queue is now empty.

  % irb --prompt simple -r drb/drb
  >> DRb.start_service
  >> q = DRbObject.new_with_uri('druby://localhost:12345')
  >> q.deq
  => 1
  >> q.deq

Go to terminal 1 and call enq again. This unblocks the deq at terminal 2, which returns 2.

  >> q.enq(2)

Using SizedQueue

SizedQueue is a subclass of Queue that adds an upper limit on the number of objects it holds (see Figure 29, SizedQueue mechanism). Because Queue has no upper limit, its enq never blocks; with SizedQueue, enq blocks when the queue is already full. When a deq brings the number of objects below the upper limit, the blocked enq is unblocked. Let’s try it.

images/d2sized_queue.png

Figure 29. SizedQueue mechanism

First, create a SizedQueue with a capacity of 4 at terminal 1 and publish it at 'druby://localhost:12345'.

  % irb --prompt simple -r drb/drb
  >> require 'thread'
  >> q = SizedQueue.new(4)
  >> DRb.start_service('druby://localhost:12345', q)

At terminal 2, connect to the dRuby server you just started at terminal 1 and assign the remote object to q. Then enq four objects into the SizedQueue. When that’s done, check the number of objects in the SizedQueue with the length method.

  % irb --prompt simple -r drb/drb
  >> DRb.start_service
  >> q = DRbObject.new_with_uri('druby://localhost:12345')
  >> q.enq(1)
  >> q.enq(2)
  >> q.enq(3)
  >> q.enq(4)
  >> q.length
  => 4

If you enq again, the enq should block because the SizedQueue has already reached its limit. Let’s try it from terminal 1.

  # [Terminal 1]
  >> q.enq(5)

Your enq should be blocked. Now, let’s unblock it by calling deq at terminal 2.

  # [Terminal 2]
  >> q.deq
  => 1

You should have received 1, the first object you enqueued. This deq should also have unblocked the enq at terminal 1.
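
The same sequence can be reproduced in one process, which makes the blocked enq easier to inspect. This is a sketch under stated assumptions: the capacity of 2, the symbols, and the variable names are illustrative, and num_waiting is used only to wait until the producer thread is parked inside enq.

```ruby
require 'thread'

q = SizedQueue.new(2)   # capacity of 2, chosen only for illustration
q.enq(:a)
q.enq(:b)               # the queue is now full

# A third enq blocks, so run it in its own thread.
producer = Thread.new { q.enq(:c) }

# Wait until the producer is actually waiting inside enq.
sleep 0.01 until q.num_waiting == 1
still_blocked = producer.alive?

first = q.deq           # frees one slot, which unblocks the producer
producer.join

puts "blocked before deq: #{still_blocked}"
puts "deq returned: #{first.inspect}"
puts "length afterward: #{q.length}"
```

After the deq, the producer's pending object takes the freed slot, so the queue is full again.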

Like we did for Queue, let’s check how deq blocks. This time, we loop until nil arrives. Go back to terminal 1 and type the following loop:

  >> loop do
  ?> puts "length: #{q.length}"
  >> it = q.deq
  >> puts "deq: #{it}"
  >> break if it.nil?
  >> end
  length: 4
  deq: 2
  length: 3
  deq: 3
  length: 2
  deq: 4
  length: 1
  deq: 5
  length: 0

When q.length reaches 0, deq blocks. Let’s unblock it using enq at terminal 2.

  >> q.enq(6)

The deq at terminal 1 gets unblocked, the loop moves on to the next iteration, and it blocks again at the next deq. To finish the loop, enq nil at terminal 2.

  >> q.enq(nil)

The loop at terminal 1 should complete because it received nil from terminal 2.
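
The loop above uses nil as an end-of-data marker. A minimal single-process sketch of the same pattern follows; the consumer thread, the items array, and the range of values are assumptions made for illustration.

```ruby
require 'thread'

q = SizedQueue.new(4)

consumer = Thread.new do
  items = []
  loop do
    item = q.deq          # blocks while the queue is empty
    break if item.nil?    # nil is the agreed "no more data" marker
    items << item
  end
  items
end

(1..6).each { |n| q.enq(n) }  # blocks whenever four items are already queued
q.enq(nil)                    # tell the consumer to stop

result = consumer.value       # joins the thread and returns its block's value
p result
```

One consequence of this convention is that nil itself cannot be passed as ordinary data through the queue.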

Rewriting Rendezvous with SizedQueue

Before we finish this chapter, let’s reimplement Rendezvous, which we created using Monitor in Synchronization Example with Rendezvous. We’ll use SizedQueue this time. Exclusive locking and a messaging layer are two sides of the same coin: if you have an implementation built on one, you can build an equivalent on the other. This time, the SizedQueue version is a lot shorter, probably because SizedQueue is a higher-level abstraction and is also better suited to this problem.

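rendezvous_q.rb
  require 'thread'

  class Rendezvous
    def initialize
      super
      @send_queue = SizedQueue.new(1)  # carries the object from sender to receiver
      @recv_queue = SizedQueue.new(1)  # carries the acknowledgment back to the sender
    end

    # Hands obj over, then blocks until recv acknowledges it.
    def send(obj)
      @send_queue.enq(obj)
      @recv_queue.deq
    end

    # Blocks until an object is sent, then releases the waiting sender.
    def recv
      @send_queue.deq
    ensure
      @recv_queue.enq(nil)
    end
  end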
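
To see the handshake in action, the sketch below drives the class from two threads. The class is repeated from rendezvous_q.rb so the example runs on its own; the receiver thread, the log queue, the :hello payload, and the sleep are illustrative assumptions, not part of the book's code.

```ruby
require 'thread'

# Copy of the Rendezvous class from rendezvous_q.rb.
class Rendezvous
  def initialize
    super
    @send_queue = SizedQueue.new(1)
    @recv_queue = SizedQueue.new(1)
  end

  def send(obj)
    @send_queue.enq(obj)
    @recv_queue.deq
  end

  def recv
    @send_queue.deq
  ensure
    @recv_queue.enq(nil)
  end
end

rendezvous = Rendezvous.new
log = Queue.new  # records the order of events across threads

receiver = Thread.new do
  sleep 0.1               # let the sender arrive first and wait
  log.enq(:recv_called)
  rendezvous.recv         # returns the sent object and releases the sender
end

rendezvous.send(:hello)   # blocks until the receiver has taken the object
log.enq(:send_returned)

got = receiver.value
events = [log.deq, log.deq]
p got
p events
```

The sender cannot return before recv has run, because it waits on @recv_queue for the acknowledgment that recv enqueues in its ensure clause.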

To pass objects among threads, you can use another mechanism called Rinda::TupleSpace. Rinda::TupleSpace is a Ruby implementation of the Linda tuplespace. We’ll talk about Rinda::TupleSpace in Chapter 6, Coordinating Processes Using Rinda.

As we have seen in this chapter, you can use most of Ruby’s thread coordination mechanisms in dRuby. It’s common practice to use Monitor and Queue in dRuby, just as you do in plain Ruby, to ensure thread safety.