The dRuby Book

9.2 Drip Compared to Queue

Let’s first compare Drip with Queue to understand how process coordination works differently.

We use the Queue class that comes with Ruby. Queue is a FIFO buffer in which you can put any object as an element. You can use push to add an object and pop to take it out. Multiple threads can pop at the same time, but an element goes to only one thread. The same element never goes to multiple threads with pop.

If you pop against an empty Queue, pop blocks. When a new object is added, it is delivered to only one thread: the one whose pop acquires it.
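The consume-on-pop behavior can be seen with nothing but the standard library. The following is a minimal sketch; the number of workers and the variable names are chosen only for illustration.

```ruby
# Queue#pop hands each element to exactly one thread and removes it.
queue = Queue.new
received = Queue.new            # thread-safe collector for what each worker got

# Start the consumers first; while the queue is empty, each pop blocks.
workers = 3.times.map do |i|
  Thread.new do
    item = queue.pop            # blocks until an element arrives
    received.push([i, item])
  end
end

# Each pushed element is taken by one of the waiting threads.
%w[a b c].each { |item| queue.push(item) }
workers.each(&:join)

results = []
results << received.pop until received.empty?
items = results.map { |_, item| item }.sort

p items          # every element was delivered exactly once
p queue.empty?   # nothing is left to pop a second time
```

Which worker receives which element depends on thread scheduling, so only the sorted list of items is stable from run to run.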

Drip has an equivalent method called read. read will return an element newer than the specified cursor. However, Drip doesn’t delete the element. When multiple threads read with the same cursor, Drip returns the same element.

Both Drip and Queue can wait for the arrival of a new element. The key difference is whether the element is consumed.

Queue#pop will consume the element, but Drip#read doesn’t consume elements. This means multiple people can read the same element repeatedly. In Rinda, if you lose a tuple because of an application bug or system crash, it means that the entire system could go down. In Drip, you never need to worry about the loss of an element.

Let’s see how this works in Drip with code.

Basic Operations with Read and Write Methods

You can use two methods to compare with Queue.

  ​Drip#write(obj, *tags)​

Drip#write adds an element to Drip. This is the only operation to change the state of Drip. This operation stores an element obj into Drip and returns the key that you use to retrieve the object. You can also specify tags to make object access easier, which I’ll explain in Using Tags.

Another method is read.

  ​Drip#read(key, n=1, at_least=1, timeout=nil)​

This is the most basic operation for browsing Drip. key is a cursor, and this method returns an array of [key, value] pairs for elements added later than the requested key. n specifies the number of pairs to return. With at_least and timeout, you can make the method block until at least at_least elements have arrived or until the timeout expires.

In short, you can specify “Give me n elements, but wait until at_least elements are there.”
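To make these semantics concrete, here is a rough in-memory sketch that runs without the Drip gem. TinyDrip is a hypothetical stand-in, not the real Drip class. It assumes sequential integer keys instead of timestamps, leaves out tags and persistence, and assumes that a read that times out simply returns whatever is available.

```ruby
require 'monitor'

# TinyDrip is a hypothetical in-memory stand-in, NOT the real Drip class.
# It only mimics the write/read behavior described in the text.
class TinyDrip
  include MonitorMixin

  def initialize
    super()                       # sets up the monitor
    @log = []                     # [key, value] pairs, oldest first
    @arrived = new_cond
    @last_key = 0                 # assumption: simple counter, not a timestamp
  end

  # Append a value and return its key. Keys only ever grow.
  def write(value)
    synchronize do
      @last_key += 1
      @log << [@last_key, value]
      @arrived.broadcast          # wake up every blocked reader
      @last_key
    end
  end

  # Return up to n pairs newer than key, waiting until at least
  # at_least of them exist (or until timeout seconds have passed).
  def read(key, n = 1, at_least = 1, timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    synchronize do
      loop do
        found = @log.select { |k, _| k > key }
        return found.first(n) if found.size >= at_least
        if deadline
          remaining = deadline - now.call
          return found.first(n) if remaining <= 0   # assumed timeout behavior
          @arrived.wait(remaining)
        else
          @arrived.wait
        end
      end
    end
  end
end

drip = TinyDrip.new
drip.write('Hello')
drip.write('World')

first = drip.read(0, 1)           # one element newer than cursor 0
both  = drip.read(0, 2)           # reading again: nothing was consumed
none  = drip.read(2, 1, 1, 0.1)   # nothing newer than key 2; gives up after 0.1 s
p first, both, none
```

The point of the sketch is that read never removes anything from the log. The cursor passed by the caller is the only thing that decides which elements come back.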

Installing and Starting Drip

Oops, we haven’t installed Drip yet. Drip depends on an external library called RBTree. If you use gem, it should install the dependency as well.

  ​gem install drip​

Next, let’s start the Drip server.

Drip uses a plain-text file as its default secondary storage. To create a Drip object, you need to specify a directory. The next script creates a Drip object and serves it via dRuby.

drip_s.rb
  ​require 'drip'
  ​require 'drb'
  ​​
  class Drip
    def quit
      Thread.new do
        synchronize do |key|
          exit(0)
        end
      end
    end
  end
  ​​
  ​drip = Drip.new('drip_dir')​
  ​DRb.start_service('druby://localhost:54321', drip)​
  ​DRb.thread.join​

The quit method terminates the process via RMI. It uses synchronize to wait until Drip is not writing to secondary storage before exiting (see Locking Single Resources with Mutex for more detail).

Start Drip like this:

  ​% ruby drip_s.rb​

It won’t print out anything; it simply runs as a server.

MyDrip

I prepared a single-user Drip server called MyDrip. It works only on POSIX-compliant operating systems (such as Mac OS X), but it’s very handy. It creates a .drip storage directory under your home directory and communicates through a Unix domain socket. Since this is just a normal Unix domain socket, you can restrict permission and ownership using the file system. Unlike a TCP port, a Unix socket lives at a path of your own choosing, so you don’t have to worry about port conflicts with other users. To use MyDrip, you need to require my_drip (my_drip.rb comes with the Drip gem, so you don’t have to download the file yourself).

Let’s invoke the server.

  ​# terminal 1​
  ​% irb -r my_drip --prompt simple​
  ​>> MyDrip.invoke​
  ​=> 51252​
  ​>> MyDrip.class​
  ​=> DRb::DRbObject​

MyDrip is actually a DRbObject pointing to the fixed Drip server port, but it also has a special invoke method. MyDrip.invoke forks a new process and starts a Drip daemon if necessary. If your own MyDrip server is already running, it finishes without doing anything. Use MyDrip.quit when you want to stop MyDrip.

MyDrip is a convenient daemon to store objects while running irb. In my environment, I always have MyDrip up and running to archive my Twitter timeline. I also use it to take notes or to use as middleware for a bot.

I always require my_drip so that I can write a memo to MyDrip while running irb. You can insert the following line in .irbrc to include it by default:

  ​require 'my_drip'

Going forward, we’ll use MyDrip for most of the exercises. If you can’t use MyDrip in your environment, you can create the following client:

drip_d.rb
  ​require 'drb/drb'
  ​MyDrip = DRbObject.new_with_uri('druby://localhost:54321')​

You can use drip_d.rb and drip_s.rb as an alternative to MyDrip.

Comparing with Queue Again

Let’s experiment while MyDrip (or the equivalent drip_s.rb) is up and running.

Let’s add two new objects using the write method. As explained earlier, write is the only method that changes the state of Drip. It returns the key associated with the added element. The key is an integer generated from a timestamp in microseconds (usec). On a 64-bit machine, the number fits in a Fixnum.

  # terminal 2
  % irb -r my_drip --prompt simple
  >> MyDrip.write('Hello')
  => 1312541947966187
  >> MyDrip.write('World')
  => 1312541977245158
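Because the key is generated from a microsecond timestamp, it can be turned back into a Time. The following is a small sketch, assuming the key is the number of microseconds since the Unix epoch. key_to_time is a helper defined here for illustration and is not presented as part of Drip's API.

```ruby
# Assumption: a key is the number of microseconds since the Unix epoch.
def key_to_time(key)
  sec, usec = key.divmod(1_000_000)
  Time.at(sec, usec).utc          # second argument is microseconds
end

written_at = key_to_time(1312541947966187)
puts written_at.strftime('%Y-%m-%d %H:%M:%S.%6N UTC')
```

Under that assumption, the first key from the session above decodes to a moment on 2011-08-05. Keys sort in the same order as the times at which the elements were written, which is what lets a key double as a cursor.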

Next, let’s read data from Drip.

  ​# terminal 3​
  ​% irb -r my_drip --prompt simple​
  ​>> MyDrip.read(0, 1)​
  ​=> [[1312541947966187, "Hello"]]​

read reads n elements newer than the specified cursor and returns them as an array of [key, value] pairs. To read elements in order, you can move the cursor as follows:

  ​>> k = 0​
  ​=> 0​
  ​>> k, v = MyDrip.read(k, 1)[0]​
  ​=> [1312541947966187, "Hello"]​
  ​>> k, v = MyDrip.read(k, 1)[0]​
  ​=> [1312541977245158, "World"]​

So far, you’ve read two elements. Let’s try to read one more.

  ​>> k, v = MyDrip.read(k, 1)[0]​

This call blocks because there are no elements newer than k. If you add a new element from terminal 2, the read in terminal 3 unblocks and returns the new object.

  # terminal 2
  >> MyDrip.write('Hello, Again')
  => 1312542657718320

  # terminal 3 (the blocked read returns)
  >> k, v = MyDrip.read(k, 1)[0]
  => [1312542657718320, "Hello, Again"]

How did it go? Were you able to simulate the waiting operation?

Let’s add another listener and start reading from 0.

  # terminal 4
  ​% irb -r my_drip --prompt simple​
  ​>> k = 0​
  ​=> 0​
  ​>> k, v = MyDrip.read(k, 1)[0]​
  ​=> [1312541947966187, "Hello"]​
  ​>> k, v = MyDrip.read(k, 1)[0]​
  ​=> [1312541977245158, "World"]​
  ​>> k, v = MyDrip.read(k, 1)[0]​
  ​=> [1312542657718320, "Hello, Again"]​

You should be able to read the same elements. Unlike Queue, Drip doesn’t consume elements, so you can keep reading the same information. In exchange, you need to specify where to start reading every time you make a request.

Let’s try to restart MyDrip. The quit method terminates the process when no one is writing. Call invoke to restart. MyDrip.invoke may take a while to start up if the log size is big.

  ​# terminal 1​
  ​>> MyDrip.quit​
  ​=> #<Thread:...>​
  ​>> MyDrip.invoke​
  ​=> 61470​

Let’s call the read method to check whether you recovered the previous state.

  ​# terminal 1​
  ​>> MyDrip.read(0, 3)​
  ​=> [[1312541947966187, "Hello"], [1312541977245158, "World"],​
  ​ [1312542657718320, "Hello, Again"]]​

Phew, looks like it’s working fine.

Let’s recap what we’ve learned so far. Drip is similar to Queue in that you can retrieve data in time order and wait for new data to arrive. It differs in that reading never removes data. You can read the same elements from different processes, and the same process can read the same element again and again. You may have noticed that batch operations tend to stop often, both during development and in production. With Drip, a stopped batch can resume from the middle as many times as needed, because the elements it has already read are still there.
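One way to picture such a restartable batch is to save the cursor after each processed element. The sketch below is only an illustration under stated assumptions: LogStandIn is a hypothetical stand-in for a Drip-like store so that the code runs on its own, run_batch and the checkpoint file are made-up names, and the "crash" is simulated by a limit on the number of elements processed.

```ruby
require 'tmpdir'
require 'fileutils'

# LogStandIn is a hypothetical stand-in for a Drip-like store;
# only a non-blocking read(key, n) is modeled here.
class LogStandIn
  def initialize(pairs)
    @pairs = pairs
  end

  # Up to n [key, value] pairs newer than key.
  def read(key, n = 1)
    @pairs.select { |k, _| k > key }.first(n)
  end
end

# Process pending elements in order, saving the cursor after each one so
# that a restarted job continues where the previous run stopped.
def run_batch(store, checkpoint_path, limit)
  cursor = File.exist?(checkpoint_path) ? File.read(checkpoint_path).to_i : 0
  done = []
  limit.times do
    key, value = store.read(cursor, 1)[0]
    break unless key                        # nothing newer than the cursor
    done << value.upcase                    # the "work" of this toy batch
    cursor = key
    File.write(checkpoint_path, cursor.to_s)
  end
  done
end

store = LogStandIn.new([[1, 'hello'], [2, 'world'], [3, 'hello, again']])
dir = Dir.mktmpdir
checkpoint = File.join(dir, 'cursor')

first_run  = run_batch(store, checkpoint, 2)    # stops after two elements
second_run = run_batch(store, checkpoint, 10)   # resumes from the saved cursor
FileUtils.remove_entry(dir)

p first_run, second_run
```

Since the store keeps every element, the second run loses nothing. With a Queue, any element popped just before a crash would be gone.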

So far, we’ve seen two basic operations, write and read, in comparison with Queue.