iQONのバックエンドの非同期処理について(具体的な実装編)

はじめに

こんにちは、じゃがいもの皮はもっぱらキレイにむいて食べるエンジニアの村田です。 前回のエントリ iQONのバックエンドの非同期処理について ではざっくりとした方針とかを書きましたが今回は具体的な実装方法や運用方法などについて紹介したいと思います。

使用技術

iQONではResqueという仕組みを採用して、メール送信やDBの重たい更新処理などを非同期処理しています。 ResqueはRedisにキューを出し入れして遅延処理を実現する仕組みです。シンプルだし導入しやすいと思い採用しました。 このResqueの仕組みをdaemon-spawnというgemでデーモン化して運用しています。 イメージにするとこんな感じです。

f:id:vasilyjp:20160303221448p:plain

導入方法

1.Gemfileに記述

gem 'resque'
gem 'daemon-spawn', :require => 'daemon_spawn'
  1. initializersにRailsの環境ごとのredisサーバのホスト設定を記述

config/initializers/load_redis_config.rb

require 'resque'
if Rails.env.to_s == "production"
   Resque.redis = 'redis.production.iqon.jp:6379'
elsif Rails.env.to_s == "test"
  Resque.redis = 'redis.test.iqon.jp:6379'
else
  Resque.redis = 'redis.dev.iqon.jp:6379'
end

3.デーモンの起動処理を記述

script/resque_worker

#!/usr/bin/env ruby
require File.expand_path('../../config/application', __FILE__)
Rails.application.require_environment!
class ResqueWorkerDaemon < DaemonSpawn::Base
  def start(args)
    @worker = Resque::Worker.new('default')
    @worker.verbose = true
    @worker.work
  end
  def stop
  end
end

ResqueWorkerDaemon.spawn!({ 
  :working_dir => Rails.root, 
  :pid_file => File.join(Rails.root, 'tmp', 'pids', 'resque_worker.pid'), 
  :log_file => File.join(Rails.root, 'log', 'resque_worker.log'), 
  :sync_log => true, 
  :singleton => true, 
  :signal => 'QUIT' }
) 

このスクリプトにstart/stop/restartなど渡して実行することでデーモンを操作します。 例えばこんな感じです。

起動 script/resque_worker start

停止 script/resque_worker stop

  1. キューをredisにエンキューする処理を実装する

エンキューする処理はcontrollerでもmodelの中でも好きなタイミングで問題ないと思います。 大切なのはどのworker(あとでキューを処理する処理、つまり非同期で動く処理)にどんなパラメータを投げるかを指定するところです。

#TestResqueWorkerにparamsを渡す
Resque.enqueue(TestResqueWorker, params) 
  1. キューを処理するワーカーを実装する

app/worker/test_resque_worker.rb

class TestResqueWorker
  @queue = :default
  def self.perform(params)
    #DBの更新処理やメール送信などの重たい処理を実装する
  end
end

具体例

ここまでは導入する最低限の概要でしたが、実際のiQONでどのようにしているかを一部紹介したいと思います。 例として、コーディネートモデルからキューを発行して、コーディネートに使われている各アイテムのブランドのコーディネート数を更新していく処理を見てみます。

  1. キューの発行

コントローラからキューを発行してみます

app/controller/coordinate.rb

class CoordinateController < ApplicationController
  def update_brand_coordinate_count
    #params[:brands]は123,235,12345のようにカンマ区切りの文字列が入っている    
    Resque.enqueue(UpdateBrandCoordinateCountWorker, {:brands => params[:brands]})
  end
end
  1. キューを処理する

app/worker/update_brand_coordinate_count_worker.rb

class UpdateBrandCoordinateCountWorker
  @queue = :default
  def self.perform(params)
    brand_id_list = params["brands"].split(",")
    brand_id_list.each do | brand_id |
      #brandテーブルのコーディネート数を更新していく
      if Brand.where(:brand_id => brand_id, :delete_flag => 0).exists?
        Brand.increment_counter(:coordinate_count, brand_id)
      end
    end
  end
end

このような形でResque.enqueueで非同期で処理させたい内容(どのworkerクラスに処理させるかを第一引数に、パラメータを第二引数に設定)をキューとしてRedisにエンキューします。 それを受けてUpdateBrandCoordinateCountWorkerがパラメータを受け取り実際の処理をしていくという流れになります。 Redisサーバを用意して、Railsアプリケーションにこれだけ実装すれば非同期処理を実現できるのは手軽で便利ですね。

気をつけること

ここまでで導入と具体例を紹介させて頂きましたが、実際に開発、運用をしていく上で困ったことと対応方法を紹介したいと思います。

  1. workerが受け取るハッシュのキーはシンボルではなく文字列になる

サンプルコードでも書いてありますが、キューを発行するときにシンボルをキーにしたハッシュを渡していいますが、キューを処理するworkerの中では文字列をキーにしたハッシュとしてパラメータにアクセスしています。 これはResqueがキューを入れるときに内部的にto_jsonをしているからのようです。 開発を始めたときは結構ハマったのでこれから導入を考えている方は気をつけたほうがいいと思います。

  1. workerがキューを処理中にdaemonをリスタートした時に中断されてしまう問題

この問題も最初はハマりましたが最初に紹介させて頂いたResqueWorkerDaemon.spawnで :signal => 'QUIT' という指定することで対応できます。 これはworkerが現在処理している内容をすべて処理し終わってからworkerプロセスを終了するというオプションになります。

  1. 特定のキューだけ優先的に処理したい場合

優先度の低い重たいキューがたまりすぎると優先度の高い軽いキューが詰まってしまう問題ですね。

class FirstPriorityWorker
  @queue = :first_priority
  #省略
end

class SecondPriorityWorker
  @queue = :second_priority
  #省略
end

class ResqueWorkerDaemon < DaemonSpawn::Base
  @worker = Resque::Worker.new(:first_priority, :second_priority)
  #省略
end

このように設定するとfirst_priorityのキューを優先的に処理することが可能です。 キューに対して優先度を設定したい場合などはこのように設定すればできると思います。

最後に

Resqueはかなり簡単に非同期処理を実現できるので素晴らしいのですが、なんでもかんでも非同期処理にしてしまうと問題になることもあるのでそこは見極めが必要です。 あとは前述した注意点を守れば導入はスムーズに運用していくことができるかと思います。