草庐IT

Exploration: Elegantly Running Async Methods in Parallel

0611163 2023-03-28 (original post)

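Following up on the previous post: understanding C# async/await through an example (non-parallel async, parallel async, and concurrency control for parallel async)

(Preamble removed)

Without syntactic sugar you can still write the same code: Java 8 has no such sugar, and high-performance code can still be written in it. But with C#'s async/await sugar, even an ordinary, average business programmer, however inexperienced, can write high-performance, high-throughput code.
Since I used the word "exploration" in the title: is there a better practice?

Note: the best practice has been added at the end of this article.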

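ElasticSearch performance

The ES queries below show the practical value of parallel async code.
Here is a test screenshot from a service deployed in a real environment:

379 ES queries took only 0.185 seconds (timings fluctuate, of course; anything in the range of a few tenths of a second is normal).
What does ES fear most? Slow queries: broad fuzzy queries with complex conditions.
My strategy is to issue many exact queries instead, which takes advantage of ES's very high throughput.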

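How fast?

  1. The screenshot above is just one test, and the analyzed time range is small (a little over a month of data).
  2. Another service endpoint analyzes half a year of data, roughly 7.2 billion + 1.8 billion = 9 billion records, and derives its result from them with a few thousand ES requests in just a few seconds.

Why so fast?

  1. The ES cluster has many servers with a lot of memory (300 GB, though ES is not the only thing running on them), so the cluster itself has high throughput.
  2. Parallel async delivers high performance and high throughput, and C#'s syntactic sugar makes parallel async easy to write.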

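Why use parallel async?

With this many queries, a single thread or a synchronous approach simply won't do; the queries must run in parallel.
Python and Java can do parallel code too.
But the Python code a former colleague wrote, which queried ES many times inside a double loop, was synchronous. Why not query in parallel? It could certainly be written, but people avoid writing it whenever they can. Why? Because it is complicated to write, hard to debug, and easy to get wrong.
The key point: it is not enough to write parallel code; it must also be simple and must not break the original logical structure of the code.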

Understanding parallel async through two test screenshots

1. Sequential async

2. Parallel async
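The two screenshots are not reproduced here, but the difference they show can be sketched without ES. The following is a minimal, self-contained C# example (top-level statements, .NET 6 or later assumed). FakeQueryAsync is a hypothetical stand-in for an ES call: it simulates latency with Task.Delay and records how many calls are in flight at the same time. Awaiting inside the loop keeps at most one request in flight; starting all tasks first and awaiting them afterwards lets them overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

int inFlight = 0;       // calls currently waiting on the simulated "ES"
int maxInFlight = 0;    // peak number of overlapping calls
object gate = new object();

// Hypothetical stand-in for an ES query: ~50 ms of simulated latency.
async Task<int> FakeQueryAsync(int id)
{
    lock (gate)
    {
        inFlight++;
        maxInFlight = Math.Max(maxInFlight, inFlight);
    }
    await Task.Delay(50);
    lock (gate)
    {
        inFlight--;
    }
    return id * 10;
}

// 1. Sequential async: each request is awaited before the next one starts.
var sequential = new List<int>();
for (int i = 0; i < 5; i++)
{
    sequential.Add(await FakeQueryAsync(i));
}
int sequentialMax = maxInFlight;

// 2. Parallel async: start every request first, then await the results.
maxInFlight = 0;
var tasks = new List<Task<int>>();
for (int i = 0; i < 5; i++)
{
    tasks.Add(FakeQueryAsync(i)); // no await: the request is already running
}
var parallel = new List<int>();
foreach (Task<int> t in tasks)
{
    parallel.Add(await t);
}
int parallelMax = maxInFlight;

Console.WriteLine($"sequential peak: {sequentialMax}, parallel peak: {parallelMax}");
```

Both versions return the same results in the same order; only the overlap of the waiting time differs.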

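Sequential async in a real project

Everyone knows how to write an ordinary async method: just use async/await, which is simple. Below is one I wrote, which issues many async requests inside a double loop. (This is real code rather than a demo, so it is a bit long; skim it and focus mainly on how the await xxx calls are written.)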

/// <summary>
/// xxx query
/// </summary>
public async Task<List<AccompanyInfo>> Query2(string strStartTime, string strEndTime, int kpCountThreshold, int countThreshold, int distanceThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<AccompanyInfo> resultList = new List<AccompanyInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    // Build a dictionary mapping each cluster id to its person cluster
    Dictionary<string, PeopleCluster> clusterIdPeopleDict = new Dictionary<string, PeopleCluster>();
    foreach (PeopleCluster peopleCluster in peopleClusterList)
    {
        foreach (string clusterId in peopleCluster.ClusterIds)
        {
            if (!clusterIdPeopleDict.ContainsKey(clusterId))
            {
                clusterIdPeopleDict.Add(clusterId, peopleCluster);
            }
        }
    }

    int queryCount = 0;
    Dictionary<string, AccompanyInfo> dict = new Dictionary<string, AccompanyInfo>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = await ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        queryCount++;
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
            List<PeopleFeatureInfo> peopleFeatureList2 = await ServiceFactory.Get<PeopleFeatureQueryService>().QueryExcludeSelf(strStartTime2, strEndTime2, people1);
            queryCount++;
            if (peopleFeatureList2.Count > 0)
            {
                foreach (PeopleFeatureInfo peopleFeatureInfo2 in peopleFeatureList2)
                {
                    string key = null;
                    PeopleCluster people2 = null;
                    string people2ClusterId = null;
                    if (clusterIdPeopleDict.ContainsKey(peopleFeatureInfo2.cluster_id.ToString()))
                    {
                        people2 = clusterIdPeopleDict[peopleFeatureInfo2.cluster_id.ToString()];
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";
                    }
                    else
                    {
                        people2ClusterId = peopleFeatureInfo2.cluster_id.ToString();
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2ClusterId)}";
                    }

                    double distance = LngLatUtil.CalcDistance(peopleFeatureInfo1.Longitude, peopleFeatureInfo1.Latitude, peopleFeatureInfo2.Longitude, peopleFeatureInfo2.Latitude);
                    if (distance > distanceThreshold) continue;

                    AccompanyInfo accompanyInfo;
                    if (dict.ContainsKey(key))
                    {
                        accompanyInfo = dict[key];
                    }
                    else
                    {
                        accompanyInfo = new AccompanyInfo();
                        dict.Add(key, accompanyInfo);
                    }

                    accompanyInfo.People1 = people1;
                    if (people2 != null)
                    {
                        accompanyInfo.People2 = people2;
                    }
                    else
                    {
                        accompanyInfo.ClusterId2 = people2ClusterId;
                    }

                    AccompanyItem accompanyItem = new AccompanyItem();
                    accompanyItem.Info1 = peopleFeatureInfo1;
                    accompanyItem.Info2 = peopleFeatureInfo2;
                    accompanyInfo.List.Add(accompanyItem);

                    accompanyInfo.Count++;

                    resultList.Add(accompanyInfo);
                }
            }
        }
    }

    resultList = resultList.FindAll(a => (a.People2 != null && a.Count >= kpCountThreshold) || a.Count >= countThreshold);

    // Deduplicate
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = a.People2 != null ? string.Join(",", a.People2.ClusterIds) : string.Empty;
        string str3 = a.ClusterId2 ?? string.Empty;
        StringBuilder sb = new StringBuilder();
        foreach (AccompanyItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{str3}_{sb}";
    }).ToList();

    sw.Stop();
    string msg = $"xxx query, elapsed: {sw.Elapsed.TotalSeconds:0.000} s, query count: {queryCount}, dedup: {beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

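Parallel async in a real project

The code above is logically correct but has a performance problem. It issues many requests inside a double loop, and although it uses async/await, the requests do not run in parallel, so it takes a long time. How can it be optimized? Below is the parallel async version. (Again, this is real code rather than a demo, so it is long; skim it and focus on how tasks1 and tasks2 are organized, how they are awaited, and how the return values are retrieved.)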

/// <summary>
/// xxx query
/// </summary>
public async Task<List<AccompanyInfo>> Query(string strStartTime, string strEndTime, int kpCountThreshold, int countThreshold, int distanceThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<AccompanyInfo> resultList = new List<AccompanyInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    // Build a dictionary mapping each cluster id to its person cluster
    Dictionary<string, PeopleCluster> clusterIdPeopleDict = new Dictionary<string, PeopleCluster>();
    foreach (PeopleCluster peopleCluster in peopleClusterList)
    {
        foreach (string clusterId in peopleCluster.ClusterIds)
        {
            if (!clusterIdPeopleDict.ContainsKey(clusterId))
            {
                clusterIdPeopleDict.Add(clusterId, peopleCluster);
            }
        }
    }

    // Build the first-level loop tasks (started here, not awaited yet)
    Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>> tasks1 = new Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        var task1 = ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        tasks1.Add(people1, task1);
    }

    // Await the first-level tasks and cache their results; build the second-level loop tasks
    Dictionary<string, Task<List<PeopleFeatureInfo>>> tasks2 = new Dictionary<string, Task<List<PeopleFeatureInfo>>>();
    Dictionary<PeopleCluster, List<PeopleFeatureInfo>> cache1 = new Dictionary<PeopleCluster, List<PeopleFeatureInfo>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = await tasks1[people1];
        cache1.Add(people1, peopleFeatureList);
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
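            string task2Key = $"{strStartTime2}_{strEndTime2}_{string.Join(",", people1.ClusterIds)}";
            // Start the request only for a new key: calling the service before TryAdd would still send (and then discard) duplicate requests
            if (!tasks2.ContainsKey(task2Key))
            {
                tasks2.Add(task2Key, ServiceFactory.Get<PeopleFeatureQueryService>().QueryExcludeSelf(strStartTime2, strEndTime2, people1));
            }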
        }
    }

    // Read the cached first-level results and await the second-level tasks
    Dictionary<string, AccompanyInfo> dict = new Dictionary<string, AccompanyInfo>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = cache1[people1];
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            DateTime capturedTime = DateTime.ParseExact(peopleFeatureInfo1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
            string strStartTime2 = capturedTime.AddSeconds(-timeThreshold).ToString("yyyyMMddHHmmss");
            string strEndTime2 = capturedTime.AddSeconds(timeThreshold).ToString("yyyyMMddHHmmss");
            string task2Key = $"{strStartTime2}_{strEndTime2}_{string.Join(",", people1.ClusterIds)}";
            List<PeopleFeatureInfo> peopleFeatureList2 = await tasks2[task2Key];
            if (peopleFeatureList2.Count > 0)
            {
                foreach (PeopleFeatureInfo peopleFeatureInfo2 in peopleFeatureList2)
                {
                    string key = null;
                    PeopleCluster people2 = null;
                    string people2ClusterId = null;
                    if (clusterIdPeopleDict.ContainsKey(peopleFeatureInfo2.cluster_id.ToString()))
                    {
                        people2 = clusterIdPeopleDict[peopleFeatureInfo2.cluster_id.ToString()];
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";
                    }
                    else
                    {
                        people2ClusterId = peopleFeatureInfo2.cluster_id.ToString();
                        key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2ClusterId)}";
                    }

                    double distance = LngLatUtil.CalcDistance(peopleFeatureInfo1.Longitude, peopleFeatureInfo1.Latitude, peopleFeatureInfo2.Longitude, peopleFeatureInfo2.Latitude);
                    if (distance > distanceThreshold) continue;

                    AccompanyInfo accompanyInfo;
                    if (dict.ContainsKey(key))
                    {
                        accompanyInfo = dict[key];
                    }
                    else
                    {
                        accompanyInfo = new AccompanyInfo();
                        dict.Add(key, accompanyInfo);
                    }

                    accompanyInfo.People1 = people1;
                    if (people2 != null)
                    {
                        accompanyInfo.People2 = people2;
                    }
                    else
                    {
                        accompanyInfo.ClusterId2 = people2ClusterId;
                    }

                    AccompanyItem accompanyItem = new AccompanyItem();
                    accompanyItem.Info1 = peopleFeatureInfo1;
                    accompanyItem.Info2 = peopleFeatureInfo2;
                    accompanyInfo.List.Add(accompanyItem);

                    accompanyInfo.Count++;

                    resultList.Add(accompanyInfo);
                }
            }
        }
    }

    resultList = resultList.FindAll(a => (a.People2 != null && a.Count >= kpCountThreshold) || a.Count >= countThreshold);

    // Deduplicate
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = a.People2 != null ? string.Join(",", a.People2.ClusterIds) : string.Empty;
        string str3 = a.ClusterId2 ?? string.Empty;
        StringBuilder sb = new StringBuilder();
        foreach (AccompanyItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{str3}_{sb}";
    }).ToList();

    // Sort by capture time, newest first
    foreach (AccompanyInfo item in resultList)
    {
        item.List.Sort((a, b) => -string.Compare(a.Info1.captured_time, b.Info1.captured_time));
    }

    sw.Stop();
    string msg = $"xxx query, elapsed: {sw.Elapsed.TotalSeconds:0.000} s, query count: {tasks1.Count + tasks2.Count}, dedup: {beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

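Notes on the code above

  1. To parallelize the async calls, the business logic's double loop is written three times. The third pass has the same structure as the double loop in the ordinary async method shown earlier.
  2. The first and second passes are the extra code. The first pass has only one loop level; the second has two (the third loop level only processes data and is unrelated to requests, so it is not discussed here).
  3. When writing it, you can first write the ordinary async method and then turn it into the parallel version by copying, pasting, and editing. Of course, if you can keep it all in your head, you can write it directly.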
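The three-pass structure can be reduced to a small skeleton. This is a sketch under stated assumptions, not the author's code: QueryOuterAsync and QueryInnerAsync are hypothetical stand-ins for the two service calls, and the second-level key here is only the value the inner query depends on (in the article the key also contains the time window and the cluster ids). Pass 2 checks ContainsKey before calling the service, so each distinct inner query is started exactly once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

int queryCount = 0;

// Hypothetical first-level query (stands in for PeopleFeatureQueryService.Query).
async Task<List<int>> QueryOuterAsync(string person)
{
    Interlocked.Increment(ref queryCount);
    await Task.Delay(20); // simulated ES latency
    return new List<int> { person.Length, person.Length + 1 };
}

// Hypothetical second-level query (stands in for QueryExcludeSelf / QueryExact).
async Task<List<string>> QueryInnerAsync(int value)
{
    Interlocked.Increment(ref queryCount);
    await Task.Delay(20);
    return new List<string> { $"hit-{value}" };
}

var people = new List<string> { "ab", "abc", "abcd" };

// Pass 1 (one loop level): start every first-level query, no await yet.
var tasks1 = new Dictionary<string, Task<List<int>>>();
foreach (string p in people)
{
    tasks1.Add(p, QueryOuterAsync(p));
}

// Pass 2 (two loop levels): await first-level results, cache them,
// and start each distinct second-level query once.
var cache1 = new Dictionary<string, List<int>>();
var tasks2 = new Dictionary<string, Task<List<string>>>();
foreach (string p in people)
{
    List<int> values = await tasks1[p];
    cache1.Add(p, values);
    foreach (int v in values)
    {
        string key = v.ToString();
        if (!tasks2.ContainsKey(key))
        {
            tasks2.Add(key, QueryInnerAsync(v));
        }
    }
}

// Pass 3: the same double loop as the sequential version, except that
// "await service call" becomes "await tasks2[key]".
var results = new List<string>();
foreach (string p in people)
{
    foreach (int v in cache1[p])
    {
        List<string> hits = await tasks2[v.ToString()];
        foreach (string h in hits)
        {
            results.Add($"{p}:{h}");
        }
    }
}

Console.WriteLine($"{queryCount} queries, {results.Count} results");
```

Here the inner values 3 and 4 each appear twice, so 3 outer and 4 inner queries are sent, while pass 3 still sees all 6 (person, hit) pairs in the original loop order.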

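A few remarks

  1. I don't think anyone has written it this way before, so I'm sharing it.
  2. Many people can write parallel code, in Java or Python as well. The question is how an ordinary, average business programmer can write this kind of parallel code without having to think hard.
  3. Most important of all: how to receive the return values elegantly.
  4. The worst approach, such as Java's CompletableFuture, becomes very complicated once it is combined with complex business logic.
  5. The next-best approach, which is in the official documentation and which everyone thinks of, looks like this: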
List<PeopleFeatureInfo>[] listArray = await Task.WhenAll(tasks2.Values);

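Inside the double loop, how do you get a particular result from that array? You certainly can, but now you have to think about how to write it, don't you?
With my approach, the result can be fetched directly inside the double loop: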

List<PeopleFeatureInfo> list = await tasks2[task2Key];
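To make the comparison concrete, here is a minimal sketch of both options (QueryAsync is a hypothetical stand-in). Task.WhenAll returns an array in the same order as the tasks it was given, so the results have to be mapped back to their keys; for a Dictionary the order of Values is unspecified but matches the order of Keys, which is what the Zip relies on. With a keyed dictionary of tasks, the loop body can simply await tasks[key], and the same task can be awaited more than once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical query: returns the key's length after a short delay.
async Task<int> QueryAsync(string key)
{
    await Task.Delay(10);
    return key.Length;
}

var keys = new List<string> { "a", "bb", "ccc" };

// All requests start here; the dictionary remembers which task belongs to which key.
Dictionary<string, Task<int>> tasks = keys.ToDictionary(k => k, k => QueryAsync(k));

// Option 1: Task.WhenAll returns a plain array, so positions must be mapped back
// to keys before the business loops can look results up.
int[] array = await Task.WhenAll(tasks.Values);
Dictionary<string, int> byKey = tasks.Keys
    .Zip(array, (k, v) => (Key: k, Value: v))
    .ToDictionary(x => x.Key, x => x.Value);

// Option 2 (the article's style): await the task by key where the result is needed.
// Awaiting a completed task just returns its stored result.
int bb = await tasks["bb"];
int bbAgain = await tasks["bb"];

Console.WriteLine($"ccc={byKey["ccc"]}, bb={bb}, bbAgain={bbAgain}");
```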

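How would the parallel code be written in Python?

Showing only C# code isn't convincing. I'm not very good at Python, but a colleague writes excellent Python, and much of his data-mining code is parallel, for example: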

def get_es_multiprocess(index_list, people_list, core_percent, rev_clusterid_idcard_dict):
    '''
    Read ES data with multiple processes, combine it into one DataFrame, and sort it by time
    :return: a large DataFrame
    '''
    col_list = ["cluster_id", "camera_id", "captured_time"]
    pool = Pool(processes=int(mp.cpu_count() * core_percent))
    input_list = [(i, people_list, col_list) for i in index_list]
    res = pool.map(get_es, input_list)
    # Close and join the pool before any early return so the worker processes are not leaked
    pool.close()
    pool.join()
    if not res:
        return None
    df_all = pd.DataFrame(columns=col_list+['longitude', 'latitude'])
    for df in res:
        df_all = pd.concat([df_all, df])
    # Force the conversion to string here!
    df_all['cluster_id_'] = df_all['cluster_id'].apply(lambda x: rev_clusterid_idcard_dict[str(x)])
    del df_all['cluster_id']
    df_all.rename(columns={'cluster_id_': 'cluster_id'}, inplace=True)
    df_all.sort_values(by='captured_time', inplace=True)
    print('=' * 100)
    print('Full data (before clustering):')
    print(df_all.info())
    cluster_id_list = [(i, df) for i, df in df_all.groupby(['cluster_id'])]
    cluster_id_list_split = [j for j in func(cluster_id_list, 1000000)]
    # TODO: shrink the dataset for debugging!
    data_all = df_all.iloc[:, :]
    return data_all, cluster_id_list_split

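Analysis of the Python code above

  1. Core code:
res = pool.map(get_es, input_list)
... (omitted)
pool.join()
... (omitted)

Notes on the core code: get_es is the method that queries ES. It is probably not an async method, but that is not the point.
2. res holds the query results: they are fetched all at once in parallel, stored in res, and then unpacked.
3. Note that this is only a single loop; think about how a double loop would be written.
4. pool.map() blocks the current thread until every result is back (and pool.join() then waits for the worker processes to exit), so the benefits of async are lost. That is not good.
5. The colleague's comment says "multiple processes". Is that a mistake? Is it really multithreading or multiprocessing? If Pool is multiprocessing.Pool (the mp.cpu_count() call suggests the multiprocessing module is imported as mp), it is a pool of worker processes, so the comment is accurate.
6. Of course, Python does have async/await, which should be no worse than C#'s; my colleague just didn't use it.
7. The Python code is full of string literals, and strings are the hardest thing to maintain. The strings in my C# code are interpolated from strongly typed variables.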
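For comparison, a rough C# counterpart of the colleague's pool.map call might look like the sketch below. It is an illustration under assumptions: GetEsAsync is a hypothetical async replacement for get_es, and SemaphoreSlim is used here as one common way to cap how many requests run at once, playing the role of the processes argument of Pool. Task.WhenAll returns the results in input order, like pool.map, but the caller awaits instead of blocking a thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int MaxConcurrency = 2;                 // plays the role of Pool(processes=...)
var throttle = new SemaphoreSlim(MaxConcurrency);
int inFlight = 0;
int maxInFlight = 0;
object gate = new object();

// Hypothetical async replacement for get_es(index): waits for a free slot,
// simulates an ES round trip, and records the peak number of concurrent calls.
async Task<List<string>> GetEsAsync(string index)
{
    await throttle.WaitAsync();
    try
    {
        lock (gate)
        {
            inFlight++;
            maxInFlight = Math.Max(maxInFlight, inFlight);
        }
        await Task.Delay(20); // no thread is blocked while waiting
        lock (gate)
        {
            inFlight--;
        }
        return new List<string> { $"{index}-doc1", $"{index}-doc2" };
    }
    finally
    {
        throttle.Release();
    }
}

var indexList = new List<string> { "idx-a", "idx-b", "idx-c", "idx-d", "idx-e" };

// Counterpart of res = pool.map(get_es, input_list): results keep the input order.
List<string>[] res = await Task.WhenAll(indexList.Select(i => GetEsAsync(i)));

// Counterpart of concatenating the per-index DataFrames.
List<string> all = res.SelectMany(r => r).ToList();
Console.WriteLine($"{all.Count} docs, peak concurrency {maxInFlight}");
```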

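Turning brain work into manual labor

Following the same template, and so turning brain work into manual labor, I wrote another parallel async method. (The business logic is still somewhat complex; focus on how tasks1 and tasks2 are organized, how they are awaited, and how the return values are retrieved. The code under the "compare xxx" comment has nothing to do with parallel async and can be skipped.)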

/// <summary>
/// xxx query
/// </summary>
public async Task<List<SameVehicleInfo>> Query(string strStartTime, string strEndTime, int kpCountThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<SameVehicleInfo> resultList = new List<SameVehicleInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    // Build the first-level loop tasks: query xxx
    Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>> tasks1 = new Dictionary<PeopleCluster, Task<List<PeopleFeatureInfo>>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        var task1 = ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        tasks1.Add(people1, task1);
    }

    // Await the first-level tasks and cache their results; build the second-level loop tasks: exact search for xxx
    Dictionary<string, Task<List<MotorVehicleInfo>>> tasks2 = new Dictionary<string, Task<List<MotorVehicleInfo>>>();
    Dictionary<PeopleCluster, List<PeopleFeatureInfo>> cache1 = new Dictionary<PeopleCluster, List<PeopleFeatureInfo>>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = await tasks1[people1];
        cache1.Add(people1, peopleFeatureList);
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            string task2Key = $"{peopleFeatureInfo1.camera_id}_{peopleFeatureInfo1.captured_time}";
            // Start the request only for a new key, so a repeated (camera_id, captured_time) pair is not queried twice
            if (!tasks2.ContainsKey(task2Key))
            {
                tasks2.Add(task2Key, ServiceFactory.Get<MotorVehicleQueryService>().QueryExact(peopleFeatureInfo1.camera_id, peopleFeatureInfo1.captured_time));
            }
        }
    }

    // Read the cached first-level results and await the second-level tasks
    Dictionary<PersonVehicleKey, PersonVehicleInfo> dictPersonVehicle = new Dictionary<PersonVehicleKey, PersonVehicleInfo>();
    foreach (PeopleCluster people1 in peopleClusterList)
    {
        List<PeopleFeatureInfo> peopleFeatureList = cache1[people1];
        foreach (PeopleFeatureInfo peopleFeatureInfo1 in peopleFeatureList)
        {
            string task2Key = $"{peopleFeatureInfo1.camera_id}_{peopleFeatureInfo1.captured_time}";
            List<MotorVehicleInfo> motorVehicleList = await tasks2[task2Key];
            motorVehicleList = motorVehicleList.DistinctBy(a => a.plate_no).ToList();
            foreach (MotorVehicleInfo motorVehicleInfo in motorVehicleList)
            {
                PersonVehicleKey key = new PersonVehicleKey(people1, motorVehicleInfo.plate_no);
                PersonVehicleInfo personVehicleInfo;
                if (dictPersonVehicle.ContainsKey(key))
                {
                    personVehicleInfo = dictPersonVehicle[key];
                }
                else
                {
                    personVehicleInfo = new PersonVehicleInfo()
                    {
                        People = people1,
                        PlateNo = motorVehicleInfo.plate_no,
                        List = new List<PeopleFeatureInfo>()
                    };
                    dictPersonVehicle.Add(key, personVehicleInfo);
                }
                personVehicleInfo.List.Add(peopleFeatureInfo1);
            }
        }
    }

    // Compare xxx
    List<PersonVehicleKey> keys = dictPersonVehicle.Keys.ToList();
    Dictionary<string, SameVehicleInfo> dict = new Dictionary<string, SameVehicleInfo>();
    for (int i = 0; i < keys.Count - 1; i++)
    {
        for (int j = i + 1; j < keys.Count; j++)
        {
            var key1 = keys[i];
            var key2 = keys[j];
            var personVehicle1 = dictPersonVehicle[key1];
            var personVehicle2 = dictPersonVehicle[key2];
            if (key1.PlateNo == key2.PlateNo)
            {
                foreach (PeopleFeatureInfo peopleFeature1 in personVehicle1.List)
                {
                    double minTimeDiff = double.MaxValue;
                    int minIndex = -1;
                    for (int k = 0; k < personVehicle2.List.Count; k++)
                    {
                        PeopleFeatureInfo peopleFeature2 = personVehicle2.List[k];
                        DateTime capturedTime1 = DateTime.ParseExact(peopleFeature1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
                        DateTime capturedTime2 = DateTime.ParseExact(peopleFeature2.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
                        var timeDiff = Math.Abs(capturedTime2.Subtract(capturedTime1).TotalSeconds);
                        if (timeDiff < minTimeDiff)
                        {
                            minTimeDiff = timeDiff;
                            minIndex = k;
                        }
                    }
                    if (minIndex >= 0 && minTimeDiff <= timeThreshold * 60)
                    {
                        PeopleCluster people1 = key1.People;
                        PeopleCluster people2 = key2.People;
                        PeopleFeatureInfo peopleFeatureInfo2 = personVehicle2.List[minIndex];

                        string key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";

                        SameVehicleInfo accompanyInfo;
                        if (dict.ContainsKey(key))
                        {
                            accompanyInfo = dict[key];
                        }
                        else
                        {
                            accompanyInfo = new SameVehicleInfo();
                            dict.Add(key, accompanyInfo);
                        }

                        accompanyInfo.People1 = people1;
                        accompanyInfo.People2 = people2;

                        SameVehicleItem accompanyItem = new SameVehicleItem();
                        accompanyItem.Info1 = peopleFeature1;
                        accompanyItem.Info2 = peopleFeatureInfo2;
                        accompanyInfo.List.Add(accompanyItem);

                        accompanyInfo.Count++;

                        resultList.Add(accompanyInfo);
                    }
                }
            }
        }
    }

    resultList = resultList.FindAll(a => a.Count >= kpCountThreshold);

    // Filter: exclude xxx
    resultList = resultList.FindAll(a =>
    {
        if (string.Join(",", a.People1.ClusterIds) == string.Join(",", a.People2.ClusterIds))
        {
            return false;
        }
        return true;
    });

    // Deduplicate
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = string.Join(",", a.People2.ClusterIds);
        StringBuilder sb = new StringBuilder();
        foreach (SameVehicleItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{sb}";
    }).ToList();

    // Sort by capture time, newest first
    foreach (SameVehicleInfo item in resultList)
    {
        item.List.Sort((a, b) => -string.Compare(a.Info1.captured_time, b.Info1.captured_time));
    }

    sw.Stop();
    string msg = $"xxx query, elapsed: {sw.Elapsed.TotalSeconds:0.000} s, query count: {tasks1.Count + tasks2.Count}, dedup: {beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

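Strengths of C#

  1. Someone says: "Our low-code platform is excellent." C#: "I am low-code!"
  2. Someone says: "Our platform is powerful; it supports writing SQL and scripts." C#: "I am a scripting language!"
  3. Someone says: "We use Spark and Flink for distributed processing." C#: "Parallel async gives high performance and high throughput, so a single machine is enough, as long as the services it uses, such as Kafka and ES, run as clusters."

Follow-up: the basis for the parallel async code above

Microsoft's official documentation:


Link to the documentation above:

Asynchronous programming with async and await

Follow-up: running async methods in parallel with Parallel.ForEachAsync (best practice)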

/// <summary>
/// xxx query
/// </summary>
public async Task<List<SameVehicleInfo>> Query(string strStartTime, string strEndTime, int kpCountThreshold, int timeThreshold, List<PeopleCluster> peopleClusterList)
{
    List<SameVehicleInfo> resultList = new List<SameVehicleInfo>();
    Stopwatch sw = Stopwatch.StartNew();

    // Parallel queries
    int queryCount = 0;
    ConcurrentDictionary<PersonVehicleKey, PersonVehicleInfo> dictPersonVehicle = new ConcurrentDictionary<PersonVehicleKey, PersonVehicleInfo>();
    await Parallel.ForEachAsync(peopleClusterList, new ParallelOptions() { MaxDegreeOfParallelism = 50 }, async (people1, c) =>
    {
        List<PeopleFeatureInfo> peopleFeatureList = await ServiceFactory.Get<PeopleFeatureQueryService>().Query(strStartTime, strEndTime, people1);
        Interlocked.Increment(ref queryCount);
        await Parallel.ForEachAsync(peopleFeatureList, new ParallelOptions() { MaxDegreeOfParallelism = 50 }, async (peopleFeatureInfo1, c) =>
        {
            List<MotorVehicleInfo> motorVehicleList = await ServiceFactory.Get<MotorVehicleQueryService>().QueryExact(peopleFeatureInfo1.camera_id, peopleFeatureInfo1.captured_time);
            Interlocked.Increment(ref queryCount);
            motorVehicleList = motorVehicleList.DistinctBy(a => a.plate_no).ToList();
            foreach (MotorVehicleInfo motorVehicleInfo in motorVehicleList)
            {
                PersonVehicleKey key = new PersonVehicleKey(people1, motorVehicleInfo.plate_no);
                // GetOrAdd is atomic per key: every concurrent caller gets the one instance stored in the dictionary
                // (with ContainsKey + TryAdd, two threads could both create an instance and one thread's Add would be lost)
                PersonVehicleInfo personVehicleInfo = dictPersonVehicle.GetOrAdd(key, _ => new PersonVehicleInfo()
                {
                    People = people1,
                    PlateNo = motorVehicleInfo.plate_no,
                    List = new List<PeopleFeatureInfo>()
                });
                // List<T> is not thread-safe, so serialize Add calls on the same list
                lock (personVehicleInfo.List)
                {
                    personVehicleInfo.List.Add(peopleFeatureInfo1);
                }
            }
        });
    });

    // Compare xxx
    List<PersonVehicleKey> keys = dictPersonVehicle.Keys.ToList();
    Dictionary<string, SameVehicleInfo> dict = new Dictionary<string, SameVehicleInfo>();
    for (int i = 0; i < keys.Count - 1; i++)
    {
        for (int j = i + 1; j < keys.Count; j++)
        {
            var key1 = keys[i];
            var key2 = keys[j];
            var personVehicle1 = dictPersonVehicle[key1];
            var personVehicle2 = dictPersonVehicle[key2];
            if (key1.PlateNo == key2.PlateNo)
            {
                foreach (PeopleFeatureInfo peopleFeature1 in personVehicle1.List)
                {
                    double minTimeDiff = double.MaxValue;
                    int minIndex = -1;
                    for (int k = 0; k < personVehicle2.List.Count; k++)
                    {
                        PeopleFeatureInfo peopleFeature2 = personVehicle2.List[k];
                        DateTime capturedTime1 = DateTime.ParseExact(peopleFeature1.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
                        DateTime capturedTime2 = DateTime.ParseExact(peopleFeature2.captured_time, "yyyyMMddHHmmss", CultureInfo.InvariantCulture);
                        var timeDiff = Math.Abs(capturedTime2.Subtract(capturedTime1).TotalSeconds);
                        if (timeDiff < minTimeDiff)
                        {
                            minTimeDiff = timeDiff;
                            minIndex = k;
                        }
                    }
                    if (minIndex >= 0 && minTimeDiff <= timeThreshold * 60)
                    {
                        PeopleCluster people1 = key1.People;
                        PeopleCluster people2 = key2.People;
                        PeopleFeatureInfo peopleFeatureInfo2 = personVehicle2.List[minIndex];

                        string key = $"{string.Join(",", people1.ClusterIds)}_{string.Join(",", people2.ClusterIds)}";

                        SameVehicleInfo accompanyInfo;
                        if (dict.ContainsKey(key))
                        {
                            accompanyInfo = dict[key];
                        }
                        else
                        {
                            accompanyInfo = new SameVehicleInfo();
                            dict.Add(key, accompanyInfo);
                        }

                        accompanyInfo.People1 = people1;
                        accompanyInfo.People2 = people2;

                        SameVehicleItem accompanyItem = new SameVehicleItem();
                        accompanyItem.Info1 = peopleFeature1;
                        accompanyItem.Info2 = peopleFeatureInfo2;
                        accompanyInfo.List.Add(accompanyItem);

                        accompanyInfo.Count++;

                        resultList.Add(accompanyInfo);
                    }
                }
            }
        }
    }

    resultList = resultList.FindAll(a => a.Count >= kpCountThreshold);

    // Filter: exclude xxx
    resultList = resultList.FindAll(a =>
    {
        if (string.Join(",", a.People1.ClusterIds) == string.Join(",", a.People2.ClusterIds))
        {
            return false;
        }
        return true;
    });

    // Deduplicate
    int beforeDistinctCount = resultList.Count;
    resultList = resultList.DistinctBy(a =>
    {
        string str1 = string.Join(",", a.People1.ClusterIds);
        string str2 = string.Join(",", a.People2.ClusterIds);
        StringBuilder sb = new StringBuilder();
        foreach (SameVehicleItem item in a.List)
        {
            var info2 = item.Info2;
            sb.Append($"{info2.camera_id},{info2.captured_time},{info2.cluster_id}");
        }
        return $"{str1}_{str2}_{sb}";
    }).ToList();

    // Sort by capture time, newest first
    foreach (SameVehicleInfo item in resultList)
    {
        item.List.Sort((a, b) => -string.Compare(a.Info1.captured_time, b.Info1.captured_time));
    }

    sw.Stop();
    string msg = $"xxx query, elapsed: {sw.Elapsed.TotalSeconds:0.000} s, query count: {queryCount}, dedup: {beforeDistinctCount}-->{resultList.Count}";
    Console.WriteLine(msg);
    LogUtil.Info(msg);

    return resultList;
}

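Notes on the code above

  1. Inside async methods that run in parallel, collections must be thread-safe (and a mutable object stored in them, such as a List<T>, still needs its own lock):
ConcurrentDictionary<PersonVehicleKey, PersonVehicleInfo> dictPersonVehicle = new ConcurrentDictionary<PersonVehicleKey, PersonVehicleInfo>();
  2. Inside async methods that run in parallel, counters must be updated with Interlocked: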
Interlocked.Increment(ref queryCount);
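Two details of this pattern are worth a closer look. First, a check-then-add sequence such as ContainsKey followed by TryAdd is not atomic: two threads can both miss the key, both create a value, and an Add made on the instance whose TryAdd failed is silently lost; List<T> itself is also not thread-safe. Second, nested Parallel.ForEachAsync calls multiply: with MaxDegreeOfParallelism = 50 at both levels, up to 50 × 50 = 2,500 inner requests can be in flight. The sketch below (hypothetical data and a hypothetical QueryPlatesAsync stand-in, .NET 6 or later assumed) shows ConcurrentDictionary.GetOrAdd plus a lock on the inner list, and Interlocked.Increment for the counter.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical data: 20 people, each of whom is "seen" with the same three plates.
List<string> people = Enumerable.Range(1, 20).Select(i => $"person-{i}").ToList();
string[] plates = { "A", "B", "C" };
int queryCount = 0;
var byPlate = new ConcurrentDictionary<string, List<string>>();

// Hypothetical stand-in for an async ES lookup such as QueryExact.
async Task<string[]> QueryPlatesAsync(string person, CancellationToken ct)
{
    await Task.Delay(5, ct);
    return plates;
}

await Parallel.ForEachAsync(people, new ParallelOptions { MaxDegreeOfParallelism = 8 }, async (person, ct) =>
{
    string[] found = await QueryPlatesAsync(person, ct);
    Interlocked.Increment(ref queryCount); // atomic counter update

    foreach (string plate in found)
    {
        // GetOrAdd returns the single instance stored for this key, even if several
        // threads race to create it (extra instances from the factory are discarded).
        List<string> list = byPlate.GetOrAdd(plate, _ => new List<string>());

        // List<T> is not thread-safe, so concurrent Adds to the same list are serialized.
        lock (list)
        {
            list.Add(person);
        }
    }
});

Console.WriteLine($"{queryCount} queries, {byPlate.Count} plates");
```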

Follow-up post: best practices for running async methods in parallel
