/*
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

#include "absl/memory/memory.h"

namespace csrblocksparse {
// A re-usable barrier. Keeps threads in extremely tight sync without
// relinquishing control. All memory writes _before_ this barrier are visible
// to all threads _after_ this barrier. Similar in spirit to pthread_barrier.
// If you expect arrival times at this barrier to vary by more than
// microseconds, this is probably not the right synchronization primitive for
// you. If |num_threads| exceeds the number of physical threads that can run
// simultaneously, then using this is certainly a bad idea (although it should
// still be correct).
//
// Callers MUST NOT call barrier() from more threads than |num_threads|; the
// result is undefined behavior.
class SpinBarrier {
 public:
  explicit SpinBarrier(int num_threads)
      : num_threads_(num_threads), threads_at_barrier_(0), barrier_step_(0) {}

  void barrier();

 private:
  const int num_threads_;
  std::atomic<int32_t> threads_at_barrier_;
  std::atomic<uint32_t> barrier_step_;  // Unsigned to make overflow defined.
};
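
// Illustrative usage sketch (not part of the original header): each thread
// writes its own slot of a shared vector, then calls barrier() so that every
// thread sees all of the writes before reading the other slots. The helper
// name ExampleSpinBarrierWorker and the |partials| parameter are hypothetical.
inline void ExampleSpinBarrierWorker(SpinBarrier* barrier, int tid,
                                     std::vector<float>* partials) {
  (*partials)[tid] = static_cast<float>(tid);  // Phase 1: write own slot.
  barrier->barrier();  // All phase-1 writes are now visible to all threads.
  float total = 0.0f;  // Phase 2: safe to read every slot.
  for (float value : *partials) total += value;
  (void)total;  // Use |total| here; it is discarded in this sketch.
}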
// Producer-consumer API using the same underlying mechanism as SpinBarrier.
// This class is intended to allow >=1 producers to produce data for >=1
// consumers, without blocking the producers.
// A consumer will block if it is ready before all of the producers have
// produced.
// WARNING: By design this lock does not work without some other barrier that
// prevents any producer from producing again, or any consumer from consuming
// again, until all consumers have consumed. Basically, any loop that uses
// ProducerConsumer must have at least two consume() calls in each thread (on
// different instances) in order for the lock to work correctly.
class ProducerConsumer {
 public:
  ProducerConsumer(int num_producers, int num_consumers)
      : num_producers_(num_producers),
        num_consumers_(num_consumers),
        producers_ready_(0),
        consumers_passed_(0) {}

  // Indicates that the data produced by this thread is ready. Does NOT block.
  // NOTE that some other lock, one that depends on all consumers having
  // called consume(), must exist between the call to produce() here and
  // looping back to call produce() again on the same ProducerConsumer. One
  // such candidate would be a call to SpinBarrier above by all producers and
  // consumers. Another candidate would be a separate ProducerConsumer object
  // in which these producers consume some data produced by the threads that
  // consume the data produced here. E.g.:
  //   tid       0        1        2        3
  //   action 1  produce  produce  consume  consume  (on ProducerConsumer 1)
  //   action 2  consume  consume  produce  produce  (on ProducerConsumer 2)
  //   action 3  produce  produce  consume  consume  (on ProducerConsumer 3)
  //   action 4  consume  consume  produce  produce  (on ProducerConsumer 4)
  //   loop back to action 1.
  // NOTE: It is inadequate to loop back after action 2, as thread 0 could
  // loop back and consume again on ProducerConsumer 2 while thread 1 is still
  // completing its call to consume(). It is likewise inadequate to loop back
  // after action 3, for the same reason (though tsan doesn't seem to pick
  // this up).
  inline void produce() {
    producers_ready_.fetch_add(1, std::memory_order_acq_rel);
  }
  // Waits if necessary for all producers to have produced before proceeding.
  // The ProducerConsumer cannot be reused until all consumers have consumed.
  // See the detailed comment and example on produce().
  inline void consume() {
    // We can't do anything until all the producers have produced.
    while (producers_ready_.load(std::memory_order_acquire) < num_producers_) {
#if defined(__aarch64__) || defined(__arm__)
      asm volatile("yield\n" ::: "memory");
#else
      // No pause for x86! The pause instruction on Skylake takes 141 clock
      // cycles, which on an AVX2-down-clocked CPU is getting on for 70ns.
#endif
    }
    // NOTE: It is tempting to move this fetch_add to before the wait loop to
    // reduce contention for the memory location, but that would break the
    // lock, as then the last to arrive could zero out producers_ready_ before
    // the other consumers have noticed that all producers have produced.
    // With the fetch_add after the wait loop, we are guaranteed that all
    // producers have produced AND all consumers have noticed that they have
    // produced before we zero out the counters.
    int consumers = consumers_passed_.fetch_add(1, std::memory_order_acq_rel);
    if (consumers == num_consumers_ - 1) {
      // The last consumer to pass has to reset everything for the next time.
      producers_ready_.store(0, std::memory_order_relaxed);
      consumers_passed_.store(0, std::memory_order_relaxed);
    }
  }
  int num_producers() const { return num_producers_; }
  int num_consumers() const { return num_consumers_; }

 private:
  const int num_producers_;
  const int num_consumers_;
  std::atomic<int32_t> producers_ready_;
  std::atomic<int32_t> consumers_passed_;
};
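
// Illustrative sketch (not part of the original header) of one safe way to
// reuse a ProducerConsumer in a loop: follow each iteration's produce() /
// consume() with a SpinBarrier::barrier() on every thread, as suggested in
// the comment on produce(). The helper name ExampleProduceConsumeLoop and its
// parameters are hypothetical.
inline void ExampleProduceConsumeLoop(SpinBarrier* barrier,
                                      ProducerConsumer* pc, bool is_producer,
                                      int num_iterations) {
  for (int i = 0; i < num_iterations; ++i) {
    if (is_producer) {
      // ... write the data that the consumers will read ...
      pc->produce();  // Never blocks.
    } else {
      pc->consume();  // Blocks until every producer has called produce().
      // ... read the produced data ...
    }
    // The barrier prevents any producer from producing again (and any
    // consumer from consuming again) before every consumer has finished
    // consume(), which is the extra synchronization ProducerConsumer needs.
    barrier->barrier();
  }
}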
// We define Thread here, so we can easily change its type later.
using Thread = std::thread;
using ThreadId = std::thread::id;

// Creates (|num_threads| - 1) threads and executes a total of |num_threads|
// copies of |func| (one of them on the calling thread).
//
// Useful for long-running func bodies that are intended to run in lock step.
// A possible use case for this style of parallelism over a thread pool is
// when we want tight control over which memory is resident in the L2 cache
// of a processor. With a pool we have no control over which thread gets
// assigned which portion of the computation, resulting in L2 thrashing. With
// this breakdown we can make sure each thread only accesses a specific
// L2-sized portion of memory.
//
// func's signature must be (SpinBarrier*, int thread_id, ...);
template <class Function, class... Args>
void LaunchOnThreadsWithBarrier(int num_threads, Function&& func,
                                Args&&... args) {
  SpinBarrier spin_barrier(num_threads);

  std::vector<std::unique_ptr<Thread>> threads;
  threads.reserve(num_threads);
  for (int tid = 1; tid < num_threads; ++tid) {
    // std::thread starts executing |f| on construction; no separate Start()
    // call is needed.
    auto f = [&, tid]() { func(&spin_barrier, tid, args...); };
    threads.emplace_back(absl::make_unique<Thread>(f));
  }

  // The calling thread runs the copy of |func| with thread_id 0.
  const int kLocalTid = 0;
  func(&spin_barrier, kLocalTid, args...);

  for (auto& thread : threads) {
    thread->join();
  }
}
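
// Illustrative usage sketch (not part of the original header): launches four
// copies of a worker whose signature matches (SpinBarrier*, int thread_id,
// ...). The names ExampleLaunch, kNumThreads, and |sums| are hypothetical.
inline void ExampleLaunch() {
  constexpr int kNumThreads = 4;
  std::vector<float> sums(kNumThreads, 0.0f);
  LaunchOnThreadsWithBarrier(
      kNumThreads,
      [](SpinBarrier* barrier, int tid, std::vector<float>* sums) {
        (*sums)[tid] = static_cast<float>(tid * tid);  // Per-thread work.
        barrier->barrier();  // All per-thread writes are now visible.
      },
      &sums);
}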
}  // namespace csrblocksparse