Redis::DistMutex - 時限付き分散ロックで効率良くサイトクロールをしよう

はじめまして。バックエンドエンジニアの吉田です。 2013年5月末の入社以降、大量のEC2インスタンスのVPC移行を担当した後、今はiQONの商品DBを支えるクローラーの改善に取り組んでいます。今回はその改善の1つとして開発したRedis::DistMutexという分散ロック機構のruby実装を紹介をしようと思います。

Redis::DistMutex

開発の経緯や細かい設計の話は後述するとして、まずはつくったgemの紹介をします。

Redis::DistMutex

  • Redisベースの分散ロック機構
  • rubyのライブラリにあるMutex互換
  • スレッド間だけでなく、プロセス間・ホスト間でも共有できるMutex
  • 時限つきロックの作成が可能(redisのsetnxとexpireを活用)
  • namespaceを指定できるので、特定の処理ごとにロックの作成が可能
  • redis2.6以上のみサポート(1秒以下のexpireの指定が可能なpexpire依存するため)

サンプルコード

sample.rb
require 'redis'
require 'redis-dist-mutex'

Redis::DistMutex.redis = Redis.new
mutex = Redis::DistMutex.new(:test_app, expire: 1, auto_release: false)
mutex.synchronize { puts 'started'; sleep rand(5) }

これを5つ以上の複数スレッドや複数プロセスで同時実行すると、'Kernel.sleep(n)'の長さに影響されることなく全体として一定間隔でsyncrhonizeに渡されたブロック内の処理が実行されます。

複数スレッドから使うサンプル

  • スレッドごとにsynchronize内でランダムにKernel.sleepを実行
  • syncrhonizeに渡すブロック内でRedisにIDと現在時刻を記録
test_mutex.rb
require 'redis'
require 'redis'
require 'redis-dist-mutex'

class TestMutex
  def initialize
    @redis = Redis.new
    Redis::DistMutex.redis = @redis
    @mutex = Redis::DistMutex.new :test_app, expire: 1, autorelease: false
  end

  def create_thread(id)
    Thread.new do
      now = -> { sprintf('%.1f', Time.now.to_f) }
      @redis.lpush('hoge_test:start', "#{id}:#{now.call}")
      @mutex.synchronize { @redis.lpush('hoge_test:end', "#{id}:#{now.call}"); sleep 2 + rand }
    end
  end

  def start
    puts '-' * 40
    @redis.del('hoge_test:start')
    @redis.del('hoge_test:end')
    5.times.map { |id| create_thread(id) }.each(&:join)
    puts "Start: #{@redis.lrange('hoge_test:start', 0, -1)}"
    puts "End  : #{@redis.lrange('hoge_test:end', 0, -1)}"
  end
end

if $0 == __FILE__
  test = TestMutex.new
  3.times.each { |i| test.start }
end

実行してみると、スレッドのIDは実行順とは無関係に記録されていて、それぞれ約1秒おき記録されています。

$ bundle exec ruby ./test_mutex.rb
----------------------------------------
Start: ["0:1385368596.6", "1:1385368596.6", "2:1385368596.6", "4:1385368596.6", "3:1385368596.6"]
End : ["0:1385368600.6", "4:1385368599.6", "1:1385368598.6", "2:1385368597.6", "3:1385368596.6"]
----------------------------------------
Start: ["0:1385368605.0", "1:1385368605.0", "2:1385368605.0", "4:1385368605.0", "3:1385368605.0"]
End : ["4:1385368609.0", "0:1385368608.0", "1:1385368607.0", "2:1385368606.0", "3:1385368605.0"]
----------------------------------------
Start: ["0:1385368613.4", "1:1385368613.4", "4:1385368613.4", "3:1385368613.4", "2:1385368613.4"]
End : ["3:1385368617.4", "0:1385368616.4", "1:1385368615.4", "4:1385368614.4", "2:1385368613.4"]

実行時に別のターミナルで'redis-cli monitor|grep expire'を実行しておくと、1秒おきにロックが取得されるのがよくわかると思います。

initialize時のオプション

  • 'expire: n'を設定しないとブロックの実行が完了次第、次の処理に移る
    • rubyのMutexと同じ動作
  • 'autorelease'はデフォルトtrueで、'false'を設定しない限り、処理が同期的になる
    • 'true'だとsyncrhonize終了時に必ずロックを開放
  • 今回の要件では、'expire: 1, autorelease: false'を指定
    • そうすることで、ロック取得から1秒後に必ずロックが開放される

サイトクロール時の問題点

iQON掲載商品数 200万件以上

iOONでは大量の商品データを保有しているのですが、それらは提携しているECサイトをクロールしてDBに取り込んでいます。さらに、いったん貯めたアイテムの在庫情報も定期的に再クロールして更新しているので、相当数の商品ページを効率的にクロールする仕組みが必要になります。

クローラー最大のボトルネックはネットワークIO

クローラーの処理は、大きく"ダウンロード"と"解析"の2つに分けられます。 ページの解析については、サーバーの性能やコードの見直しで高速化が容易です。一方、ページのダウンロードについては外部サイトの性能に依存するため、効率化が難しくなります。これを効率的に行う設計が必要になります。 既存のクローラーはサイトの性能にかかわらず直列でダウンロードして、一定時間sleepしていました。そのように素直に直列でダウンロードした場合、レスポンスタイムに依存してリクエスト間隔がばらばらになります。1ページ5000msかかると1000ページで5000秒(83分20秒)かかる計算です。逆に、200msで返してくれるサイトの場合は1000ページを200秒(3分20秒)で処理できますが、5req/secの負荷がかかります。 このようなサイトごとのレスポンス性能の差異に関係なく一定間隔でページをリクエストできれば、全体としてクロール効率を向上させ、対象サイトへの負荷も軽減することができます。 今回の新クローラー開発では、そのような問題にも取り組みました。

Resqueによるダウンロード処理の並列化

今回開発したクローラーは、起点となるページから抽出した商品詳細ページのURLをResqueにenqueueし、workerはそのURLのページをダウンロードしてDBに保存し、ページ解析用のworkerに処理を引き継ぎます。この時、worker数を増やせばそれだけ並列度を上げて効率よくダウンロードできるのですが、前述した通り、増やしすぎると対象サイトに負荷をかけることになります。 また、何らかの障害によってworkerがdequeueしていなかった場合、ダウンロードキューが大量に蓄積されます。障害から復旧した後でworkerを起動すると、蓄積されたキューが一気に処理されるため、worker数だけ同時にHTTPリクエストする状態が一定時間継続することになります。これはクロール対象サイト対してDoS攻撃しているようなものです。 そこで、このクロール頻度の制御に有効な手段として、時限付き分散ロック(redis-dist-mutex)を導入しました。

時限付き分散ロック

Mutex(=Binary Semaphore)

Rubyにも同梱されている"Mutex"は、wikipediaではこう説明されています。

クリティカルセクションでアトミック性を確保するための同期機構の一種である。

つまり、複数スレッドやプロセス間の排他制御に使用するロック機構の一種であり、ロックを取得したものだけが処理をすすめられ、取得できなければ開放されるまで待つことになります。 ただし、今回の要件では厳密な排他制御は必要なく、全体として一定間隔での実行を保証できれば十分です。ロック取得から一定以上の時間が経過したらそれ以上の排他制御は必要ありません。 そのような仕組みがあることによって、各worker内のHTTPリクエスト開始のタイミングのみの制御が可能になります。(※スケジューラーに近いイメージですが、スレッドやプロセス同士での協調動作となる点でスケジューラーとは異なります。) このように時限付き分散ロックを用意しておけば、複数のResqueWorkerから特定のサイトHTTPリクエストが集中することを防止できるようになります。例えば1リクエストに3秒かかるサイトに対しては3つのworkerが起動していれば、1秒おきにHTTPリクエストを発生させることができます。仮に1秒間隔でリクエストすることが条件ならば、起動しておくべきworker数は以下の式で算出できます。

worker数 >= 平均レスポンスタイム(秒) + α

ちなみに、iQONのクローラーのざっくりした構成は以下の図のようになっています。 実際にはiQONのクローラーは複数サイトをクロールするので、サイトごとにqueueとmutexを用意して、複数のworkerから全サイトに対して効率よく処理されるように設計を工夫しています。各workerはそれら複数のダウンロードキューをランダムな優先順位でlistenすれば、全体のバランスがとれます。このあたりの設計の話については別の機会に紹介したいと思います。 以上、Redis::DistMutexと、それを使ったクローラー事例の紹介でした。

最後に

今回紹介したものはiQONのクローラーのほんの一部分であり、iQONというサービスを支えるためのサーバーサイド技術は多岐にわたります。クローラーにかかわらずiQONを支えるサーバーサイド技術に興味のある方はぜひ恵比寿にあるオフィスに直接遊びにきてください。VASILYでは現在、一緒に働いてくれるエンジニアを大募集しています。