|
1 | | -require 'concurrent/atomic/thread_local_var/weak_key_map' |
| 1 | +require 'thread' |
2 | 2 |
|
3 | 3 | module Concurrent |
4 | 4 |
|
@@ -33,169 +33,132 @@ module Concurrent |
33 | 33 | # |
34 | 34 | # @see https://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html Java ThreadLocal |
35 | 35 | # |
36 | | - # @!visibility private |
37 | | - class AbstractThreadLocalVar |
| 36 | + class ThreadLocalVar |
| 37 | + |
| 38 | + # Each thread has a (lazily initialized) array of thread-local variable values |
| 39 | + # Each time a new thread-local var is created, we allocate an "index" for it |
| 40 | + # For example, if the allocated index is 1, that means slot #1 in EVERY |
| 41 | + # thread's thread-local array will be used for the value of that TLV |
| 42 | + # |
| 43 | + # The good thing about using a per-THREAD structure to hold values, rather |
| 44 | + # than a per-TLV structure, is that no synchronization is needed when |
| 45 | + # reading and writing those values (since the structure is only ever |
| 46 | + # accessed by a single thread) |
| 47 | + # |
| 48 | + # Of course, when a TLV is GC'd, 1) we need to recover its index for use |
| 49 | + # by other new TLVs (otherwise the thread-local arrays could get bigger |
| 50 | + # and bigger with time), and 2) we need to null out all the references |
| 51 | + # held in the now-unused slots (both to avoid blocking GC of those objects, |
| 52 | + # and also to prevent "stale" values from being passed on to a new TLV |
| 53 | + # when the index is reused) |
| 54 | + # Because we need to null out freed slots, we need to keep references to |
| 55 | + # ALL the thread-local arrays -- ARRAYS is for that |
| 56 | + # But when a Thread is GC'd, we need to drop the reference to its thread-local |
| 57 | + # array, so we don't leak memory |
38 | 58 |
|
39 | 59 | # @!visibility private |
40 | 60 | NIL_SENTINEL = Object.new |
41 | | - private_constant :NIL_SENTINEL |
| 61 | + FREE = [] |
| 62 | + LOCK = Mutex.new |
| 63 | + ARRAYS = {} # used as a hash set |
| 64 | + @@next = 0 |
| 65 | + private_constant :NIL_SENTINEL, :FREE, :LOCK, :ARRAYS |
42 | 66 |
|
43 | | - # @!macro [attach] thread_local_var_method_initialize |
44 | | - # |
45 | 67 | # Creates a thread local variable. |
46 | 68 | # |
47 | 69 | # @param [Object] default the default value when otherwise unset |
48 | 70 | def initialize(default = nil) |
49 | 71 | @default = default |
50 | | - allocate_storage |
| 72 | + @index = LOCK.synchronize do |
| 73 | + FREE.pop || begin |
| 74 | + result = @@next |
| 75 | + @@next += 1 |
| 76 | + result |
| 77 | + end |
| 78 | + end |
| 79 | + ObjectSpace.define_finalizer(self, self.class.threadlocal_finalizer(@index)) |
51 | 80 | end |
52 | 81 |
|
53 | | - # @!macro [attach] thread_local_var_method_get |
54 | | - # |
55 | 82 | # Returns the value in the current thread's copy of this thread-local variable. |
56 | 83 | # |
57 | 84 | # @return [Object] the current value |
58 | 85 | def value |
59 | | - value = get |
60 | | - |
61 | | - if value.nil? |
62 | | - @default |
63 | | - elsif value == NIL_SENTINEL |
64 | | - nil |
| 86 | + if array = Thread.current[:__threadlocal_array__] |
| 87 | + value = array[@index] |
| 88 | + if value.nil? |
| 89 | + @default |
| 90 | + elsif value.equal?(NIL_SENTINEL) |
| 91 | + nil |
| 92 | + else |
| 93 | + value |
| 94 | + end |
65 | 95 | else |
66 | | - value |
| 96 | + @default |
67 | 97 | end |
68 | 98 | end |
69 | 99 |
|
70 | | - # @!macro [attach] thread_local_var_method_set |
71 | | - # |
72 | 100 | # Sets the current thread's copy of this thread-local variable to the specified value. |
73 | | - # |
| 101 | + # |
74 | 102 | # @param [Object] value the value to set |
75 | 103 | # @return [Object] the new value |
76 | 104 | def value=(value) |
77 | | - bind value |
78 | | - end |
79 | | - |
80 | | - # @!macro [attach] thread_local_var_method_bind |
81 | | - # |
82 | | - # Bind the given value to thread local storage during |
83 | | - # execution of the given block. |
84 | | - # |
85 | | - # @param [Object] value the value to bind |
86 | | - # @yield the operation to be performed with the bound variable |
87 | | - # @return [Object] the value |
88 | | - def bind(value, &block) |
89 | | - if value.nil? |
90 | | - stored_value = NIL_SENTINEL |
91 | | - else |
92 | | - stored_value = value |
| 105 | + me = Thread.current |
| 106 | + # We could keep the thread-local arrays in a hash, keyed by Thread |
| 107 | + # But why? That would require locking |
| 108 | + # Using Ruby's built-in thread-local storage is faster |
| 109 | + unless array = me[:__threadlocal_array__] |
| 110 | + array = me[:__threadlocal_array__] = [] |
| 111 | + LOCK.synchronize { ARRAYS[array.object_id] = array } |
| 112 | + ObjectSpace.define_finalizer(me, self.class.thread_finalizer(array)) |
93 | 113 | end |
94 | | - |
95 | | - set(stored_value, &block) |
96 | | - |
| 114 | + array[@index] = (value.nil? ? NIL_SENTINEL : value) |
97 | 115 | value |
98 | 116 | end |
99 | 117 |
|
100 | 118 | protected |
101 | 119 |
|
102 | 120 | # @!visibility private |
103 | | - def allocate_storage |
104 | | - raise NotImplementedError |
105 | | - end |
106 | | - |
107 | | - # @!visibility private |
108 | | - def get |
109 | | - raise NotImplementedError |
110 | | - end |
111 | | - |
112 | | - # @!visibility private |
113 | | - def set(value) |
114 | | - raise NotImplementedError |
115 | | - end |
116 | | - end |
117 | | - |
118 | | - # @!visibility private |
119 | | - # @!macro internal_implementation_note |
120 | | - class RubyThreadLocalVar < AbstractThreadLocalVar |
121 | | - |
122 | | - protected |
123 | | - |
124 | | - # @!visibility private |
125 | | - def allocate_storage |
126 | | - @storage = WeakKeyMap.new |
| 121 | + def self.threadlocal_finalizer(index) |
| 122 | + proc do |
| 123 | + LOCK.synchronize do |
| 124 | + FREE.push(index) |
| 125 | + # The cost of GC'ing a TLV is linear in the number of threads using TLVs |
| 126 | + # But that is natural! More threads means more storage is used per TLV |
| 127 | + # So naturally more CPU time is required to free more storage |
| 128 | + ARRAYS.each_value do |array| |
| 129 | + array[index] = nil |
| 130 | + end |
| 131 | + end |
| 132 | + end |
127 | 133 | end |
128 | 134 |
|
129 | 135 | # @!visibility private |
130 | | - def get |
131 | | - @storage[Thread.current] |
| 136 | + def self.thread_finalizer(array) |
| 137 | + proc do |
| 138 | + LOCK.synchronize do |
| 139 | + # The thread which used this thread-local array is now gone |
| 140 | + # So don't hold onto a reference to the array (thus blocking GC) |
| 141 | + ARRAYS.delete(array.object_id) |
| 142 | + end |
| 143 | + end |
132 | 144 | end |
133 | 145 |
|
134 | | - # @!visibility private |
135 | | - def set(value) |
136 | | - key = Thread.current |
137 | | - |
138 | | - @storage[key] = value |
139 | | - |
| 146 | + # Bind the given value to thread local storage during |
| 147 | + # execution of the given block. |
| 148 | + # |
| 149 | + # @param [Object] value the value to bind |
| 150 | + # @yield the operation to be performed with the bound variable |
| 151 | + # @return [Object] the value |
| 152 | + def bind(value, &block) |
140 | 153 | if block_given? |
| 154 | + old_value = self.value |
141 | 155 | begin |
| 156 | + self.value = value |
142 | 157 | yield |
143 | 158 | ensure |
144 | | - @storage.delete(key) |
| 159 | + self.value = old_value |
145 | 160 | end |
146 | 161 | end |
147 | 162 | end |
148 | 163 | end |
149 | | - |
150 | | - if Concurrent.on_jruby? |
151 | | - |
152 | | - # @!visibility private |
153 | | - # @!macro internal_implementation_note |
154 | | - class JavaThreadLocalVar < AbstractThreadLocalVar |
155 | | - |
156 | | - protected |
157 | | - |
158 | | - # @!visibility private |
159 | | - def allocate_storage |
160 | | - @var = java.lang.ThreadLocal.new |
161 | | - end |
162 | | - |
163 | | - # @!visibility private |
164 | | - def get |
165 | | - @var.get |
166 | | - end |
167 | | - |
168 | | - # @!visibility private |
169 | | - def set(value) |
170 | | - @var.set(value) |
171 | | - end |
172 | | - end |
173 | | - end |
174 | | - |
175 | | - # @!visibility private |
176 | | - # @!macro internal_implementation_note |
177 | | - ThreadLocalVarImplementation = case |
178 | | - when Concurrent.on_jruby? |
179 | | - JavaThreadLocalVar |
180 | | - else |
181 | | - RubyThreadLocalVar |
182 | | - end |
183 | | - private_constant :ThreadLocalVarImplementation |
184 | | - |
185 | | - # @!macro thread_local_var |
186 | | - class ThreadLocalVar < ThreadLocalVarImplementation |
187 | | - |
188 | | - # @!method initialize(default = nil) |
189 | | - # @!macro thread_local_var_method_initialize |
190 | | - |
191 | | - # @!method value |
192 | | - # @!macro thread_local_var_method_get |
193 | | - |
194 | | - # @!method value=(value) |
195 | | - # @!macro thread_local_var_method_set |
196 | | - |
197 | | - # @!method bind(value, &block) |
198 | | - # @!macro thread_local_var_method_bind |
199 | | - |
200 | | - end |
201 | 164 | end |
0 commit comments