10.2 Examining Each Component
The search system consists of three components: the Crawler, the Indexer, and Drip, which serves as the messaging layer. In this section, we will examine each component.
How System Elements Are Structured
Let’s start by looking at the objects and tags that this system writes to Drip. We’ll mainly use Drip as the file update notification system.
- File update notification: Consists of an array of ['file name', 'timestamp', 'content']. We also use two tags: rbcrawl and rbcrawl-fname=file name. The crawler writes this information every time updated files are found. This not only archives the file content but also signals the update event. The indexer updates the index every time it receives the notification.
There are also some optional tags:
- Footprint of the crawler: Records the list of filenames updated in each crawl. It uses the rbcrawl-footprint tag.
- Anchor to declare the start of an experiment: Uses the rbcrawl-begin tag. You can write anything with this tag when you repeat the experiment and want to reset from scratch, as shown in the sketch after this list.
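For instance, you could reset an experiment by writing a new anchor element yourself. Here’s a minimal sketch, assuming the MyDrip setup used by the scripts below; the anchor’s key becomes the fence, so everything written before it is ignored:

require 'my_drip'

# The value can be anything; only the key assigned at write time matters.
# Both scripts read this key and skip any elements older than it.
MyDrip.write('reset', 'rbcrawl-begin')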
Next, let’s see how to use these objects and tags.
How the Crawler Works
Here’s the basic mechanism of this simple crawler:
class Crawler
  include MonitorMixin

  def initialize
    super()
    @root = File.expand_path('~/develop/git-repo/')
    @drip = MyDrip
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
  end

  def last_mtime(fname)
    k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
    (v && k > @fence) ? v[1] : Time.at(1)
  end

  def do_crawl
    synchronize do
      ary = []
      Dir.chdir(@root)
      Dir.glob('**/*.rb').each do |fname|
        mtime = File.mtime(fname)
        next if last_mtime(fname) >= mtime
        @drip.write([fname, mtime, File.read(fname)],
                    'rbcrawl', 'rbcrawl-fname=' + fname)
        ary << fname
      end
      @drip.write(ary, 'rbcrawl-footprint')
      ary
    end
  end

  def quit
    synchronize do
      exit(0)
    end
  end
end
First the crawler finds files that match the *.rb wildcard under @root. For each match, it checks the modification time and writes the content and timestamp if the file has been updated since the last crawl. This writes the following data:
@drip.write(
  ["sample/demo4book/index.rb", 2011-08-23 23:50:44 +0100, "file content"],
  "rbcrawl", "rbcrawl-fname=sample/demo4book/index.rb"
)
The value is an array of filename, timestamp, and file content, and it carries two tags.
As explained earlier, the crawler runs every sixty seconds and terminates after the current crawl finishes when you type something on standard input. At the end of each crawl, it writes the list of updated filenames with an rbcrawl-footprint tag. Here’s an example:
@drip.write(["sample/demo4book/index.rb"], 'rbcrawl-footprint')
This version of the crawler doesn’t record file deletions, but you could add that functionality by using this footprint information.
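One hypothetical approach: if do_crawl recorded the complete file listing in each footprint, rather than only the files updated in that crawl, you could detect deletions by comparing the two most recent footprints. A sketch under that assumption:

# Assumes footprints hold the complete listing, a change from the
# crawler above, which records only the files updated in each crawl.
footprints = MyDrip.head(2, 'rbcrawl-footprint').sort_by {|k, _| k}
if footprints.size == 2
  (_, previous), (_, current) = footprints
  deleted = previous - current   # present in the last crawl, gone now
end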
How do you find out whether a file has been updated? You can fetch the latest stored version by passing the rbcrawl-fname=file name tag to the head method and compare its timestamp with the file’s current modification time:
k, v = @drip.head(1, "rbcrawl-fname=sample/demo4book/index.rb")[0]
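The second element of the value is the stored timestamp, so the comparison itself might look like the following sketch; it assumes you are in the crawler’s @root directory so the relative path resolves:

if v
  fname, stored_mtime, = v
  # The file changed if its mtime is newer than the stored timestamp.
  updated = File.mtime(fname) > stored_mtime
end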
Here is the complete crawler code:
crawl.rb

require 'pp'
require 'my_drip'
require 'monitor'

class Crawler
  include MonitorMixin

  def initialize
    super()
    @root = File.expand_path('~/develop/git-repo/')
    @drip = MyDrip
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
  end

  def last_mtime(fname)
    k, v, = @drip.head(1, 'rbcrawl-fname=' + fname)[0]
    (v && k > @fence) ? v[1] : Time.at(1)
  end

  def do_crawl
    synchronize do
      ary = []
      Dir.chdir(@root)
      Dir.glob('**/*.rb').each do |fname|
        mtime = File.mtime(fname)
        next if last_mtime(fname) >= mtime
        @drip.write([fname, mtime, File.read(fname)],
                    'rbcrawl', 'rbcrawl-fname=' + fname)
        ary << fname
      end
      @drip.write(ary, 'rbcrawl-footprint')
      ary
    end
  end

  def quit
    synchronize do
      exit(0)
    end
  end
end

if __FILE__ == $0
  crawler = Crawler.new
  Thread.new do
    while true
      pp crawler.do_crawl
      sleep 60
    end
  end

  gets
  crawler.quit
end
So far, we’ve implemented the crawler script using the write and head methods.
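Before moving on, you can peek at the newest element the crawler wrote, for example from irb; a minimal sketch, assuming MyDrip is running and crawl.rb has completed at least one crawl:

require 'my_drip'

k, v = MyDrip.head(1, 'rbcrawl')[0]
fname, mtime, content = v   # filename, timestamp, and file content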
Next, let’s look at how to implement the indexer script.
How the Indexer Works
The indexer provides index creation, updating, and searching. It returns the list of filenames that contain the word you are searching for. Since this sample is a miniature version, it builds the index in memory using the RBTree library.
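RBTree keeps its keys sorted, which is what makes the range lookup in the Dict class below work. Here’s a minimal sketch of the idea, assuming the rbtree gem is installed; each key is a [word, timestamp, filename] triple like the ones Dict builds, and the sample entries are made up:

require 'rbtree'

tree = RBTree.new
tree[['drb', 99, 'b.rb']]   = true
tree[['drip', 123, 'a.rb']] = true

# bound returns every entry whose key sorts between the two bounds,
# so this collects each filename indexed under the word 'drb'.
p tree.bound(['drb', 0, ''], ["drb\0", 0, '']).map {|k, _| k[2]}   #=> ["b.rb"]

With that in mind, here’s the Indexer class: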
class Indexer
  def initialize(cursor=0)
    @drip = MyDrip
    @dict = Dict.new
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
    @cursor = [cursor, @fence].max
  end
  attr_reader :dict

  def update_dict                     # ①
    each_document do |cur, prev|
      @dict.delete(*prev) if prev
      @dict.push(*cur)
    end
  end

  def each_document
    while true
      ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
      ary.each do |k, v|
        prev = prev_version(k, v[0])
        yield(v, prev)
        @cursor = k
      end
    end
  end

  def prev_version(cursor, fname)     # ②
    k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
    (v && k > @fence) ? v : nil
  end
end
The indexer reads objects tagged with rbcrawl and updates the index.
@drip.read_tag(@cursor, 'rbcrawl', 10, 1)
The fourth argument, 1, is very important. Do you remember that I said earlier that if there are fewer than at_least elements newer than the specified key, the call blocks until enough elements arrive? The preceding code requests up to ten elements at a time with a minimum of one, so it blocks when there is nothing new to return. This lets the indexer wait until the crawler writes data with an rbcrawl tag.
Objects tagged rbcrawl are notification events as well as documents in their own right, containing the filename, the update time, and the content. Unlike Queue, Drip lets you read elements repeatedly, even after they have been read once. You can fetch the previous version of a document using the older method, as shown in ②.
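For instance, given a key k returned by read_tag or head, you could fetch the version written just before it; a minimal sketch:

# Returns the [key, value] pair immediately older than k that carries
# this tag, or nils if no such element exists.
prev_k, prev_v = MyDrip.older(k, 'rbcrawl-fname=sample/demo4book/index.rb')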
If the Indexer already holds a previous version of an updated file, it first removes the old content from the index and then indexes the new content, as shown in ①.
Once you start the Indexer, the script spawns a thread that builds the index in the background, consuming data with read_tag.
indexer ||= Indexer.new(0)
Thread.new do
  indexer.update_dict
end
The main thread of the script waits for input from the user. Once a query is entered, it searches for the word and prints the result.
while line = gets
  ary = indexer.dict.query(line.chomp)
  pp ary
  pp ary.size
end
Here’s the complete Indexer:
index.rb

require 'nkf'
require 'rbtree'
require 'my_drip'
require 'monitor'
require 'pp'

class Indexer
  def initialize(cursor=0)
    @drip = MyDrip
    @dict = Dict.new
    k, = @drip.head(1, 'rbcrawl-begin')[0]
    @fence = k || 0
    @cursor = [cursor, @fence].max
  end
  attr_reader :dict

  def update_dict
    each_document do |cur, prev|
      @dict.delete(*prev) if prev
      @dict.push(*cur)
    end
  end

  def each_document
    while true
      ary = @drip.read_tag(@cursor, 'rbcrawl', 10, 1)
      ary.each do |k, v|
        prev = prev_version(k, v[0])
        yield(v, prev)
        @cursor = k
      end
    end
  end

  def prev_version(cursor, fname)
    k, v = @drip.older(cursor, 'rbcrawl-fname=' + fname)
    (v && k > @fence) ? v : nil
  end
end

class Dict
  include MonitorMixin
  def initialize
    super()
    @tree = RBTree.new
  end

  def query(word)
    synchronize do
      @tree.bound([word, 0, ''], [word + "\0", 0, '']).collect {|k, v| k[2]}
    end
  end

  def delete(fname, mtime, src)
    synchronize do
      each_tree_key(fname, mtime, src) do |key|
        @tree.delete(key)
      end
    end
  end

  def push(fname, mtime, src)
    synchronize do
      each_tree_key(fname, mtime, src) do |key|
        @tree[key] = true
      end
    end
  end

  def intern(word)
    k, v = @tree.lower_bound([word, 0, ''])
    return k[0] if k && k[0] == word
    word
  end

  def each_tree_key(fname, mtime, src)
    NKF.nkf('-w', src).scan(/\w+/m).uniq.each do |word|
      yield([intern(word), mtime.to_i, fname])
    end
  end
end

if __FILE__ == $0
  indexer ||= Indexer.new(0)
  Thread.new do
    indexer.update_dict
  end

  while line = gets
    ary = indexer.dict.query(line.chomp)
    pp ary
    pp ary.size
  end
end
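You can also exercise the Dict class on its own, without Drip; a minimal sketch, with a made-up filename and content:

dict = Dict.new
dict.push('hello.rb', Time.now, 'puts "Hello, Drip"')
p dict.query('Drip')   #=> ["hello.rb"]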
In the indexer script, we used the head, read_tag, and older methods.
So far, we’ve covered the core functionalities of this search system. Let’s stop for a moment and discuss the architecture of this system.