5.4 Passing Objects via Queue
So far, we’ve discussed various exclusive locking mechanisms to “protect something.” In this section, we’ll focus more on a messaging layer that passes objects among threads.
Using Queue
Queue
provides first in, first out (FIFO) buffering (see Figure 28, How the queue mechanism works). Queue
is one of the most common ways to act as a messaging layer.
To put data into Queue
, you can use the enq
method or the push
method. Queue
doesn’t have any upper limit. To take out data from Queue
, you can use either the deq
method or the pop
method. If you deq
when Queue
is empty, then the thread gets blocked. This will be unblocked when data gets enq
. enq
and deq
are atomic operations, and therefore multiple threads will not deq
the same data at the same time.
Let’s open two terminals to experiment with Queue
. To use Queue
, require ’thread’
.
On the first terminal, create a Queue
object, publish via DRb, and then call the enq
method to put integer 1.
% irb --prompt simple -r drb/drb |
|
>> require 'thread' |
|
>> q = Queue.new |
|
>> DRb.start_service('druby://localhost:12345', q) |
|
>> q.enq(1) |
On the second terminal, create a reference object to the published Queue
object, and call deq
. When deq
is called for the first time, it should return 1, because 1 was enq
earlier. If you deq
for the second time, it should be blocked because there is no data.
% irb --prompt simple -r drb/drb |
|
>> DRb.start_service |
|
>> q = DRbObject.new_with_uri('druby://localhost:12345') |
|
>> q.deq |
|
=>1 |
|
>> q.deq |
Go to terminal 1 and do enq
again. This unblocks deq
at terminal 2.
>> q.enq(2) |
Using SizedQueue
SizedQueue
is a subclass of Queue
, and it is a Queue
with an upper limit (see Figure 29, SizedQueue mechanism). Queue
doesn’t have any upper limit, so it never blocks enq
, but SizedQueue
will block enq
if called over the limit. When deq
decreases the number of objects below the upper limit, then the enq
block will be unblocked. Let’s try it.
First create a SizedQueue
with a length of 4 at terminal 1 and then publish it at ’druby://localhost:12345’
.
% irb --prompt simple -r drb/drb |
|
>> require 'thread' |
|
>> q = SizedQueue.new(4) |
|
>> DRb.start_service('druby://localhost:12345', q) |
At terminal 2, connect to the dRuby server you just started at terminal 1 and assign the remote object to q
and then enq
four objects into SizedQueue
. When it’s finished, we’ll check the length of SizedQueue
with the length
method.
% irb --prompt simple -r drb/drb |
|
>> DRb.start_service |
|
>> q = DRbObject.new_with_uri('druby://localhost:12345') |
|
>> q.enq(1) |
|
>> q.enq(2) |
|
>> q.enq(3) |
|
>> q.enq(4) |
|
>> q.length |
|
=> 4 |
If you enq
again, the enq
should get blocked because it already reached the limit. Let’s try it from terminal 1.
# [Terminal 1] |
|
>> q.enq(5) |
Your enq
should be blocked. Now, let’s unblock by doing deq
at terminal 2.
# [Terminal 2] |
|
>> q.deq |
|
=> 1 |
You should have received 1, which you enq
in the very beginning. When this happens, it should have unblocked your terminal 1.
Like we did for Queue
, let’s check how it blocks using deq
. This time, it loops until nil
arrives. Go back to terminal 1 and type the following loop
command:
>> loop do |
|
?> puts "length: #{q.length}" |
|
>> it = q.deq |
|
>> puts "deq: #{it}" |
|
>> break if it.nil? |
|
>> end |
|
length: 4 |
|
deq: 2 |
|
length: 3 |
|
deq: 3 |
|
length: 2 |
|
deq: 4 |
|
length: 1 |
|
deq: 5 |
|
length: 0 |
When q.length
becomes 0, deq
gets blocked. Let’s unblock using enq
at terminal 2.
>> q.enq(6) |
It first gets unblocked, goes to the next loop, and gets blocked again at the next deq
. To finish the loop, enq
nil
at terminal 2.
>> q.enq(nil) |
The loop at terminal 1 should complete because it received nil
from terminal 2.
Rewriting Rendezvous with SizedQueue
Before we finish this chapter, let’s reimplement Rendezvous
, which we created using Monitor
in Synchronization Example with Rendezvous. We’ll use SizedQueue
this time. Exclusive locking and “messaging layer” are two sides of the same coin. If you have one implementation, you can create another implementation. This time, the SizedQueue
version is a lot shorter. It’s probably because SizedQueue
is the more abstracted method but also because it’s more suited for this problem.
rendezvous_q.rb | |
require 'thread' |
|
class Rendezvous |
|
def initialize |
|
super |
|
@send_queue = SizedQueue.new(1) |
|
@recv_queue = SizedQueue.new(1) |
|
end |
|
def send(obj) |
|
@send_queue.enq(obj) |
|
@recv_queue.deq |
|
end |
|
|
|
def recv |
|
@send_queue.deq |
|
ensure |
|
@recv_queue.enq(nil) |
|
end |
|
end |
To pass objects among threads, you can use another mechanism called Rinda::TupleSpace
.Rinda::TupleSpace
is a Ruby implementation of the Linda tuple- space. We’ll talk about Rinda::TupleSpace
in Chapter 6, Coordinating Processes Using Rinda.
As we have seen in this chapter, you can use the majority of threading coordination mechanisms in dRuby. It’s a common practice to use Monitor
and Queue
in dRuby like you do in Ruby to ensure thread safety.