草庐IT

c# - 合并暂时关闭的事件时提高效率和公平性

coder 2023-07-11 原文

我有一堆线程生成 A 类型和 B 类型的事件。

我的程序接收这些事件,将它们包装在一条消息中并通过网络发送它们。一条消息可以包含一个 A 事件、一个 B 事件,或者一个 A 事件和一个 B 事件:

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));

A 类型的事件发生的频率很高,而 B 类型的事件发生的频率要低得多。因此,当一个线程生成一个 B 事件时,我的程序稍等片刻,看看另一个线程是否生成一个 A 事件并结合 A 事件和 B 事件(如果可能的话)。

这是我的代码:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}

到目前为止,这是有效的。但是,有两个问题:

  • 如果有很多 A 事件和很多 B 事件,算法不是很有效:只有一定比例的 B 事件附加到 A 事件,即使有足够的 A 事件也是如此。

  • 如果一段时间内没有生成A事件(不常见,但并非不可能),该算法是完全不公平的:一个生成B事件的线程有每次都等待,而所有其他线程可以立即发送它们的 B 事件。

如何提高算法的效率和公平性?

<子> 约束条件:
WrapAWrapB 必须在确定的短时间内终止。
SendMessage 必须在任何锁之外调用。
• 除了gate 之外没有可用的同步机制。
• 没有额外的线程、任务、计时器等可用。
• 由于A 类型的事件在正常情况下发生得如此频繁,因此WrapB 中的忙等待是可以的。


下面是一个可以作为基准的测试程序:

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}

最佳答案

为了好玩,这里有一个无锁的实现:

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}

结果

原始实现:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

无锁实现:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

更新

不幸的是,上面的实现有一个错误加上一些缺点。这是一个改进的版本:

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let's send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}

结果:

OP的实现

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

第二次实现:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

关于c# - 合并暂时关闭的事件时提高效率和公平性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14983713/

有关c# - 合并暂时关闭的事件时提高效率和公平性的更多相关文章

  1. ruby - 如果指定键的值在数组中相同,如何合并哈希 - 2

    我有一个这样的哈希数组:[{:foo=>2,:date=>Sat,01Sep2014},{:foo2=>2,:date=>Sat,02Sep2014},{:foo3=>3,:date=>Sat,01Sep2014},{:foo4=>4,:date=>Sat,03Sep2014},{:foo5=>5,:date=>Sat,02Sep2014}]如果:date相同,我想合并哈希值。我对上面数组的期望是:[{:foo=>2,:foo3=>3,:date=>Sat,01Sep2014},{:foo2=>2,:foo5=>5:date=>Sat,02Sep2014},{:foo4=>4,:dat

  2. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

  3. C# 到 Ruby sha1 base64 编码 - 2

    我正在尝试在Ruby中复制Convert.ToBase64String()行为。这是我的C#代码:varsha1=newSHA1CryptoServiceProvider();varpasswordBytes=Encoding.UTF8.GetBytes("password");varpasswordHash=sha1.ComputeHash(passwordBytes);returnConvert.ToBase64String(passwordHash);//returns"W6ph5Mm5Pz8GgiULbPgzG37mj9g="当我在Ruby中尝试同样的事情时,我得到了相同sha

  4. ruby - 如何关闭 ruby​​ gem "Spreadsheet?"中的文件 - 2

    下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11

  5. ruby-on-rails - 事件管理员日期过滤器日期格式自定义 - 2

    是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s

  6. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  7. git使用常见问题(提交代码,合并冲突) - 2

    文章目录git常用命令(简介,详细参数往下看)Git提交代码步骤gitpullgitstatusgitaddgitcommitgitpushgit代码冲突合并问题方法一:放弃本地代码方法二:合并代码常用命令以及详细参数gitadd将文件添加到仓库:gitdiff比较文件异同gitlog查看历史记录gitreset代码回滚版本库相关操作远程仓库相关操作分支相关操作创建分支查看分支:gitbranch合并分支:gitmerge删除分支:gitbranch-ddev查看分支合并图:gitlog–graph–pretty=oneline–abbrev-commit撤消某次提交git用户名密码相关配置g

  8. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

  9. Ruby 哈希直接访问与合并 - 2

    有什么区别:@attr[:field]=new_value和@attr.merge(:field=>new_value) 最佳答案 如果您使用的是merge!而不是merge,则没有区别。唯一的区别是您可以在合并参数中使用多个字段(意思是:另一个散列)。例子:h1={"a"=>100,"b"=>200}h2={"b"=>254,"c"=>300}h3=h1.merge(h2)putsh1#=>{"a"=>100,"b"=>200}putsh3#=>{"a"=>100,"b"=>254,"c"=>300}h1.merge!(h2)pu

  10. ruby-on-rails - Ruby 的 'open_uri' 是否在读取或失败后可靠地关闭套接字? - 2

    一段时间以来,我一直在使用open_uri下拉ftp路径作为数据源,但突然发现我几乎连续不断地收到“530抱歉,允许的最大客户端数(95)已经连接。”我不确定我的代码是否有问题,或者是否是其他人在访问服务器,不幸的是,我无法真正确定谁有问题。本质上,我正在读取FTPURI:defself.read_uri(uri)beginuri=open(uri).readuri=="Error"?nil:urirescueOpenURI::HTTPErrornilendend我猜我需要在这里添加一些额外的错误处理代码...我想确保我采取一切预防措施来关闭所有连接,这样我的连接就不是问题所在,但是我

随机推荐