草庐IT

.NET与大数据

0611163 2023-03-28 原文

前言

当别人做大数据用Java、Python的时候,我使用.NET做大数据、数据挖掘,这确实是值得一说的事。
写的并不全面,但都是实际工作中的内容。

.NET在大数据项目中,可以做什么?

  1. 写脚本(使用控制台程序+顶级语句)
  2. 写工具(使用Winform)
  3. 写接口、写服务

使用C#写代码的优点是什么?

  1. 静态类型+匿名类型,一次性使用的实体类就用匿名类型,多次或多个地方使用的实体类就用静态类型,静态类型优于Python,匿名类型优于Java。你是不是想说Python也有静态类型?你倒是写啊?!
  2. 代码的可维护性好,这是相对于Python说的,不一定是语言的锅,还有固有的代码组织习惯,静态类型本身就是很好的注释
  3. 性能好,异步并发的代码易编写。
    想起来一个事,就是前同事用Python2做数据挖掘,先用的es,性能差,改用的clickhouse,我就纳闷,es性能差?现在我想我明白了,我看了其中一个挖掘算法,它需要在双层循环遍历中去请求es进行查询,它没有使用异步,也没有使用多线程,那不就是一个线程在查询吗?我们现网es服务器配置这么强这么多,它居然用一个线程去同步请求,能快才怪!实际上一个请求耗时极短,因为es有各种缓存,而查询条件精确可以命中缓存,所以可以并发请求多个es节点。
    那前同事为什么没有使用异步并发或多线程呢?Python2不支持吗?或者Python2支持,但写起来不方便?或者前同事不会写?(原因:写起来不方便,C#一样也不太方便,而且会使整个程序的并发请求量变得难以控制,可以针对某个接口单独优化,但所有接口都这样写,也挺麻烦的)

使用.NET开发的优点是什么?

其中一个优点是应用程序类型丰富,目前我用到的应用程序类型有:

  1. 控制台
  2. Winform
  3. Web API
  4. Blazor
    你是不是想说Java和Python也可以写控制台、窗体程序、Web API?一个熟悉Ptyhon的程序员,可不一定会写窗体程序,需要一点时间学习,一个做了几年.NET的程序员天然会写Winform,就是拖控件啊。当然,也可能他们不用Windows。
    每一种应用程序类型,都意味着学习成本,而这些我已经会了,时间就省下了(Blazor一开始不会,学习花了一两天)。

.NET与ClickHouse

我写了一个大杂烩脚本项目,里面有很多工程是查询ClickHouse统计分析,代码流程就是读取Excel数据作为查询输入条件,查询ClickHouse统计分析,统计结果导出到Excel。一个统计分析工作任务小半天就完成了。
用的ORM是我自己写的Dapper.LiteSql。没什么人用,可能是功能不强吧。不过很适合我自己的需求,我自己经常用。
比如:

int count = session.CreateSql<XXX>(@"
    select count(distinct t.xxx, t.xxx, t.xxx) as cnt
    from xxx t
")
.Where(t => t.PassTime >= startTime && t.PassTime <= endTime)
.Where("t.Name in (" + kkNames + ")")
.QuerySingle<int>();

再比如:

var query = session.CreateSql<XXX>(@"
        select t.xxx, t.xxx, t.xxx
        from xxx t
    ")
    .Where(t => t.PassTime >= firstTime && t.PassTime <= firstTime.AddDays(7).AddSeconds(-1));
query.Where(t => plateList.Skip((page - 1) * pageSize).Take(pageSize).ToList().Contains(t.PlateNo));
var temp = query.ToList();

对于统计查询,我经常SQL和Lambda表达式混写,感觉这样非常灵活。
某些情况下,混写比纯Lambda写法,是要清晰的:

List<XXX> list = session.CreateSql<XXX>(@"
    select xxx, xxx as xxx, max(xxx) as xxx
    from (
    select xxx, toDate(xxx) as xxx, xxx, count(*) as xxx
    from (
    select distinct t.xxx, t.xxx, t.xxx
    from xxx t
").Where(t => t.Xxx != "xxx")
.Where(t => t.XxxTime >= startTime && t.XxxTime <= endTime)
.Where(t => xxxList.Contains(t.Xxx))
.Where(@"(
    (formatDateTime(t.xxx_time ,'%H:%M:%S') >= '07:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '08:59:59') or
    (formatDateTime(t.xxx_time ,'%H:%M:%S') >= '14:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '20:59:59')
)")
.Append(@")")
.GroupBy("xxx, xxx, xxx")
.Append(@") 
    group by xxx, xxx
")
.QueryList<XXX>();

上述代码说明:

  1. group by写了两种写法比较随意
  2. 三层select嵌套,当然主流ORM都能实现,但不一定易编写、易阅读
  3. 我不用针对ClickHouse去实现formatDateTime,也不用实现toDate、max、distinct、count,也不用纠结是count(*)还是count(1),只要实现的功能足够少,BUG就少。

.NET与ElasticSearch

本打算使用Elasticsearch.Net,为什么没有使用?

  1. 学习成本,项目中没有学习时间,虽然造测试数据是本职工作,但写小工具不是本职工作不能耽误太多时间,所以没有学习时间
  2. 我使用HttpClient查询es,这种查询es的方式和kibana中写的查询语句、以及前同事留下的创建索引的文档、模板最接近,方便抄现成的。下面是一个完整的查询es方法:
public async Task<TicketAgg> QueryAgg(string strStartTime, string strEndTime, string idCard)
{
    Stopwatch sw = Stopwatch.StartNew();

    string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/out_xxx/_search";

    var esQueryBody = new
    {
        size = 0,
        query = new
        {
            @bool = new
            {
                must = new dynamic[]
                {
                    new
                    {
                        range = new
                        {
                            travel_time = new
                            {
                                gte = strStartTime,
                                lte = strEndTime,
                                format = "yyyyMMddHHmmss"
                            }
                        }
                    },
                    new
                    {
                        match_phrase = new
                        {
                            zjhm = idCard
                        }
                    }
                }
            }
        },
        aggs = new
        {
            countByZjhm = new
            {
                terms = new
                {
                    field = "zjhm",
                    size = 10000
                }
            }
        }
    };

    string esPostData = JsonConvert.SerializeObject(esQueryBody);
    Console.WriteLine($"ES请求URL:{esUrl}");
    Console.WriteLine($"ES请求参数:{esPostData}");
    HttpClient httpClient = HttpClientFactory.GetClient();
    HttpContent content = new StringContent(esPostData);
    content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
    string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
    var resultObj = new
    {
        took = 0,
        aggregations = new
        {
            countByZjhm = new
            {
                buckets = new[]
                {
                    new
                    {
                        key = "",
                        doc_count = 0
                    }
                }
            }
        }
    };
    var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);

    TicketAgg agg = new TicketAgg();
    agg.IdCard = idCard;
    agg.Count = esResult.aggregations.countByZjhm.buckets[0].doc_count;

    sw.Stop();
    Console.WriteLine($"统计数据,耗时:{sw.Elapsed.TotalSeconds.ToString("0.000")} 秒");

    return agg;
}

代码中esQueryBody和resultObj都是一次性使用的,直接用匿名动态类型,而TicketAgg是需要实例化作为返回值给其它方法使用的,所以定义成静态类型。
评论区有人问可选条件怎么写,代码如下:

string strStartTime = DateTime.Now.AddDays(-7).ToString("yyyyMMddHHmmss");
string strEndTime = DateTime.Now.ToString("yyyyMMddHHmmss");
string idCard = "33";

var esQueryBody = new
{
    size = 10000,
    query = new
    {
        @bool = new
        {
            must = new List<dynamic>
            {
                new
                {
                    range = new
                    {
                        travel_time = new
                        {
                            gte = strStartTime,
                            lte = strEndTime,
                            format="yyyyMMddHHmmss"
                        }
                    }
                }
            }
        }
    }
};

if (idCard != null)
{
    esQueryBody.query.@bool.must.Add(new
    {
        match_phrase = new
        {
            zjhm = idCard
        }
    });
}

string esPostData = JsonConvert.SerializeObject(esQueryBody);

上述代码说明:

  1. must原来是dynamic[],它的长度是不可变的,不方便追加,所以修改成List,就可以动态追加了。
  2. 写这段代码,我没有百度,没有找文档,花了几分钟试出来的。优秀的语法可以让使用者举一反三。

下面一段代码,生产测试数据用的:

public async Task MockXxxData(string indexName, int count, DateTime startDate, DateTime endDate, string[] departures, string[] destinations, dynamic peoples)
{
    int days = (int)endDate.Subtract(startDate).TotalDays;

    List<Task> taskList = new List<Task>();
    for (int i = 0; i < count; i++)
    {
        DateTime date = startDate.AddDays(_rnd.Next(0, days + 1));
        long time = (long)(_rnd.NextDouble() * 3600 * 24);
        var people = peoples[_rnd.Next(0, peoples.Length)];

        var esRequestBody = new
        {
            xxx_type = _rnd.Next(1, 4).ToString(),
            zjlx = "xxx",
            zjhm = people.zjhm,
            xm = people.xm,
            departure = departures[_rnd.Next(0, departures.Length)],
            destination = destinations[_rnd.Next(0, destinations.Length)],
            xxx_date = date.ToString("yyyyMMdd"),
            xxx_time = date.AddSeconds(time).ToString("yyyyMMddHHmmss"),
            xxx_time = date.AddSeconds(time).AddHours(0.5 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
            xxx_time = date.AddSeconds(time).AddDays(-2 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
            xxx = "",
            xxx = ""
        };

        var task = ServiceFactory.Get<EsWriteService>().Write(indexName, esRequestBody);
        taskList.Add(task);
    }
    await Task.WhenAll(taskList);
}

上述代码说明:

  1. 程序跑起来生产数据,一般会有几十个线程,也就是请求es的并发量是几十
  2. 如果你觉得几十的并发量,还是有点高,可以在调用的Write异步方法中使用Semaphore类限制一下并发量,代码如下:
private Semaphore _sem = new Semaphore(20, 20); //限制异步请求的并发数量

public async Task<bool> Write(string indexName, dynamic esRequestBody)
{
    _sem.WaitOne();
    try
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();

        indexName = $"{indexName}-{DateTime.Now.Year}-{DateTime.Now.Month:00}";
        string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/{indexName}/doc";

        string esRequestData = JsonConvert.SerializeObject(esRequestBody);
        HttpContent content = new StringContent(esRequestData);
        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
        HttpClient httpClient = HttpClientFactory.GetClient();
        string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
        var resultObj = new
        {
            status = 0
        };
        var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);

        sw.Stop();
        _log?.Info($"【写入ES索引】【{(esResult.status == 0 ? "成功" : "失败")}】耗时:{sw.Elapsed.TotalSeconds:0.000} 秒,索引名称:{indexName},请求URL:{esUrl},请求参数:{esRequestData}");
        return esResult.status == 0;
    }
    catch
    {
        throw;
    }
    finally
    {
        _sem.Release();
    }
}

用到的库

评论区有人问技术栈,这里列一下主要的库:

  1. Microsoft.Extensions.DependencyInjection 和 Autofac (依赖注入)
  2. AutoMapper (实体类映射)
  3. Microsoft.Extensions.Http (HttpClient,用于操作ElasticSearch、网络请求)
  4. Quartz (定时任务)
  5. Dapper、Dapper.LiteSql (ORM)
  6. Newtonsoft.Json (Json序列化)
  7. ClickHouse.Client (操作ClickHouse)
  8. Oracle.ManagedDataAccess.Core (操作Oracle)
  9. MySqlConnector (操作MySQL)

我最近写了哪些工程

  1. 大杂烩脚本工程,包括查询clickhouse统计分析输出Excel、查询MySQL和Oracle、各种小脚本工具
  2. Blazor工程,做了一套简单的增删改查,精力有限,自己测试用,不用手动改数据库了
  3. 数据挖掘服务,主要是Web API和定时任务
  4. Winform工具,用于测试时创建ES索引、生产模拟数据。为什么写这个?因为做数据挖掘,不给数据,只能自己造了。

为什么从这篇博客看起来这个项目只有我一个人在做?没团队?

还有项目经理、产品经理、前端等一共几个人,项目资金投入少,所以不可能有很多人的。

为什么没有使用Python?

我一开始是想使用Python的,但就我用.NET写的这些东西,如果改用Python,没个2、3年经验,写不顺畅。

我用.NET做一个项目,Swagger有了,创建工程时自带的,当然Python的Swagger也是有的,你可以百度"python 从注释自动生成 swagger",之前看到过一个不错的,没保存,一时半会就找不到了。
用Blazor做了简单的配置页面,测试时不用去手动修改数据库了
写了一个Mock工程,生产模拟测试数据,写入速度可以达到6000条/秒(一条数据请求一次,不是批量写入),界面如下:

最后

写此博客是为了给.NET正名,在大数据项目中,.NET大有可为。
我写代码没有用到什么特别的技术,看起来很简单,但也不是随便学学就能写,没个3、5年经验,很难写的这么快。
我写代码也没有什么条条框框,可能不规范,但很灵活。
例如,winform程序注入日志工具类怎么写?来不急百度了,就这么写吧,一样每秒6000条的狂写日志,还不卡界面:

public partial class Form1 : Form, ILog
{
    ...省略

    public Form1()
    {
        InitializeComponent();

        ...省略

        //注入日志工具类
        ServiceFactory.Get<IndexCreationService>().InjectLog(this);
        ServiceFactory.Get<EsWriteService>().InjectLog(this);
        ServiceFactory.Get<MockDataService>().InjectLog(this);
    }
}

internal class EsWriteService : ServiceBase
{
    ...省略
    private ILog? _log;
    public void InjectLog(ILog log) => _log = log;

    public async Task<bool> Write(string indexName, dynamic esRequestBody)
    {
        ...省略
        _log?.Info("xxx");
        ...省略
    }
}

就目前这些项目、脚本、工具而言,感觉这就是我写的最佳实践。不知道最佳实践,代码也能写,容易写成屎山,要么写的服务三天两头崩。

有关.NET与大数据的更多相关文章

  1. ruby-on-rails - Ruby net/ldap 模块中的内存泄漏 - 2

    作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何模拟 Net::HTTP::Post? - 2

    是的,我知道最好使用webmock,但我想知道如何在RSpec中模拟此方法:defmethod_to_testurl=URI.parseurireq=Net::HTTP::Post.newurl.pathres=Net::HTTP.start(url.host,url.port)do|http|http.requestreq,foo:1endresend这是RSpec:let(:uri){'http://example.com'}specify'HTTPcall'dohttp=mock:httpNet::HTTP.stub!(:start).and_yieldhttphttp.shou

  4. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  5. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  6. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  7. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  8. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  9. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  10. Get https://registry-1.docker.io/v2/: net/http: request canceled while waiting - 2

    1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里

随机推荐