草庐IT

c++ - 带中断的互斥安全(嵌入式固件)

coder 2024-02-11 原文

编辑 @Mike 指出我下面代码中的 try_lock 函数是不安全的,并且访问器创建也会产生竞争条件。 (来自每个人的)建议使我确信我走上了错误的道路。
原始问题
嵌入式微 Controller 上的锁定要求与多线程不同,我无法将多线程示例转换为我的嵌入式应用程序。通常我没有任何类型的操作系统或线程,只有 main以及硬件定期调用的任何中断函数。
我需要从中断中填充缓冲区是很常见的,但在 main 中处理它是很常见的。 .我创建了 IrqMutex下面的类来尝试安全地实现这一点。每个试图访问缓冲区的人都通过 IrqMutexAccessor 被分配了一个唯一的 ID。 ,那么他们每个人都可以 try_lock()unlock() .阻塞的想法 lock()函数在中断时不起作用,因为除非您允许中断完成,否则没有其他代码可以执行,因此 unlock()代码永远不会运行。然而,我确实使用了 main() 中的阻塞锁。偶尔打码。
但是,我知道没有 C++11 内存屏障(在许多嵌入式平台上不可用),双重检查锁不起作用。老实说,尽管阅读了很多关于它的信息,但我真的不明白内存访问重新排序如何/为什么会导致问题。我认为使用 volatile sig_atomic_t(可能结合使用唯一 ID)使其与双重检查锁不同。但我希望有人可以:确认以下代码正确 , 解释为什么它不安全 , 或 提供一种更好的方法来实现此目的 .

class IrqMutex {
friend class IrqMutexAccessor;

private:
    std::sig_atomic_t accessorIdEnum;
    volatile std::sig_atomic_t owner;
protected:
    std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }
    bool have_lock(std::sig_atomic_t accessorId) {
        return (owner == accessorId);
    }
    bool try_lock(std::sig_atomic_t accessorId) {
        // Only try to get a lock, while it isn't already owned.
        while (owner == SIG_ATOMIC_MIN) {
            // <-- If an interrupt occurs here, both attempts can get a lock at the same time.

            // Try to take ownership of this Mutex.
            owner = accessorId; // SET

            // Double check that we are the owner.
            if (owner == accessorId) return true;

            // Someone else must have taken ownership between CHECK and SET.
            // If they released it after CHECK, we'll loop back and try again.
            // Otherwise someone else has a lock and we have failed.
        }        

        // This shouldn't happen unless they called try_lock on something they already owned.
        if (owner == accessorId) return true;

        // If someone else owns it, we failed.
        return false;
    }
    bool unlock(std::sig_atomic_t accessorId) {
        // Double check that the owner called this function (not strictly required)
        if (owner == accessorId) {
            owner = SIG_ATOMIC_MIN;
            return true;
        }
        
        // We still return true if the mutex was unlocked anyway.
        return (owner == SIG_ATOMIC_MIN);
    }
public:
    IrqMutex(void) : accessorIdEnum(SIG_ATOMIC_MIN), owner(SIG_ATOMIC_MIN) {}
};

// This class is used to manage our unique accessorId.
class IrqMutexAccessor {
friend class IrqMutex;
private:
    IrqMutex& mutex;
    const std::sig_atomic_t accessorId;
public:
    IrqMutexAccessor(IrqMutex& m) : mutex(m), accessorId(m.nextAccessor()) {}
    bool have_lock(void) { return mutex.have_lock(accessorId); }
    bool try_lock(void) { return mutex.try_lock(accessorId); }
    bool unlock(void) { return mutex.unlock(accessorId); }
};
因为只有一个处理器,并且没有线程处理互斥锁,我认为它的用途与正常用途略有不同。我反复遇到两个主要用例。
  • 中断是一个生产者,它拥有一个空闲缓冲区的所有权,并用一个数据包加载它。中断/生产者可能会在多次中断调用中长时间保持其所有权锁定。主要功能是消费者,并在准备好处理它时获得完整缓冲区的所有权。竞争条件很少发生,但如果中断/生产者完成一个数据包并需要一个新缓冲区,但它们都已满,它将尝试使用最旧的缓冲区(这是一个丢弃的数据包事件)。如果主要/消费者在完全相同的时间开始读取和处理最旧的缓冲区,它们将相互践踏。
  • 中断只是某物(如计数器)的快速更改或增加。但是,如果我们想通过调用 main() 代码来重置计数器或跳转到某个新值,我们不想在计数器发生变化时尝试写入它。这里 main 实际上做了一个阻塞循环来获取锁,但是我认为几乎不可能在这里等待超过两次尝试。一旦它有一个锁,任何对计数器中断的调用都将被跳过,但这对于像计数器这样的东西来说通常没什么大不了的。然后我更新计数器值并解锁它,以便它可以再次开始递增。

  • 我意识到这两个示例被简化了一点,但是这些模式的某些版本出现在我工作的每个项目的许多外围设备中,我想要一段可重用的代码,可以在各种嵌入式平台上安全地处理这个问题。我包含了 C 标记,因为所有这些都可以直接转换为 C 代码,并且在某些嵌入式编译器上,这些都是可用的。所以我试图找到一种保证在 C 和 C++ 中都能工作的通用方法。
    struct ExampleCounter {
        volatile long long int value;
        IrqMutex mutex;
    } exampleCounter;
    
    struct ExampleBuffer {
        volatile char data[256];
        volatile size_t index;
        IrqMutex mutex; // One mutex per buffer.
    } exampleBuffers[2];
    
    const volatile char * const REGISTER;
    
    // This accessor shouldn't be created in an interrupt or a race condition can occur.
    static IrqMutexAccessor myMutex(exampleCounter.mutex);
    void __irqQuickFunction(void) {
        // Obtain a lock, add the data then unlock all within one function call.
        if (myMutex.try_lock()) {
            exampleCounter.value++;
            myMutex.unlock();
        } else {
            // If we failed to obtain a lock, we skipped this update this one time.
        }
    }
    
    // These accessors shouldn't be created in an interrupt or a race condition can occur.
    static IrqMutexAccessor myMutexes[2] = {
        IrqMutexAccessor(exampleBuffers[0].mutex),
        IrqMutexAccessor(exampleBuffers[1].mutex)
    };
    void __irqLongFunction(void) {
        static size_t bufferIndex = 0;
    
        // Check if we have a lock.
        if (!myMutex[bufferIndex].have_lock() and !myMutex[bufferIndex].try_lock()) {
            // If we can't get a lock try the other buffer
            bufferIndex = (bufferIndex + 1) % 2;
    
            // One buffer should always be available so the next line should always be successful.
            if (!myMutex[bufferIndex].try_lock()) return;
        }
        
        // ... at this point we know we have a lock ...
    
        // Get data from the hardware and modify the buffer here.
        const char c = *REGISTER;
        exampleBuffers[bufferIndex].data[exampleBuffers[bufferIndex].index++] = c;
    
        // We may keep the lock for multiple function calls until the end of packet.
        static const char END_PACKET_SIGNAL = '\0';    
        if (c == END_PACKET_SIGNAL) {
            // Unlock this buffer so it can be read from main.
            myMutex[bufferIndex].unlock();
    
            // Switch to the other buffer for next time.
            bufferIndex = (bufferIndex + 1) % 2;
        }
    }
    
    int main(void) {
        while (true) {
            // Mutex for counter
            static IrqMutexAccessor myCounterMutex(exampleCounter.mutex);
    
            // Change counter value
            if (EVERY_ONCE_IN_A_WHILE) {
                // Skip any updates that occur while we are updating the counter.
                while(!myCounterMutex.try_lock()) {
                    // Wait for the interrupt to release its lock.
                }
    
                // Set the counter to a new value.
                exampleCounter.value = 500;
    
                // Updates will start again as soon as we unlock it.
                myCounterMutex.unlock();
            }
    
            // Mutexes for __irqLongFunction.
            static IrqMutexAccessor myBufferMutexes[2] = {
                IrqMutexAccessor(exampleBuffers[0].mutex),
                IrqMutexAccessor(exampleBuffers[1].mutex)
            };
    
            // Process buffers from __irqLongFunction.
            for (size_t i = 0; i < 2; i++)  {
                // Obtain a lock so we can read the data.
                if (!myBufferMutexes[i].try_lock()) continue;
                    // Check that the buffer isn't empty.
                    if (exampleBuffers[i].index == 0) {
                        myBufferMutexes[i].unlock(); // Don't forget to unlock.
                        continue;
                    }
    
                    // ... read and do something with the data here ...
                    exampleBuffer.index = 0;
    
                    myBufferMutexes[i].unlock();
                }
            }
        }
    }
    
    另请注意,我使用了 volatile在任何由中断例程读取或写入的变量上(除非该变量仅从中断访问,如 static bufferIndex 中的 __irqLongFunction 值)。我读过互斥锁消除了对 volatile 的一些需要在多线程代码中,但我认为这不适用于这里。 我是否使用了正确数量的 volatile ? 我用它:ExampleBuffer[].data[256] , ExampleBuffer[].index , 和 ExampleCounter.value .

    最佳答案

    我为长答案道歉,但也许它适合一个长问题。

    要回答你的第一个问题,我会说你的 IrqMutex 实现是不安全的。让我试着解释我在哪里看到问题。

    功能 nextAccessor

    std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }
    

    该函数存在竞争条件,因为增量运算符不是原子的,尽管它位于标记为 volatile 的原子值上。 .涉及3个操作:读取accessorIdEnum的当前值,增加它,然后写回结果。如果两个 IrqMutexAccessor s 是同时创建的,它们可能获得相同的 ID。

    功能 try_locktry_lock函数也有竞争条件。一个线程(例如主线程)可以进入 while循环,然后在获得所有权之前,另一个线程(例如中断)也可以进入 while循环并取得锁的所有权(返回 true )。然后第一个线程可以继续,移动到 owner = accessorId ,因此“也”取得了锁。所以两个线程(或者你的 main 线程和一个中断)可以 try_lock同时在一个无主互斥锁上并且都返回 true .

    通过 RAII 禁用中断

    我们可以通过使用 RAII 禁用中断来实现某种程度的简单性和封装,例如以下类:

    class InterruptLock {
    public:
        InterruptLock() { 
            prevInterruptState = currentInterruptState();
            disableInterrupts();
        }
    
        ~InterruptLock() { 
            restoreInterrupts(prevInterruptState);
        }
    private:
        int prevInterruptState; // Whatever type this should be for the platform
        InterruptLock(const InterruptLock&); // Not copy-constructable
    };
    

    我建议禁用中断以获得互斥体实现本身所需的原子性。例如类似的东西:

    bool try_lock(std::sig_atomic_t accessorId) {
        InterruptLock lock;
        if (owner == SIG_ATOMIC_MIN) {
            owner = accessorId;
            return true;
        }
        return false;
    }
    bool unlock(std::sig_atomic_t accessorId) {
        InterruptLock lock;
        if (owner == accessorId) {
            owner = SIG_ATOMIC_MIN;
            return true;
        }
        return false;
    }
    

    根据您的平台,这可能看起来不同,但您明白了。

    正如您所说,这提供了一个平台,可以将通用代码中的禁用和启用中断抽象出来,并将其封装到这一类中。

    互斥和中断

    说了我将如何考虑实现互斥类之后,我实际上不会为您的用例使用互斥类。正如您所指出的,互斥锁与中断并不能很好地配合使用,因为中断无法“阻止”尝试获取互斥锁。出于这个原因,对于直接与中断交换数据的代码,我会强烈考虑直接禁用中断(在主“线程”接触数据的很短的时间内)。

    所以你的计数器可能看起来像这样:

    volatile long long int exampleCounter;
    
    void __irqQuickFunction(void) {
        exampleCounter++;
    }
    ...
    // Change counter value
    if (EVERY_ONCE_IN_A_WHILE) {
        InterruptLock lock;
        exampleCounter = 500;
    }
    

    在我看来,这更容易阅读,更容易推理,并且在出现争用(即错过计时器节拍)时不会“滑倒”。

    关于缓冲区用例,我强烈建议不要为多个中断周期持有锁。锁/互斥锁应该只保留“接触”一块内存所需的最轻微的时刻——刚好足以读取或写入它。进来,出去。

    所以这就是缓冲示例的样子:

    struct ExampleBuffer {
        char data[256];
    } exampleBuffers[2];
    
    ExampleBuffer* volatile bufferAwaitingConsumption = nullptr;
    ExampleBuffer* volatile freeBuffer = &exampleBuffers[1];
    
    const volatile char * const REGISTER;
    
    void __irqLongFunction(void) {
    
        static const char END_PACKET_SIGNAL = '\0';    
        static size_t index = 0;
        static ExampleBuffer* receiveBuffer = &exampleBuffers[0];
    
        // Get data from the hardware and modify the buffer here.
        const char c = *REGISTER;
        receiveBuffer->data[index++] = c;
    
        // End of packet?
        if (c == END_PACKET_SIGNAL) {
            // Make the packet available to the consumer
            bufferAwaitingConsumption = receiveBuffer;
            // Move on to the next buffer
            receiveBuffer = freeBuffer;
            freeBuffer = nullptr;
            index = 0;
        }
    }
    
    
    int main(void) {
    
        while (true) {
    
            // Fetch packet from shared variable
            ExampleBuffer* packet;
            {
                InterruptLock lock;
                packet = bufferAwaitingConsumption;
                bufferAwaitingConsumption = nullptr;
            }
    
            if (packet) {
                // ... read and do something with the data here ...
    
                // Once we're done with the buffer, we need to release it back to the producer
                {
                    InterruptLock lock;
                    freeBuffer = packet;
                }
            }
        }
    }
    

    这段代码可以说更容易推理,因为中断和主循环之间只有两个共享内存位置:一个将数据包从中断传递到主循环,另一个将空缓冲区传递回中断。我们也只接触“锁定”下的那些变量,并且只在“移动”值所需的最短时间。 (为简单起见,当主循环需要很长时间才能释放缓冲区时,我跳过了缓冲区溢出逻辑)。

    确实,在这种情况下甚至可能不需要锁,因为我们只是读取和写入简单的值,但是禁用中断的成本并不高,否则犯错误的风险在我看来是不值得的观点。

    编辑

    正如评论中所指出的,上述解决方案旨在仅解决多线程问题,并省略了溢出检查。这是更完整的解决方案,在溢出条件下应该是健壮的:
    const size_t BUFFER_COUNT = 2; 
    
    struct ExampleBuffer {
        char data[256];
        ExampleBuffer* next;
    } exampleBuffers[BUFFER_COUNT];
    
    volatile size_t overflowCount = 0;
    
    class BufferList {
    public:
        BufferList() : first(nullptr), last(nullptr) { }
    
        // Atomic enqueue
        void enqueue(ExampleBuffer* buffer) {
            InterruptLock lock;
            if (last)
                last->next = buffer;
            else {
                first = buffer;
                last = buffer;
            }
        }
    
        // Atomic dequeue (or returns null)
        ExampleBuffer* dequeueOrNull() {
            InterruptLock lock;
            ExampleBuffer* result = first;
            if (first) {
                first = first->next;
                if (!first)
                    last = nullptr;
            }
            return result;
        }
    private:
        ExampleBuffer* first;
        ExampleBuffer* last;
    } freeBuffers, buffersAwaitingConsumption;
    
    const volatile char * const REGISTER;
    
    void __irqLongFunction(void) {
    
        static const char END_PACKET_SIGNAL = '\0';    
        static size_t index = 0;
        static ExampleBuffer* receiveBuffer = &exampleBuffers[0];
    
        // Recovery from overflow?
        if (!receiveBuffer) {
            // Try get another free buffer
            receiveBuffer = freeBuffers.dequeueOrNull();
            // Still no buffer?
            if (!receiveBuffer) {
                overflowCount++;
                return; 
            }
        }
    
        // Get data from the hardware and modify the buffer here.
        const char c = *REGISTER;
    
        if (index < sizeof(receiveBuffer->data))
            receiveBuffer->data[index++] = c;
    
        // End of packet, or out of space?
        if (c == END_PACKET_SIGNAL) {
            // Make the packet available to the consumer
            buffersAwaitingConsumption.enqueue(receiveBuffer);
            // Move on to the next free buffer
            receiveBuffer = freeBuffers.dequeueOrNull();
            index = 0;
        }
    }
    
    size_t getAndResetOverflowCount() {
        InterruptLock lock;
        size_t result = overflowCount;
        overflowCount = 0;
        return result;
    }
    
    
    int main(void) {
    
        // All buffers are free at the start
        for (int i = 0; i < BUFFER_COUNT; i++)
            freeBuffers.enqueue(&exampleBuffers[i]);
    
        while (true) {
    
            // Fetch packet from shared variable
            ExampleBuffer* packet = dequeueOrNull();
    
            if (packet) {
                // ... read and do something with the data here ...
    
                // Once we're done with the buffer, we need to release it back to the producer
                freeBuffers.enqueue(packet);
            }
    
            size_t overflowBytes = getAndResetOverflowCount();
            if (overflowBytes) {
                // ...
            }
        }
    }
    

    关键变化:
  • 如果中断用完可用缓冲区,它将恢复
  • 如果中断在没有接收缓冲区的情况下接收到数据,它将通过 getAndResetOverflowCount 将其传达给主线程。
  • 如果您不断收到缓冲区溢出,您可以简单地增加缓冲区计数
  • 我将多线程访问封装到一个队列类中,实现为一个链表( BufferList ),它支持原子出队和入队。前面的示例也使用了队列,但长度为 0-1(一个项目已入队或未入队),因此队列的实现只是一个变量。在空闲缓冲区用完的情况下,接收队列可能有 2 个项目,因此我将其升级为适当的队列,而不是添加更多共享变量。
  • 关于c++ - 带中断的互斥安全(嵌入式固件),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27409024/

    有关c++ - 带中断的互斥安全(嵌入式固件)的更多相关文章

    1. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

      我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

    2. ruby - 如何使用 Ruby aws/s3 Gem 生成安全 URL 以从 s3 下载文件 - 2

      我正在编写一个小脚本来定位aws存储桶中的特定文件,并创建一个临时验证的url以发送给同事。(理想情况下,这将创建类似于在控制台上右键单击存储桶中的文件并复制链接地址的结果)。我研究过回形针,它似乎不符合这个标准,但我可能只是不知道它的全部功能。我尝试了以下方法:defauthenticated_url(file_name,bucket)AWS::S3::S3Object.url_for(file_name,bucket,:secure=>true,:expires=>20*60)end产生这种类型的结果:...-1.amazonaws.com/file_path/file.zip.A

    3. ruby - 如何安全地删除文件? - 2

      在Ruby中是否有Gem或安全删除文件的方法?我想避免系统上可能不存在的外部程序。“安全删除”指的是覆盖文件内容。 最佳答案 如果您使用的是*nix,一个很好的方法是使用exec/open3/open4调用shred:`shred-fxuz#{filename}`http://www.gnu.org/s/coreutils/manual/html_node/shred-invocation.html检查这个类似的帖子:Writingafileshredderinpythonorruby?

    4. ruby - 使用 `+=` 和 `send` 方法 - 2

      如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

    5. ruby - 用 YAML.load 解析 json 安全吗? - 2

      我正在使用ruby2.1.0我有一个json文件。例如:test.json{"item":[{"apple":1},{"banana":2}]}用YAML.load加载这个文件安全吗?YAML.load(File.read('test.json'))我正在尝试加载一个json或yaml格式的文件。 最佳答案 YAML可以加载JSONYAML.load('{"something":"test","other":4}')=>{"something"=>"test","other"=>4}JSON将无法加载YAML。JSON.load("

    6. ruby - ruby 乘法语句中星号中断语法前的空格 - 2

      在添加一些空格以使代码更具可读性时(与上面的代码对齐),我遇到了这个:classCdefx42endendm=C.new现在这将给出“错误数量的参数”:m.x*m.x这将给出“语法错误,意外的tSTAR,期待$end”:2/m.x*m.x这里的解析器到底发生了什么?我使用Ruby1.9.2和2.1.5进行了测试。 最佳答案 *用于运算符(42*42)和参数解包(myfun*[42,42])。当你这样做时:m.x*m.x2/m.x*m.xRuby将此解释为参数解包,而不是*运算符(即乘法)。如果您不熟悉它,参数解包(有时也称为“spl

    7. ruby - 如何计算 Liquid 中的变量 +1 - 2

      我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

    8. ruby - 在 ASP 页面上 Mechanize 中断 - 2

      require'mechanize'agent=Mechanize.newlogin=agent.get('http://www.schoolnet.ch/DE/HomeDE.htm')agent.clicklogin.link_withtext:/Login/然后我得到Mechanize::UnsupportedSchemeError。 最佳答案 Mechanize不支持javascript但您可以将搜索字段添加到表单并为其分配搜索词并使用mechanize提交表单form=page.forms.firstform.add_fie

    9. ruby - 可以正常中断的来自 Rake 的长时间运行的 shell 命令? - 2

      在几个项目中,我希望有一个类似rakeserver的rake任务,它将通过任何需要的方式开始为该应用程序提供服务。这是一个示例:task:serverdo%x{bundleexecrackup-p1234}end这行得通,但是当我准备停止它时,按Ctrl+c并没有正常关闭;它中断了Rake任务本身,它说rakeaborted!并给出堆栈跟踪。在某些情况下,我必须执行Ctrl+c两次。我可能可以用Signal.trap写一些东西来更优雅地中断它。有没有更简单的方法? 最佳答案 trap('SIGINT'){puts"Yourmessa

    10. ruby-on-rails - 安全地显示使用回形针 gem 上传的图像 - 2

      默认情况下:回形针gem将所有附件存储在公共(public)目录中。出于安全原因,我不想将附件存储在公共(public)目录中,所以我将它们保存在应用程序根目录的uploads目录中:classPost我没有指定url选项,因为我不希望每个图像附件都有一个url。如果指定了url:那么拥有该url的任何人都可以访问该图像。这是不安全的。在user#show页面中:我想实际显示图像。如果我使用所有回形针默认设置,那么我可以这样做,因为图像将在公共(public)目录中并且图像将具有一个url:Someimage:看来,如果我将图像附件保存在公共(public)目录之外并且不指定url(同

    随机推荐