Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 79 additions & 66 deletions lib/faye/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,40 @@ def disconnect
end

def create_client(&callback)
init
client_id = @server.generate_id
@redis.zadd(@ns + '/clients', 0, client_id) do |added|
next create_client(&callback) if added == 0
@server.debug 'Created new client ?', client_id
ping(client_id)
@server.trigger(:handshake, client_id)
callback.call(client_id)
with_redis do
client_id = @server.generate_id
@redis.zadd(@ns + '/clients', 0, client_id) do |added|
next create_client(&callback) if added == 0
@server.debug 'Created new client ?', client_id
ping(client_id)
@server.trigger(:handshake, client_id)
callback.call(client_id)
end
end
end

def client_exists(client_id, &callback)
init
cutoff = get_current_time - (1000 * 1.6 * @server.timeout)
with_redis do
cutoff = get_current_time - (1000 * 1.6 * @server.timeout)

@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score.to_i > cutoff)
@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score.to_i > cutoff)
end
end
end

def destroy_client(client_id, &callback)
init
@redis.zadd(@ns + '/clients', 0, client_id) do
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_subscriptions_removed(client_id, &callback) if i == n

channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_subscriptions_removed(client_id, &callback) if i == n
with_redis do
@redis.zadd(@ns + '/clients', 0, client_id) do
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_subscriptions_removed(client_id, &callback) if i == n

channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_subscriptions_removed(client_id, &callback) if i == n
end
end
end
end
Expand All @@ -112,75 +115,80 @@ def after_subscriptions_removed(client_id, &callback)
end

def ping(client_id)
init
timeout = @server.timeout
return unless Numeric === timeout
with_redis do
timeout = @server.timeout
return unless Numeric === timeout

time = get_current_time
@server.debug 'Ping ?, ?', client_id, time
@redis.zadd(@ns + '/clients', time, client_id)
time = get_current_time
@server.debug 'Ping ?, ?', client_id, time
@redis.zadd(@ns + '/clients', time, client_id)
end
end

def subscribe(client_id, channel, &callback)
init
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel) do |added|
@server.trigger(:subscribe, client_id, channel) if added == 1
end
@redis.sadd(@ns + "/channels#{channel}", client_id) do
@server.debug 'Subscribed client ? to channel ?', client_id, channel
callback.call if callback
with_redis do
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel) do |added|
@server.trigger(:subscribe, client_id, channel) if added == 1
end
@redis.sadd(@ns + "/channels#{channel}", client_id) do
@server.debug 'Subscribed client ? to channel ?', client_id, channel
callback.call if callback
end
end
end

def unsubscribe(client_id, channel, &callback)
init
@redis.srem(@ns + "/clients/#{client_id}/channels", channel) do |removed|
@server.trigger(:unsubscribe, client_id, channel) if removed == 1
end
@redis.srem(@ns + "/channels#{channel}", client_id) do
@server.debug 'Unsubscribed client ? from channel ?', client_id, channel
callback.call if callback
with_redis do
@redis.srem(@ns + "/clients/#{client_id}/channels", channel) do |removed|
@server.trigger(:unsubscribe, client_id, channel) if removed == 1
end
@redis.srem(@ns + "/channels#{channel}", client_id) do
@server.debug 'Unsubscribed client ? from channel ?', client_id, channel
callback.call if callback
end
end
end

def publish(message, channels)
init
@server.debug 'Publishing message ?', message
with_redis do
@server.debug 'Publishing message ?', message

json_message = MultiJson.dump(message)
channels = Channel.expand(message['channel'])
keys = channels.map { |c| @ns + "/channels#{c}" }
json_message = MultiJson.dump(message)
channels = Channel.expand(message['channel'])
keys = channels.map { |c| @ns + "/channels#{c}" }

@redis.sunion(*keys) do |clients|
clients.each do |client_id|
queue = @ns + "/clients/#{client_id}/messages"
@redis.sunion(*keys) do |clients|
clients.each do |client_id|
queue = @ns + "/clients/#{client_id}/messages"

@server.debug 'Queueing for client ?: ?', client_id, message
@redis.rpush(queue, json_message)
@redis.publish(@message_channel, client_id)
@server.debug 'Queueing for client ?: ?', client_id, message
@redis.rpush(queue, json_message)
@redis.publish(@message_channel, client_id)

client_exists(client_id) do |exists|
@redis.del(queue) unless exists
client_exists(client_id) do |exists|
@redis.del(queue) unless exists
end
end
end
end

@server.trigger(:publish, message['clientId'], message['channel'], message['data'])
@server.trigger(:publish, message['clientId'], message['channel'], message['data'])
end
end

def empty_queue(client_id)
return unless @server.has_connection?(client_id)
init
with_redis do

key = @ns + "/clients/#{client_id}/messages"
key = @ns + "/clients/#{client_id}/messages"

@redis.multi
@redis.lrange(key, 0, -1)
@redis.del(key)
@redis.exec.callback do |json_messages, deleted|
next unless json_messages
messages = json_messages.map { |json| MultiJson.load(json) }
@server.deliver(client_id, messages)
@redis.multi
@redis.lrange(key, 0, -1)
@redis.del(key)
@redis.exec.callback do |json_messages, deleted|
next unless json_messages
messages = json_messages.map { |json| MultiJson.load(json) }
@server.deliver(client_id, messages)
end
end
end

Expand Down Expand Up @@ -235,6 +243,11 @@ def with_lock(lock_name, &block)
end
end

def with_redis
init
EventMachine.next_tick { yield }
end

end
end