The dRuby Book

10.2 Examining Each Component

The search system consists of three components: the crawler, the indexer, and Drip, which serves as the messaging layer. In this section, we’ll examine each component.

How System Elements Are Structured

Let’s start by looking at the objects and tags that this system writes to Drip. We’ll mainly use Drip as the file update notification system. Each crawled file is written as an array of [filename, mtime, content] with two tags: rbcrawl, shared by every document, and rbcrawl-fname=<filename>, which lets us look up the latest version of a particular file.

There are also some optional tags: rbcrawl-footprint, written at the end of each crawl pass with the list of filenames that were crawled, and rbcrawl-begin, which acts as a fence; anything written before the newest rbcrawl-begin is ignored.
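
In code, the writes look roughly like this. This is only a summary distilled from the crawler shown later in this section; the rbcrawl-begin write at the end is hypothetical, since the code in this section only ever reads that tag (and only its key matters, not its value).

  # a document: the crawler writes one of these per updated file
  MyDrip.write([fname, mtime, File.read(fname)],
               'rbcrawl', 'rbcrawl-fname=' + fname)

  # the list of filenames crawled in one pass
  MyDrip.write(ary_of_fnames, 'rbcrawl-footprint')

  # hypothetical fence marker; only its key is used by the crawler and indexer
  MyDrip.write('reset', 'rbcrawl-begin')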

Next, let’s see how to use these objects and tags.

How the Crawler Works

Here’s the basic mechanism of this simple crawler:

  class Crawler
    include MonitorMixin

    def initialize
      super()
      @root = File.expand_path('~/develop/git-repo/')
      @drip = MyDrip
      k, = @drip.head(1, 'rbcrawl-begin')[0]
      @fence = k || 0
    end

    def last_mtime(fname)
      k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
      (v && k > @fence) ? v[1] : Time.at(1)
    end

    def do_crawl
      synchronize do
        ary = []
        Dir.chdir(@root)
        Dir.glob('**/*.rb').each do |fname|
          mtime = File.mtime(fname)
          next if last_mtime(fname) >= mtime
          @drip.write([fname, mtime, File.read(fname)],
                      'rbcrawl', 'rbcrawl-fname=' + fname)
          ary << fname
        end
        @drip.write(ary, 'rbcrawl-footprint')
        ary
      end
    end

    def quit
      synchronize do
        exit(0)
      end
    end
  end

First, the crawler looks for files that match the *.rb wildcard under @root. For each file it finds, it checks the modification time and writes the content and timestamp if the file has been updated since the last crawl. Such a write looks like this:

  @drip.write(
    ["sample/demo4book/index.rb", 2011-08-23 23:50:44 +0100, "file content"],
    "rbcrawl", "rbcrawl-fname=sample/demo4book/index.rb"
  )

The value is an array of the filename, the timestamp, and the file content, and it is written with two tags.

As explained earlier, the crawler runs every sixty seconds and terminates once you type something on standard input. At the end of each crawl pass, it writes the list of filenames it crawled with an rbcrawl-footprint tag. Here’s some example data:

  @drip.write(["sample/demo4book/index.rb"], 'rbcrawl-footprint')

This version of the crawler doesn’t record anything about deleted files, but you may be able to add that functionality by building on this information.
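
For example, here is a rough, untested sketch of one way to do it. It reopens the Crawler class from above and reuses its @drip, @root, and @fence instance variables; the rbcrawl-seen and rbcrawl-deleted tags are made up for this sketch and are not part of the system described in this chapter, and nothing here wires the deletion events into the indexer.

  class Crawler
    # Hypothetical: report files that were on disk during the previous
    # pass but have since disappeared.
    def report_deletions
      synchronize do
        Dir.chdir(@root)
        current = Dir.glob('**/*.rb')
        k, previous = @drip.head(1, 'rbcrawl-seen')[0]
        previous = [] unless previous && k > @fence
        (previous - current).each do |fname|
          # 'rbcrawl-deleted' is a made-up tag for deletion events
          @drip.write([fname, Time.now], 'rbcrawl-deleted')
        end
        # 'rbcrawl-seen' is a made-up tag holding the full file list
        @drip.write(current, 'rbcrawl-seen')
      end
    end
  end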

How do you find out whether a file has been updated? You can get the previously stored version with the head method and compare its timestamp with the file’s current one. To look up the latest version of a particular file, pass rbcrawl-fname=<filename> to head:

  k, v = @drip.head(1, "rbcrawl-fname=sample/demo4book/index.rb")[0]

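Here is a small sketch of how that result can be used, following the same pattern as last_mtime above; v is the [filename, mtime, content] array, or nil if the file has never been crawled, and the fence check is omitted for brevity.

  fname = "sample/demo4book/index.rb"
  k, v = MyDrip.head(1, "rbcrawl-fname=" + fname)[0]
  last_seen = v ? v[1] : Time.at(1)
  needs_crawl = File.mtime(fname) > last_seen
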
Here is the complete crawler code:

crawl.rb
  require 'pp'
  require 'my_drip'
  require 'monitor'

  class Crawler
    include MonitorMixin

    def initialize
      super()
      @root = File.expand_path('~/develop/git-repo/')
      @drip = MyDrip
      k, = @drip.head(1, 'rbcrawl-begin')[0]
      @fence = k || 0
    end

    def last_mtime(fname)
      k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
      (v && k > @fence) ? v[1] : Time.at(1)
    end

    def do_crawl
      synchronize do
        ary = []
        Dir.chdir(@root)
        Dir.glob('**/*.rb').each do |fname|
          mtime = File.mtime(fname)
          next if last_mtime(fname) >= mtime
          @drip.write([fname, mtime, File.read(fname)],
                      'rbcrawl', 'rbcrawl-fname=' + fname)
          ary << fname
        end
        @drip.write(ary, 'rbcrawl-footprint')
        ary
      end
    end

    def quit
      synchronize do
        exit(0)
      end
    end
  end

  if __FILE__ == $0
    crawler = Crawler.new
    Thread.new do
      while true
        pp crawler.do_crawl
        sleep 60
      end
    end

    gets
    crawler.quit
  end

So far, we’ve implemented the crawler script using the write and head methods. Next, we’ll look at how to implement the indexer script.

How the Indexer Works

The indexer provides index creation, updating, and searching. It returns the list of filenames that contain the word you are searching for. Since this sample is a miniature version, it keeps the index in memory using the RBTree library.

  class Indexer
    def initialize(cursor=0)
      @drip = MyDrip
      @dict = Dict.new
      k, = @drip.head(1, 'rbcrawl-begin')[0]
      @fence = k || 0
      @cursor = [cursor, @fence].max
    end
    attr_reader :dict

①   def update_dict
      each_document do |cur, prev|
        @dict.delete(*prev) if prev
        @dict.push(*cur)
      end
    end

    def each_document
      while true
        ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
        ary.each do |k, v|
          prev = prev_version(k, v[0])
          yield(v, prev)
          @cursor = k
        end
      end
    end

②   def prev_version(cursor, fname)
      k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
      (v && k > @fence) ? v : nil
    end
  end

The indexer reads the objects tagged with rbcrawl out of Drip and updates the index:

  @drip.read_tag(@cursor, 'rbcrawl', 10, 1)

The fourth argument, 1, is very important. Do you remember that I said earlier that if there are fewer than at_least elements newer than the specified key, the call blocks until enough elements arrive? The preceding code requests ten elements at a time with a minimum of one element, so it blocks when there is nothing new to return. This lets the indexer wait until the crawler writes data with an rbcrawl tag.
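
To make the flow explicit, here’s an annotated sketch of the same consumer loop. It mirrors each_document above rather than adding anything new; the comments spell out where the blocking happens and how the cursor advances.

  cursor = 0
  loop do
    # Ask for up to ten elements newer than cursor that carry the
    # 'rbcrawl' tag. Because at_least is 1, the call sleeps until the
    # crawler writes at least one matching element, then returns
    # whatever is available.
    MyDrip.read_tag(cursor, 'rbcrawl', 10, 1).each do |key, value|
      fname, mtime, content = value
      # ...update the index with this document...
      cursor = key   # remember how far we have read
    end
  end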

Objects tagged with rbcrawl are notification events as well as documents in their own right, containing the filename, the update time, and the file content. Unlike Queue, Drip lets you read elements you have already read as many times as you like. You can look up the previous version of a file using the older method, as shown at ②.

If the indexer already holds a previous version of an updated file, it removes the old content from the index first and then indexes the new content, as shown at ①.

Once you start the indexer, the script spawns a thread and builds the index in that subthread, consuming data with read_tag.

  indexer ||= Indexer.new(0)
  Thread.new do
    indexer.update_dict
  end

The main thread of the script waits for input from the user. Once a query is entered, it searches for the word and prints the result.

  while line = gets
    ary = indexer.dict.query(line.chomp)
    pp ary
    pp ary.size
  end

Here’s the complete Indexer:

index.rb
  require 'nkf'
  require 'rbtree'
  require 'my_drip'
  require 'monitor'
  require 'pp'

  class Indexer
    def initialize(cursor=0)
      @drip = MyDrip
      @dict = Dict.new
      k, = @drip.head(1, 'rbcrawl-begin')[0]
      @fence = k || 0
      @cursor = [cursor, @fence].max
    end
    attr_reader :dict

    def update_dict
      each_document do |cur, prev|
        @dict.delete(*prev) if prev
        @dict.push(*cur)
      end
    end

    def each_document
      while true
        ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
        ary.each do |k, v|
          prev = prev_version(k, v[0])
          yield(v, prev)
          @cursor = k
        end
      end
    end

    def prev_version(cursor, fname)
      k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
      (v && k > @fence) ? v : nil
    end
  end

  class Dict
    include MonitorMixin

    def initialize
      super()
      @tree = RBTree.new
    end

    def query(word)
      synchronize do
        @tree.bound([word, 0, ''], [word + "\0", 0, '']).collect {|k, v| k[2]}
      end
    end

    def delete(fname, mtime, src)
      synchronize do
        each_tree_key(fname, mtime, src) do |key|
          @tree.delete(key)
        end
      end
    end

    def push(fname, mtime, src)
      synchronize do
        each_tree_key(fname, mtime, src) do |key|
          @tree[key] = true
        end
      end
    end

    def intern(word)
      k, v = @tree.lower_bound([word, 0, ''])
      return k[0] if k && k[0] == word
      word
    end

    def each_tree_key(fname, mtime, src)
      NKF.nkf('-w', src).scan(/\w+/m).uniq.each do |word|
        yield([intern(word), mtime.to_i, fname])
      end
    end
  end

  if __FILE__ == $0
    indexer ||= Indexer.new(0)
    Thread.new do
      indexer.update_dict
    end

    while line = gets
      ary = indexer.dict.query(line.chomp)
      pp ary
      pp ary.size
    end
  end
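
One detail worth calling out is how Dict answers queries. Each indexed word becomes an RBTree key of the form [word, mtime, filename], and query collects every key whose first element is exactly the searched word by asking for the range between [word, 0, ''] and [word + "\0", 0, '']. Here is a tiny, self-contained sketch of that range query; the sample entries are made up for illustration.

  require 'rbtree'

  tree = RBTree.new
  tree[['drip', 1, 'a.rb']] = true
  tree[['druby', 2, 'b.rb']] = true
  tree[['drip', 3, 'c.rb']] = true

  # Every key whose first element is exactly 'drip' sorts between
  # ['drip', 0, ''] and ["drip\0", 0, ''] under Ruby's array ordering.
  p tree.bound(['drip', 0, ''], ["drip\0", 0, '']).collect {|k, v| k[2]}
  #=> ["a.rb", "c.rb"]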

In the indexer script, we used the head, read_tag, and older methods. So far, we’ve covered the core functionality of this search system. Let’s stop for a moment and discuss the architecture of the system.