1 | /*
|
2 | * Souffle - A Datalog Compiler
|
3 | * Copyright (c) 2021, The Souffle Developers. All rights reserved
|
4 | * Licensed under the Universal Permissive License v 1.0 as shown at:
|
5 | * - https://opensource.org/licenses/UPL
|
6 | * - <souffle root>/licenses/SOUFFLE-UPL.txt
|
7 | */
|
8 | #pragma once
|
9 |
|
#include "souffle/utility/ParallelUtil.h"

#include <array>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
|
19 |
|
20 | namespace souffle {
|
21 | namespace details {
|
22 |
|
// Table of {n, k} couples such that ((2^n) - k) is prime, sorted by
// increasing prime value.
// Source: https://primes.utm.edu/lists/2small/0bit.html
// A constexpr std::array (instead of a heap-allocated std::vector) avoids
// per-translation-unit dynamic initialization of this header-defined table.
inline constexpr std::array<std::pair<unsigned, unsigned>, 57> ToPrime = {{
        // {n, k}
        {4, 3},  // 2^4 - 3 = 13
        {8, 5},  // 2^8 - 5 = 251
        {9, 3}, {10, 3}, {11, 9}, {12, 3}, {13, 1}, {14, 3}, {15, 19}, {16, 15}, {17, 1}, {18, 5},
        {19, 1}, {20, 3}, {21, 9}, {22, 3}, {23, 15}, {24, 3}, {25, 39}, {26, 5}, {27, 39}, {28, 57},
        {29, 3}, {30, 35}, {31, 1}, {32, 5}, {33, 9}, {34, 41}, {35, 31}, {36, 5}, {37, 25}, {38, 45},
        {39, 7}, {40, 87}, {41, 21}, {42, 11}, {43, 57}, {44, 17}, {45, 55}, {46, 21}, {47, 115},
        {48, 59}, {49, 81}, {50, 27}, {51, 129}, {52, 47}, {53, 111}, {54, 33}, {55, 55}, {56, 5},
        {57, 13}, {58, 27}, {59, 55}, {60, 93}, {61, 1}, {62, 57}, {63, 25}}};

// (2^64)-59 is the largest prime that fits in uint64_t.
static constexpr uint64_t LargestPrime64 = 18446744073709551557UL;

// Return the smallest known prime greater than or equal to the lower bound.
// Return 0 if the next prime would not fit in 64 bits.
inline static uint64_t GreaterOrEqualPrime(const uint64_t LowerBound) {
    if (LowerBound > LargestPrime64) {
        // Even the largest 64-bit prime is below the requested bound.
        return 0;
    }

    // ToPrime is sorted by increasing prime value, so the first entry whose
    // prime reaches the bound is the smallest suitable one.
    for (const auto& NK : ToPrime) {
        const uint64_t Prime = (1ULL << NK.first) - NK.second;
        if (Prime >= LowerBound) {
            return Prime;
        }
    }
    // LowerBound lies between the largest table prime (2^63 - 25) and LargestPrime64.
    return LargestPrime64;
}
|
56 |
|
/**
 * @brief Default key factory.
 *
 * Rebuilds a key object in place from a pack of constructor arguments and
 * hands back a reference to the updated slot.
 */
template <typename T>
struct Factory {
    /// Assign a freshly-constructed T (built from the forwarded arguments)
    /// into Slot, then return Slot itself.
    template <class... As>
    T& replace(T& Slot, As&&... ConstructorArgs) {
        T Fresh{std::forward<As>(ConstructorArgs)...};
        Slot = std::move(Fresh);
        return Slot;
    }
};
|
65 |
|
66 | } // namespace details
|
67 |
|
68 | /**
|
69 | * A concurrent, almost lock-free associative hash-map that can only grow.
|
70 | * Elements cannot be removed, the hash-map can only grow.
|
71 | *
|
72 | * The datastructures enables a configurable number of concurrent access lanes.
|
73 | * Access to the datastructure is lock-free between different lanes.
|
74 | * Concurrent accesses through the same lane is sequential.
|
75 | *
|
76 | * Growing the datastructure requires to temporarily lock all lanes to let a
|
77 | * single lane perform the growing operation. The global lock is amortized
|
78 | * thanks to an exponential growth strategy.
|
79 | */
|
80 | template <class LanesPolicy, class Key, class T, class Hash = std::hash<Key>,
|
81 | class KeyEqual = std::equal_to<Key>, class KeyFactory = details::Factory<Key>>
|
82 | class ConcurrentInsertOnlyHashMap {
|
83 | public:
|
84 | class Node;
|
85 |
|
86 | using key_type = Key;
|
87 | using mapped_type = T;
|
88 | using node_type = Node*;
|
89 | using value_type = std::pair<const Key, const T>;
|
90 | using size_type = std::size_t;
|
91 | using hasher = Hash;
|
92 | using key_equal = KeyEqual;
|
93 | using self_type = ConcurrentInsertOnlyHashMap<Key, T, Hash, KeyEqual, KeyFactory>;
|
94 | using lane_id = typename LanesPolicy::lane_id;
|
95 |
|
96 | class Node {
|
97 | public:
|
98 | virtual ~Node() {}
|
99 | virtual const value_type& value() const = 0;
|
100 | virtual const key_type& key() const = 0;
|
101 | virtual const mapped_type& mapped() const = 0;
|
102 | };
|
103 |
|
104 | private:
|
105 | // Each bucket of the hash-map is a linked list.
|
106 | struct BucketList : Node {
|
107 | virtual ~BucketList() {}
|
108 |
|
109 | BucketList(const Key& K, const T& V, BucketList* N) : Value(K, V), Next(N) {}
|
110 |
|
111 | const value_type& value() const {
|
112 | return Value;
|
113 | }
|
114 |
|
115 | const key_type& key() const {
|
116 | return Value.first;
|
117 | }
|
118 |
|
119 | const mapped_type& mapped() const {
|
120 | return Value.second;
|
121 | }
|
122 |
|
123 | // Stores the couple of a key and its associated value.
|
124 | value_type Value;
|
125 |
|
126 | // Points to next element of the map that falls into the same bucket.
|
127 | BucketList* Next;
|
128 | };
|
129 |
|
130 | public:
|
131 | /**
|
132 | * @brief Construct a hash-map with at least the given number of buckets.
|
133 | *
|
134 | * Load-factor is initialized to 1.0.
|
135 | */
|
136 | ConcurrentInsertOnlyHashMap(const std::size_t LaneCount, const std::size_t Bucket_Count,
|
137 | const Hash& hash = Hash(), const KeyEqual& key_equal = KeyEqual(),
|
138 | const KeyFactory& key_factory = KeyFactory())
|
139 | : Lanes(LaneCount), Hasher(hash), EqualTo(key_equal), Factory(key_factory) {
|
140 | Size = 0;
|
141 | BucketCount = details::GreaterOrEqualPrime(Bucket_Count);
|
142 | if (BucketCount == 0) {
|
143 | // Hopefuly this number of buckets is never reached.
|
144 | BucketCount = std::numeric_limits<std::size_t>::max();
|
145 | }
|
146 | LoadFactor = 1.0;
|
147 | Buckets = std::make_unique<std::atomic<BucketList*>[]>(BucketCount);
|
148 | MaxSizeBeforeGrow = static_cast<std::size_t>(std::ceil(LoadFactor * (double)BucketCount));
|
149 | }
|
150 |
|
151 | ConcurrentInsertOnlyHashMap(const Hash& hash = Hash(), const KeyEqual& key_equal = KeyEqual(),
|
152 | const KeyFactory& key_factory = KeyFactory())
|
153 | : ConcurrentInsertOnlyHashMap(8, hash, key_equal, key_factory) {}
|
154 |
|
155 | ~ConcurrentInsertOnlyHashMap() {
|
156 | for (std::size_t Bucket = 0; Bucket < BucketCount; ++Bucket) {
|
157 | BucketList* L = Buckets[Bucket].load(std::memory_order_relaxed);
|
158 | while (L != nullptr) {
|
159 | BucketList* BL = L;
|
160 | L = L->Next;
|
161 | delete (BL);
|
162 | }
|
163 | }
|
164 | }
|
165 |
|
166 | void setNumLanes(const std::size_t NumLanes) {
|
167 | Lanes.setNumLanes(NumLanes);
|
168 | }
|
169 |
|
170 | /** @brief Create a fresh node initialized with the given value and a
|
171 | * default-constructed key.
|
172 | *
|
173 | * The ownership of the returned node given to the caller.
|
174 | */
|
175 | node_type node(const T& V) {
|
176 | BucketList* BL = new BucketList(Key{}, V, nullptr);
|
177 | return static_cast<node_type>(BL);
|
178 | }
|
179 |
|
180 | /**
|
181 | * @brief Lookup a value associated with a key.
|
182 | *
|
183 | * The search is done concurrently with possible insertion of the
|
184 | * searched key. If the a nullpointer is returned, then the key
|
185 | * was not associated with a value when the search began.
|
186 | */
|
187 | template <class K>
|
188 | const value_type* weakFind(const lane_id H, const K& X) const {
|
189 | const size_t HashValue = Hasher(X);
|
190 | const auto Guard = Lanes.guard(H);
|
191 | const size_t Bucket = HashValue % BucketCount;
|
192 |
|
193 | BucketList* L = Buckets[Bucket].load(std::memory_order_acquire);
|
194 | while (L != nullptr) {
|
195 | if (EqualTo(L->Value.first, X)) {
|
196 | // found the key
|
197 | return &L->Value;
|
198 | }
|
199 | L = L->Next;
|
200 | }
|
201 | return nullptr;
|
202 | }
|
203 |
|
204 | /** @brief Checks if the map contains an element with the given key.
|
205 | *
|
206 | * The search is done concurrently with possible insertion of the
|
207 | * searched key. If return true, then there is definitely an element
|
208 | * with the specified key, if return false then there was no such
|
209 | * element when the search began.
|
210 | */
|
211 | template <class K>
|
212 | inline bool weakContains(const lane_id H, const K& X) const {
|
213 | return weakFind(H, X) != nullptr;
|
214 | }
|
215 |
|
216 | /**
|
217 | * @brief Inserts in-place if the key is not mapped, does nothing if the key already exists.
|
218 | *
|
219 | * @param H is the access lane.
|
220 | *
|
221 | * @param N is a node initialized with the mapped value to insert.
|
222 | *
|
223 | * @param Xs are arguments to forward to the hasher, the comparator and and
|
224 | * the constructor of the key.
|
225 | *
|
226 | *
|
227 | * Be Careful: the inserted node becomes available to concurrent lanes as
|
228 | * soon as it is inserted, thus concurrent lanes may access the inserted
|
229 | * value even before the inserting lane returns from this function.
|
230 | * This is the reason why the inserting lane must prepare the inserted
|
231 | * node's mapped value prior to calling this function.
|
232 | *
|
233 | * Be Careful: the given node remains the ownership of the caller unless
|
234 | * the returned couple second member is true.
|
235 | *
|
236 | * Be Careful: the given node may not be inserted if the key already
|
237 | * exists. The caller is in charge of handling that case and either
|
238 | * dispose of the node or save it for the next insertion operation.
|
239 | *
|
240 | * Be Careful: Once the given node is actually inserted, its ownership is
|
241 | * transfered to the hash-map. However it remains valid.
|
242 | *
|
243 | * If the key that compares equal to arguments Xs exists, then nothing is
|
244 | * inserted. The returned value is the couple of the pointer to the
|
245 | * existing value and the false boolean value.
|
246 | *
|
247 | * If the key that compares equal to arguments Xs does not exist, then the
|
248 | * node N is updated with the key constructed from Xs, and inserted in the
|
249 | * hash-map. The returned value is the couple of the pointer to the
|
250 | * inserted value and the true boolean value.
|
251 | *
|
252 | */
|
253 | template <class... Args>
|
254 | std::pair<const value_type*, bool> get(const lane_id H, const node_type N, Args&&... Xs) {
|
255 | // At any time a concurrent lane may insert the key before this lane.
|
256 | //
|
257 | // The synchronisation point is the atomic compare-and-exchange of the
|
258 | // head of the bucket list that must contain the inserted node.
|
259 | //
|
260 | // The insertion algorithm is as follow:
|
261 | //
|
262 | // 1) Compute the key hash from Xs.
|
263 | //
|
264 | // 2) Lock the lane, that also prevent concurrent lanes from growing of
|
265 | // the datastructure.
|
266 | //
|
267 | // 3) Determine the bucket where the element must be inserted.
|
268 | //
|
269 | // 4) Read the "last known head" of the bucket list. Other lanes
|
270 | // inserting in the same bucket may update the bucket head
|
271 | // concurrently.
|
272 | //
|
273 | // 5) Search the bucket list for the key by comparing with Xs starting
|
274 | // from the last known head. If it is not the first round of search,
|
275 | // then stop searching where the previous round of search started.
|
276 | //
|
277 | // 6) If the key is found return the couple of the value pointer and
|
278 | // false (to indicate that this lane did not insert the node N).
|
279 | //
|
280 | // 7) It the key is not found prepare N for insertion by updating its
|
281 | // key with Xs and chaining the last known head.
|
282 | //
|
283 | // 8) Try to exchange to last known head with N at the bucket head. The
|
284 | // atomic compare and exchange operation guarantees that it only
|
285 | // succeed if not other node was inserted in the bucket since we
|
286 | // searched it, otherwise it fails when another lane has concurrently
|
287 | // inserted a node in the same bucket.
|
288 | //
|
289 | // 9) If the atomic compare and exchange succeeded, the node has just
|
290 | // been inserted by this lane. From now-on other lanes can also see
|
291 | // the node. Return the couple of a pointer to the inserted value and
|
292 | // the true boolean.
|
293 | //
|
294 | // 10) If the atomic compare and exchange failed, another node has been
|
295 | // inserted by a concurrent lane in the same bucket. A new round of
|
296 | // search is required -> restart from step 4.
|
297 | //
|
298 | //
|
299 | // The datastructure is optionaly grown after step 9) before returning.
|
300 |
|
301 | const value_type* Value = nullptr;
|
302 | bool Inserted = false;
|
303 |
|
304 | size_t NewSize;
|
305 |
|
306 | // 1)
|
307 | const size_t HashValue = Hasher(std::forward<Args>(Xs)...);
|
308 |
|
309 | // 2)
|
310 | Lanes.lock(H); // prevent the datastructure from growing
|
311 |
|
312 | // 3)
|
313 | const size_t Bucket = HashValue % BucketCount;
|
314 |
|
315 | // 4)
|
316 | // the head of the bucket's list last time we checked
|
317 | BucketList* LastKnownHead = Buckets[Bucket].load(std::memory_order_acquire);
|
318 | // the head of the bucket's list we already searched from
|
319 | BucketList* SearchedFrom = nullptr;
|
320 | // the node we want to insert
|
321 | BucketList* const Node = static_cast<BucketList*>(N);
|
322 |
|
323 | // Loop until either the node is inserted or the key is found in the bucket.
|
324 | // Assuming bucket collisions are rare this loop is not executed more than once.
|
325 | while (true) {
|
326 | // 5)
|
327 | // search the key in the bucket, stop where we already search at a
|
328 | // previous iteration.
|
329 | BucketList* L = LastKnownHead;
|
330 | while (L != SearchedFrom) {
|
331 | if (EqualTo(L->Value.first, std::forward<Args>(Xs)...)) {
|
332 | // 6)
|
333 | // Found the key, no need to insert.
|
334 | // Although it's not strictly necessary, clear the node
|
335 | // chaining to avoid leaving a dangling pointer there.
|
336 | Value = &(L->Value);
|
337 | Node->Next = nullptr;
|
338 | goto Done;
|
339 | }
|
340 | L = L->Next;
|
341 | }
|
342 | SearchedFrom = LastKnownHead;
|
343 |
|
344 | // 7)
|
345 | // Not found in bucket, prepare node chaining.
|
346 | Node->Next = LastKnownHead;
|
347 | // The factory step could be done only once, but assuming bucket collisions are
|
348 | // rare this whole loop is not executed more than once.
|
349 | Factory.replace(const_cast<key_type&>(Node->Value.first), std::forward<Args>(Xs)...);
|
350 |
|
351 | // 8)
|
352 | // Try to insert the key in front of the bucket's list.
|
353 | // This operation also performs step 4) because LastKnownHead is
|
354 | // updated in the process.
|
355 | if (Buckets[Bucket].compare_exchange_strong(
|
356 | LastKnownHead, Node, std::memory_order_release, std::memory_order_relaxed)) {
|
357 | // 9)
|
358 | Inserted = true;
|
359 | NewSize = ++Size;
|
360 | Value = &(Node->Value);
|
361 | goto AfterInserted;
|
362 | }
|
363 |
|
364 | // 10) concurrent insertion detected in this bucket, new round required.
|
365 | }
|
366 |
|
367 | AfterInserted : {
|
368 | if (NewSize > MaxSizeBeforeGrow) {
|
369 | tryGrow(H);
|
370 | }
|
371 | }
|
372 |
|
373 | Done:
|
374 |
|
375 | Lanes.unlock(H);
|
376 |
|
377 | // 6,9)
|
378 | return std::make_pair(Value, Inserted);
|
379 | }
|
380 |
|
381 | private:
|
382 | // The concurrent lanes manager.
|
383 | LanesPolicy Lanes;
|
384 |
|
385 | /// Hash function.
|
386 | Hash Hasher;
|
387 |
|
388 | /// Current number of buckets.
|
389 | std::size_t BucketCount;
|
390 |
|
391 | /// Atomic pointer to head bucket linked-list head.
|
392 | std::unique_ptr<std::atomic<BucketList*>[]> Buckets;
|
393 |
|
394 | /// The Equal-to function.
|
395 | KeyEqual EqualTo;
|
396 |
|
397 | KeyFactory Factory;
|
398 |
|
399 | /// Current number of elements stored in the map.
|
400 | std::atomic<std::size_t> Size;
|
401 |
|
402 | /// Maximum size before the map should grow.
|
403 | std::size_t MaxSizeBeforeGrow;
|
404 |
|
405 | /// The load-factor of the map.
|
406 | double LoadFactor;
|
407 |
|
408 | // Grow the datastructure.
|
409 | // Must be called while owning lane H.
|
410 | bool tryGrow(const lane_id H) {
|
411 | Lanes.beforeLockAllBut(H);
|
412 |
|
413 | if (Size <= MaxSizeBeforeGrow) {
|
414 | // Current size is fine
|
415 | Lanes.beforeUnlockAllBut(H);
|
416 | return false;
|
417 | }
|
418 |
|
419 | Lanes.lockAllBut(H);
|
420 |
|
421 | { // safe section
|
422 |
|
423 | // Compute the new number of buckets:
|
424 | // Chose a prime number of buckets that ensures the desired load factor
|
425 | // given the current number of elements in the map.
|
426 | const std::size_t CurrentSize = Size;
|
427 | assert(LoadFactor > 0);
|
428 | const std::size_t NeededBucketCount =
|
429 | static_cast<std::size_t>(std::ceil(static_cast<double>(CurrentSize) / LoadFactor));
|
430 | std::size_t NewBucketCount = NeededBucketCount;
|
431 | for (std::size_t I = 0; I < details::ToPrime.size(); ++I) {
|
432 | const uint64_t N = details::ToPrime[I].first;
|
433 | const uint64_t K = details::ToPrime[I].second;
|
434 | const uint64_t Prime = (1ULL << N) - K;
|
435 | if (Prime >= NeededBucketCount) {
|
436 | NewBucketCount = Prime;
|
437 | break;
|
438 | }
|
439 | }
|
440 |
|
441 | std::unique_ptr<std::atomic<BucketList*>[]> NewBuckets =
|
442 | std::make_unique<std::atomic<BucketList*>[]>(NewBucketCount);
|
443 |
|
444 | // Rehash, this operation is costly because it requires to scan
|
445 | // the existing elements, compute its hash to find its new bucket
|
446 | // and insert in the new bucket.
|
447 | //
|
448 | // Maybe concurrent lanes could help using some job-stealing algorithm.
|
449 | //
|
450 | // Use relaxed memory ordering since the whole operation takes place
|
451 | // in a critical section.
|
452 | for (std::size_t B = 0; B < BucketCount; ++B) {
|
453 | BucketList* L = Buckets[B].load(std::memory_order_relaxed);
|
454 | while (L) {
|
455 | BucketList* const Elem = L;
|
456 | L = L->Next;
|
457 |
|
458 | const auto& Value = Elem->Value;
|
459 | std::size_t NewHash = Hasher(Value.first);
|
460 | const std::size_t NewBucket = NewHash % NewBucketCount;
|
461 | Elem->Next = NewBuckets[NewBucket].load(std::memory_order_relaxed);
|
462 | NewBuckets[NewBucket].store(Elem, std::memory_order_relaxed);
|
463 | }
|
464 | }
|
465 |
|
466 | Buckets = std::move(NewBuckets);
|
467 | BucketCount = NewBucketCount;
|
468 | MaxSizeBeforeGrow =
|
469 | static_cast<std::size_t>(std::ceil(static_cast<double>(NewBucketCount) * LoadFactor));
|
470 | }
|
471 |
|
472 | Lanes.beforeUnlockAllBut(H);
|
473 | Lanes.unlockAllBut(H);
|
474 | return true;
|
475 | }
|
476 | };
|
477 |
|
478 | } // namespace souffle
|