9.2 Drip Compared to Queue
Let’s first compare Drip with Queue to understand how process coordination works differently.
We use the Queue class that comes with Ruby. Queue is a FIFO buffer in which you can put any object as an element. You can use push to add an object and pop to take it out.
Multiple threads can pop at the same time, but each element goes to only one thread. The same element never goes to multiple threads with pop.
If you pop against an empty Queue, then pop will block.
When a new object is added, it reaches only the one thread that acquires it.
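The Queue behavior described above can be tried in a few lines. This is a minimal sketch using only Ruby's built-in Queue; the variable names (queue, results, consumers) are chosen for illustration.

```ruby
# Queue (Thread::Queue) is built into Ruby.
queue   = Queue.new
results = Queue.new   # collects what each consumer thread received

# Two consumer threads block in pop until an element arrives.
consumers = 2.times.map do |i|
  Thread.new do
    item = queue.pop          # blocks while the queue is empty
    results.push([i, item])
  end
end

queue.push(:a)
queue.push(:b)
consumers.each(&:join)

received = []
received << results.pop until results.empty?

# Which thread gets which element is not deterministic, so sort before printing.
items = received.map { |_, item| item }.sort
p items          # each element was delivered exactly once
p queue.empty?   # pop consumed both elements
```

Each element ends up in exactly one thread, and nothing is left in the queue afterward.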
Drip has an equivalent method called read. read will return an element newer than the specified cursor. However, Drip doesn’t delete the element. When multiple threads read with the same cursor, Drip returns the same element.
Both Drip and Queue can wait for the arrival of a new element.
The key difference is whether the element is consumed.
Queue#pop will consume the element, but Drip#read doesn’t consume elements. This means multiple readers can read the same element repeatedly. In Rinda, if you lose a tuple because of an application bug or system crash, the entire system could go down. In Drip, you never need to worry about the loss of an element.
Let’s see how this works in Drip with code.
Basic Operations with Read and Write Methods
Two methods are enough for the comparison with Queue.
    Drip#write(obj, *tags)
Drip#write adds an element to Drip. This is the only operation that changes the state of Drip. It stores an element obj into Drip and returns the key that you use to retrieve the object. You can also specify tags to make object access easier, which I’ll explain in Using Tags.
The other method is read.
    Drip#read(key, n=1, at_least=1, timeout=nil)
This is the most basic operation for browsing Drip. key is a cursor, and this method returns an array of up to n key-and-value pairs that were added later than the requested key. n specifies the maximum number of pairs to return. The method blocks until at_least elements have arrived, and timeout limits how long it waits.
In short, you can specify “Give me n elements, but wait until at_least elements are there.”
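To make the n and at_least parameters concrete, here is a rough in-memory sketch of the same read and write semantics. ToyLog is a hypothetical stand-in written for this illustration, not Drip's implementation: its keys are a simple counter rather than timestamps, it has no tags or storage, and returning whatever is available when the timeout expires is an assumption of this sketch.

```ruby
require 'monitor'

# ToyLog is a hypothetical in-memory stand-in, NOT Drip's implementation.
class ToyLog
  include MonitorMixin

  def initialize
    super()
    @pairs = []          # [key, value] pairs in ascending key order
    @last_key = 0
    @arrived = new_cond  # signaled whenever a new element is written
  end

  # Appends obj and returns its key (a counter here, not a timestamp).
  def write(obj)
    synchronize do
      @last_key += 1
      @pairs << [@last_key, obj]
      @arrived.broadcast
      @last_key
    end
  end

  # Returns up to n pairs newer than key, waiting until at_least exist.
  # Nothing is removed, so the same key always yields the same pairs.
  def read(key, n = 1, at_least = 1, timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && (now.call + timeout)
    synchronize do
      loop do
        found = @pairs.select { |k, _| k > key }
        return found.first(n) if found.size >= at_least
        remaining = deadline && (deadline - now.call)
        # Assumption: on timeout, give back whatever is available.
        return found.first(n) if remaining && remaining <= 0
        @arrived.wait(remaining)
      end
    end
  end
end

log = ToyLog.new
log.write('Hello')
log.write('World')

first = log.read(0, 1)       # one element newer than cursor 0
again = log.read(0, 1)       # same cursor, same element: nothing was consumed
both  = log.read(0, 10, 2)   # up to 10 elements, but wait for at least 2

reader = Thread.new { log.read(2, 1) }  # waits if nothing is newer than key 2
log.write('Hello, Again')
late = reader.value

timed_out = log.read(3, 1, 1, 0.05)     # nothing newer than key 3 arrives

p first, again, both, late, timed_out
```

The two reads with cursor 0 return the same pair, which is the property that separates this model from Queue#pop.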
Installing and Starting Drip
Oops, we haven’t installed Drip yet. Drip depends on an external library called RBTree. If you use gem, it should install the dependency as well.
    gem install drip
Next, let’s start the Drip server.
Drip uses a plain-text file as its default secondary storage. To create a Drip object, you need to specify a directory. The next script creates a Drip object and serves it via dRuby.
drip_s.rb
    require 'drip'
    require 'drb'

    class Drip
      def quit
        Thread.new do
          synchronize do |key|
            exit(0)
          end
        end
      end
    end

    drip = Drip.new('drip_dir')
    DRb.start_service('druby://localhost:54321', drip)
    DRb.thread.join
The quit method terminates the process via RMI. It uses synchronize (see Locking Single Resources with Mutex for more detail) to wait until Drip is not writing to secondary storage before it exits.
Start Drip like this:
    % ruby drip_s.rb
It won’t print out anything; it simply runs as a server.
MyDrip
I prepared a single-user Drip server called MyDrip. This works only on POSIX-compliant operating systems (such as Mac OS X), but it’s very handy. It creates a .drip storage directory under your home directory and communicates over a Unix domain socket. Since this is just a normal Unix domain socket, you can restrict permission and ownership using the file system. Unlike TCP, a Unix socket is handy because you can put your own socket file on your own path, and you don’t have to worry about port conflicts with other users. To use MyDrip, you need to require my_drip (my_drip.rb comes with the Drip gem, so you don’t have to download the file yourself).
Let’s invoke the server.
    # terminal 1
    % irb -r my_drip --prompt simple
    >> MyDrip.invoke
    => 51252
    >> MyDrip.class
    => DRb::DRbObject
MyDrip is actually a DRbObject pointing to the fixed Drip server port, but it also has a special invoke method. MyDrip.invoke forks a new process and starts a Drip daemon if necessary. If your own MyDrip server is already running, it finishes without doing anything. Use MyDrip.quit when you want to stop MyDrip.
MyDrip is a convenient daemon to store objects while running irb. In my environment, I always have MyDrip up and running to archive my Twitter timeline. I also use it to take notes or to use as middleware for a bot.
I always require my_drip so that I can write a memo to MyDrip while running irb. You can insert the following line in .irbrc to include it by default:
    require 'my_drip'
Going forward, we’ll use MyDrip for most of the exercises. If you can’t use MyDrip in your environment, you can create the following client:
drip_d.rb
    require 'drb/drb'
    MyDrip = DRbObject.new_with_uri('druby://localhost:54321')
You can use drip_d.rb and drip_s.rb as an alternative to MyDrip.
Comparing with Queue Again
Let’s experiment while MyDrip (or the equivalent drip_s.rb) is up and running.
Let’s add two new objects using the write method. As explained earlier, write is the only method that changes the state of Drip. write returns the key that’s associated with the added element. The key is an integer generated from a timestamp (usec). On a 64-bit machine, the number fits in a Fixnum (since Ruby 2.4, Fixnum has been unified into Integer).
    # terminal 2
    % irb -r my_drip --prompt simple
    >> MyDrip.write('Hello')
    => 1312541947966187
    >> MyDrip.write('World')
    => 1312541977245158
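Because the key is derived from a timestamp, it can be turned back into a time. The following is a small sketch that assumes the key counts microseconds since the Unix epoch; the helper names time_to_key and key_to_time are local to this example and are not presented as part of Drip's API.

```ruby
# Assumption: the key is the number of microseconds since the Unix epoch.
def time_to_key(time)
  time.to_i * 1_000_000 + time.usec
end

def key_to_time(key)
  sec, usec = key.divmod(1_000_000)
  Time.at(sec, usec)   # the second argument is in microseconds
end

key = 1312541947966187   # the key returned for 'Hello' in the session above
t = key_to_time(key).utc
puts t.strftime('%Y-%m-%d %H:%M:%S.%6N UTC')   # prints 2011-08-05 10:59:07.966187 UTC

# Converting back gives the original key.
p time_to_key(t) == key
```

Under that assumption, keys sort in the order the elements were written, which is what lets a key double as a cursor.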
Next, let’s read data from Drip.
    # terminal 3
    % irb -r my_drip --prompt simple
    >> MyDrip.read(0, 1)
    => [[1312541947966187, "Hello"]]
read reads up to n elements newer than the specified cursor and returns an array of key-and-value pairs. To read elements in order, you can move the cursor as follows:
    >> k = 0
    => 0
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312541947966187, "Hello"]
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312541977245158, "World"]
So far, you’ve read two elements. Let’s try to read one more.
    >> k, v = MyDrip.read(k, 1)[0]
It will block since there are no elements newer than k. If you add a new element from terminal 2, the read will unblock and return the new object.
    # terminal 2
    >> MyDrip.write('Hello, Again')
    => 1312542657718320

    # terminal 3
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312542657718320, "Hello, Again"]
How did it go? Were you able to simulate the waiting operation?
Let’s add another listener and start reading from 0.
    # terminal 4
    % irb -r my_drip --prompt simple
    >> k = 0
    => 0
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312541947966187, "Hello"]
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312541977245158, "World"]
    >> k, v = MyDrip.read(k, 1)[0]
    => [1312542657718320, "Hello, Again"]
You should be able to read the same elements. Unlike Queue, Drip doesn’t consume elements, so you can keep reading the same information. Instead, you need to specify where to read from every time you make a request.
Let’s try to restart MyDrip. The quit method terminates the process when no one is writing. Call invoke to restart. MyDrip.invoke may take a while to start up if the log is large.
    # terminal 1
    >> MyDrip.quit
    => #<Thread:...>
    >> MyDrip.invoke
    => 61470
Let’s call the read method to check whether you recovered the previous state.
    # terminal 1
    >> MyDrip.read(0, 3)
    => [[1312541947966187, "Hello"], [1312541977245158, "World"],
        [1312542657718320, "Hello, Again"]]
Phew, looks like it’s working fine.
Let’s recap what we’ve learned so far. Drip is similar to Queue in that you can retrieve data in time order and wait for new data to arrive. It’s different in that data is never removed. You can read the same elements from different processes, and the same process can read the same element again and again. You may have noticed that batch operations tend to stop often, both during development and in production. With Drip, you can work around this because you can restart from the middle as many times as you need.
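The restart-from-the-middle idea can be sketched without a running server. In the example below, log is a plain array standing in for the elements stored in Drip, and read_after and run_batch are hypothetical helpers written for this illustration; only the cursor-advancing pattern is the point.

```ruby
# A plain array standing in for the stored elements (hypothetical data).
log = [[1, 'Hello'], [2, 'World'], [3, 'Hello, Again']]

# Returns up to n pairs whose key is newer than the cursor (non-blocking toy).
def read_after(log, cursor, n = 1)
  log.select { |key, _| key > cursor }.first(n)
end

# Processes elements starting from the saved cursor.
# fail_on simulates a bug that stops the batch partway through.
def run_batch(log, state, fail_on: nil)
  loop do
    key, value = read_after(log, state[:cursor]).first
    break if key.nil?                  # nothing newer than the cursor
    raise "failed on #{value}" if value == fail_on
    state[:done] << value
    state[:cursor] = key               # advance only after the element is handled
  end
end

state = { cursor: 0, done: [] }
begin
  run_batch(log, state, fail_on: 'World')
rescue RuntimeError => e
  puts "stopped: #{e.message}, cursor=#{state[:cursor]}"
end

run_batch(log, state)                  # restart from the middle
p state[:done]
```

Because the elements are still there after the failure, the second run picks up at the saved cursor and nothing is lost or processed twice.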
So far, we’ve seen two basic operations, write and read, in comparison with Queue.