QA@IT

redis-rb で subscribe 中にブロック外から subscribe 先の追加

3346 PV

イメージとしてはこういう事がしたいんですが、 (現状は synchronize でブロックされる。最初に Redis#subscribe に渡したブロック内だとブロックされない)

redis = Redis.new
Thread.new do
  redis.subscribe do |on|
    on.message {|ch, msg| p [ch, msg] }
  end
end
redis.subscribe("a")
Redis.new.publish("a", "hello")
redis.unsubscribe("a")
Redis.new.publish("a", "bye")
#=> ["a", "hello"]

こういう方法しかないんでしょうか:

require 'redis'

th = Thread.new(Redis.new) do |redis|
  loop do
    redis.publish('time', Time.now.to_s)
    sleep 1
  end
end

# Redis client for subscription
sub_redis = Redis.new
class << sub_redis
  def subscribe_add(*channels)
    client.instance_variable_get(:@client).write([["SUBSCRIBE", *channels.map(&:to_s)]])
  end

  def subscribe_rem(*channels)
    client.instance_variable_get(:@client).write([["UNSUBSCRIBE", *channels.map(&:to_s)]])
  end
end

sub_redis_ready = false

subscription_thread = Thread.new do
  sub_redis.subscribe("dummy") do |on|
    on.subscribe do
      sub_redis_ready = true
    end

    on.message do |ch, msg|
      p [ch, msg]
    end
  end
end

nil until sub_redis_ready
sub_redis.subscribe_add('time')
sleep 10
sub_redis.subscribe_rem('time')
th.join

綺麗な方法があれば教えてください。

回答

redisのインスタンス(=redisへの接続)は、複数のスレッドで共有しないものだと思います。たとえば、forkして親子プロセスで同じ接続が共有されているとInheritedErrorになります。

メインスレッドと、Thread.newした側と、両方でRedis.newして、計2本の接続を持たせるのでどうですか?

質問の意図を勘違いしてたので追記

メインスレッド側からワーカー側のsubscriptionをコントロールしたいという話だったのですね。。。勘違いしてましたすみません。

うーん、その場合には、subscriptionを追加させるためのトピック・メッセージを組み込む、つまり制御用のトピックに必ずsubscribeさせておくという感じですかね。ただ、pub/subの設計って、そもそも疎結合のための仕組みだと思うので、メインスレッド側からワーカーを直接コントロールしたいという要件自体が、あまりpub/sub向きではない(ほかの通信方法のほうが向いてる)という気がします。

編集 履歴 (1)
  • (結局別に一つのチャンネルでまとめる事にしましたが)あのthreadで動いてるredisインスタンスのsubscribe先を追加したい!という需要でした。 -
  • 追記が入れ違いになっちゃいましたね。。。別の制御用チャンネルを追加するというのは、pub/subでは比較的よくあるパターンの一つではあると思います。ただ、追記したように、実はcode smellでもある、という点にも注意したいですね。 -
ウォッチ

この質問への回答やコメントをメールでお知らせします。