ruby并发编程


ruby并发编程一般使用Thread实现,但是Thread默认使用时通过共享内存的使用的,即在子线程和主线程(或其他子线程)是get/set同一套变量的,不使用锁则会因数据竞争导致执行不可控,执行结果不正确:

counter = 0

threads = 10.times.map do
  Thread.new do
    1000.times do
      counter += 1  # 非原子操作:读、加、写
    end
  end
end

threads.each(&:join)
puts counter  # 期望是 10000,但结果可能小于 10000

但ruby的GIL保证只有一个线程执行,我运行了很久都没遇到小于10000的情况(但不能保证不会遇到)

使用锁又会导致性能低下,以及死锁等问题(多个共享资源、多个锁的情况);并发执行多个且有分支控制时,也会导致代码逻辑过于复杂,容易出bug且难以调试。

concurrent-ruby是一个并发编程的工具集,可以使用其提供的并发原语,方便地实现多线程编程。

Concurrent::Async

Async模块是一种将简单但强大的异步功能混合到任何普通的旧式 Ruby 对象或类中的方法,将每个对象变成一个简单的 Actor。方法调用在后台线程上处理。调用者可以在后台进行处理时自由执行其他操作。

require 'concurrent-ruby'
class A
  # 引入Async module
  include Concurrent::Async

  def say_it(word)
    raise 'a is wrong' if word == 'a'

    sleep(1)
    puts "say #{word}"
    true
  end
end
# 异步方式调用
a = A.new.async.say_it('a') #异步执行
# value可传递timeout参数->超时秒数,执行到此时如果异步任务执行完,则返回结果,正在执行,则等待(阻塞)
puts a.value # 调用失败时 value为nil,如果是value!方法直接抛出异常
puts a.reason # error

b = A.new.async.say_it('b')
puts b.value # true
puts b.reason # 错误为nil

# 阻塞方式调用
c = A.new.await.say_it('c')
puts c

执行的结果是一个IVar对象,能够检查执行状态,返回结果等

Concurrent::ScheduledTask & Concurrent::TimerTask

ScheduledTask为在指定的延迟后执行

require 'concurrent-ruby'
task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' } # 2秒延时
puts task.class
puts task.state         #=> :unscheduled
task.execute # 开始执行
puts task.state         #=> pending

# wait for it...
sleep(3)

puts task.unscheduled? #=> false
puts task.pending?     #=> false
puts task.fulfilled?   #=> true
puts task.rejected?    #=> false
puts task.value        #=> 'What does the fox say?'

TimerTask运行一个定期执行任务

require 'concurrent-ruby'

# execution_interval 执行间隔(秒数),默认60秒
# interval_type 间隔方式默认fixed_delay
#               fixed_delay: 上一次执行结束和下一次执行开始之间的间隔
#               fixed_rate:上一次执行开始到下一次开始执行的间隔(如果执行时间超过间隔,则下一次执行将在前一次执行完成后立即开始。不会并发运行)
#                 
task = Concurrent::TimerTask.new(execution_interval: 1, interval_type: :fixed_rate){ t =  Time.now.sec; raise "aaa" if t % 5 == 0 ; puts t; t }

# 观察者类
class TaskObserver
  # time 执行时间
  # result 执行结果
  # ex 异常
  def update(time, result, ex)
    if result
      print "(#{time}) Execution successfully returned #{result}\n"
    else
      print "(#{time}) Execution failed with error #{ex}\n"
    end
  end
end

# 添加观察者
task.add_observer(TaskObserver.new) # 调用其update方法

task.execute # 开始执行

# 异步执行,防止主线程结束
gets
# 关闭
task.shutdown

Concurrent::Promises

提供优雅的方式实现处理异步计算和任务链式执行。

require 'concurrent-ruby'

x =  Concurrent::Promises.
    future(2) { |v| raise 'aa' }.
    # 顺序执行
    then(&:succ).then{|x| x -= 1}.
    # 异常时
    rescue { |error| 999 }
puts x.result.inspect # 3



# 分支执行
head    = Concurrent::Promises.fulfilled_future(-1) # 
branch1 = head.then(&:abs).then(&:succ) # 分支1(绝对值 -> +1)
branch2 = head.then(&:succ).then(&:abs) # 分支2(+1 -> 绝对值)

puts head.value # -1
puts branch1.value # 2
puts branch2.value # 0

# 压缩分支
puts (branch1 & branch2).value.inspect # [2, 0]
# 任意分支
puts (branch1.then{raise 'a'} | branch2).value.inspect # 0

线程池:限制线程数量

require 'concurrent-ruby'

pool = Concurrent::FixedThreadPool.new(5) # 最大5 threads

# 可以指定最大,最小线程数,回收空闲时间,
# pool = Concurrent::ThreadPoolExecutor.new(
#    min_threads: 5, # 最小线程数
#    max_threads: 5, # 最大线程数
#    idletime: 60, # 回收空闲时间
#    max_queue: 0 # 最大队列大小
#    fallback_policy: :abort # 等待队列满时策略:abort(异常),discard(丢弃),caller_runs(调用者线程执行)
# )
# 使用线程池执行异步任务
promises = (1..10).map do |i|
  # 在pool上执行
  Concurrent::Promises.future_on(pool, i) do |i|
    sleep 1 # 模拟耗时操作
    raise "x" if i %2 == 0
    puts "Task #{i} completed by #{Thread.current.object_id}"
    i * 2
  end
end
puts pool.running? # true
# 等待所有任务完成并获取结果
results = promises.map(&:value) # [2, nil, 6, nil, 10, nil, 14, nil, 18, nil]
puts pool.running? # true
puts "Results: #{results}"

# pool.wait_for_termination # 等待执行完关闭(但测试时报错,未知原因)
puts pool.running? # true
pool.shutdown # 需要手动关闭

go风格channel

实验版本edge才有的功能。

require 'concurrent-edge'
# 输入channel,容量为2
in_c = Concurrent::Channel.new(capacity: 2)
# 输出channel
out_c = Concurrent::Channel.new(capacity: 2)

# 写入数据
Concurrent::Channel.go do
  10.times do |i|
    in_c.put(i)
  end
  in_c.close
end


Concurrent::Channel.go do
  loop do
    # 读取channel数据
    msg = ~ in_c
    break if msg.nil? # close时发送的nil数据

    out_c << (msg ** 2)
  end
  # 等效写法,each(忽略掉close的)
  # in_c.each do |msg|
  #  out_c << msg ** 2
  # end
  out_c.close # 关闭channel
end

# 读取
loop do
  v = ~out_c
  break if v.nil?
  puts v
end

Ractor

上面说的花里胡哨的,但是因为ruby的 GIL (Global Interpreter Lock),多线程其实在同一时间,只有一个在执行。 这就使多线程仅在IO阻塞时起到作用,多CPU其实是用不到的。 Ractor 是 Ruby 3 新引入的特性。Ractor 顾名思义是 Ruby 和 Actor 的组合词。Actor 模型是一个基于通讯的、非锁同步的并发模型。

NUM = 100000
THREAD_NUM = 4
BATCH_SIZE = NUM / THREAD_NUM 
def ractor_run(num)
  Ractor.new(num) do |start_index|
    sum = (start_index...start_index + BATCH_SIZE).inject(0) do |sum, i|
      sum += i ** 2
    end
    Ractor.yield(sum)
  end
end

def thread_run(num)
  Thread.new(num) do |start_index|
    (start_index...start_index+BATCH_SIZE).inject(0) do |sum, i|
      sum += i ** 2
    end
  end
end

def ractor_sum
  THREAD_NUM.times.map do |i|
    ractor_run(i * BATCH_SIZE)
  end.map(&:take).sum
end

def thread_sum
  THREAD_NUM.times.map do |i|
    thread_run(i * BATCH_SIZE)
  end.map{|t| t.join.value}.sum
end

def normal_sum
  (0...NUM).inject(0) do |sum, i|
    sum += i ** 2
  end
end

puts thread_sum

puts ractor_sum

puts normal_sum
require 'benchmark'

Benchmark.bmbm do |x|
  # sequential version
  x.report('normal'){ 100.times{normal_sum} }

  # parallel version with thread
  x.report('thread'){ 100.times{thread_sum}}
 
  # parallel version with ractors
  x.report('ractor'){ 100.times{ractor_sum} }
end

image.png 我们看到 ractor对比thread和normal提升超过三倍,而thread对比normal甚至稍慢。

但Ractor也是有代价的,Ractor之间时数据隔离的:

  • 只能通过消息传递(send 和 receive)进行通信。
  • 数据传递时,必须是 深拷贝 或 不可变 的对象(如数字、符号等)。

ractor内发送消息(from 1),最后发送4

r1 = Ractor.new {Ractor.yield 'from 1';puts 'go on';4}
 r1.take # from 1
# print go on
r1.take # get 4

ractor内接收消息,最后发送5

r1 = Ractor.new {msg = Ractor.receive;puts msg;5}
r1.send('ok') 
# print ok
r1.take # get 5

take未能接收到消息时,阻塞

可共享的object_id是一样的

one = '3'.freeze
r = Ractor.new(one) do |one|
  one
end

two = r.take
puts one.object_id # 60
puts two.object_id # 60

不可共享的object_id就不一样了(复制了一份)

one = []
r = Ractor.new(one) do |one|
  one
end

two = r.take
puts one.object_id # 80
puts two.object_id # 100

move 移动,这会将 对象移动到接收方,使发送方无法访问它。

one = []
r = Ractor.new do
  x = Ractor.receive
  x
end
r.send(one, move: true)

two = r.take
puts two.object_id # 60
# 移动了,不能再使用
puts one.object_id #  `method_missing': can not send any methods to a moved object (Ractor::MovedError)

结语

  • Concurrent::Async,可以以简单的方式实现异步调用(或修改已有代码为异步方式),
  • Concurrent::ScheduledTask & Concurrent::TimerTask能够制定延时执行和定期执行
  • Concurrent::Promises 实现链式调用、分支处理
  • thread pool 实现对并发的数量控制
  • go风格channel实现消息机制的异步调用
  • Ractor实现真正的并行。