草庐IT

c# - 如何编写可扩展的基于 TCP/IP 的服务器

coder 2023-07-07 原文

我正处于编写新的 Windows 服务应用程序的设计阶段,该应用程序接受 TCP/IP 连接以进行长时间运行的连接(即,这不像 HTTP 那样有许多短连接,而是客户端连接并保持连接数小时或数天甚至数周)。
我正在寻找设计网络架构的最佳方式的想法。我将需要为该服务至少启动一个线程。我正在考虑使用异步 API(BeginRecieve 等),因为我不知道在任何给定时间(可能是数百个)我将连接多少个客户端。我绝对不想为每个连接启动一个线程。
数据将主要从我的服务器流出到客户端,但有时会从客户端发送一些命令。这主要是一个监控应用程序,我的服务器定期向客户端发送状态数据。
使其尽可能可扩展的最佳方法是什么?基本工作流程?
明确地说,我正在寻找基于 .NET 的解决方案(如果可能,C#,但任何 .NET 语言都可以使用)。
我需要一个解决方案的工作示例,作为指向我可以下载的内容的指针或内嵌的简短示例。并且它必须是基于 .NET 和 Windows 的(任何 .NET 语言都是可以接受的)。

最佳答案

我以前写过类似的东西。我多年前的研究表明,使用异步套接字编写自己的套接字实现是最好的选择。这意味着客户没有真正做任何事情实际上需要相对较少的资源。发生的任何事情都由 .NET 线程池处理。
我把它写成一个管理服务器所有连接的类。
我只是使用一个列表来保存所有客户端连接,但是如果您需要更快地查找更大的列表,您可以随心所欲地编写它。

private List<xConnection> _sockets;
此外,您还需要套接字实际监听传入连接。
private System.Net.Sockets.Socket _serverSocket;
start 方法实际上启动服务器套接字并开始监听任何传入连接。
public bool Start()
{
  System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
  System.Net.IPEndPoint serverEndPoint;
  try
  {
     serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
  }
  catch (System.ArgumentOutOfRangeException e)
  {
    throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
  }
  try
  {
    _serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
   }
   catch (System.Net.Sockets.SocketException e)
   {
      throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
    }
    try
    {
      _serverSocket.Bind(serverEndPoint);
      _serverSocket.Listen(_backlog);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred while binding socket. Check inner exception", e);
    }
    try
    {
       //warning, only call this once, this is a bug in .net 2.0 that breaks if
       // you're running multiple asynch accepts, this bug may be fixed, but
       // it was a major pain in the rear previously, so make sure there is only one
       //BeginAccept running
       _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred starting listeners. Check inner exception", e);
    }
    return true;
 }
我只想指出异常处理代码看起来很糟糕,但原因是我在那里有异常抑制代码,因此任何异常都会被抑制并返回 false如果设置了配置选项,但为了简洁起见,我想删除它。
上面的 _serverSocket.BeginAccept(new AsyncCallback(acceptCallback)), _serverSocket) 实质上设置了我们的服务器套接字,以便在用户连接时调用 acceptCallback 方法。此方法从 .NET 线程池运行,如果您有许多阻塞操作,它会自动处理创建额外的工作线程。这应该以最佳方式处理服务器上的任何负载。
    private void acceptCallback(IAsyncResult result)
    {
       xConnection conn = new xConnection();
       try
       {
         //Finish accepting the connection
         System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
         conn = new xConnection();
         conn.socket = s.EndAccept(result);
         conn.buffer = new byte[_bufferSize];
         lock (_sockets)
         {
           _sockets.Add(conn);
         }
         //Queue receiving of data from the connection
         conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
         //Queue the accept of the next incoming connection
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (SocketException e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (Exception e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
     }
上面的代码基本上刚刚接受了进来的连接,排队BeginReceive这是一个回调,当客户端发送数据时会运行,然后排队下一个 acceptCallback它将接受进入的下一个客户端连接。BeginReceive方法调用是告诉套接字从客户端接收数据时要做什么。对于 BeginReceive ,你需要给它一个字节数组,这是它在客户端发送数据时会复制数据的地方。 ReceiveCallback方法将被调用,这就是我们处理接收数据的方式。
private void ReceiveCallback(IAsyncResult result)
{
  //get our connection from the callback
  xConnection conn = (xConnection)result.AsyncState;
  //catch any errors, we'd better not have any
  try
  {
    //Grab our buffer and count the number of bytes receives
    int bytesRead = conn.socket.EndReceive(result);
    //make sure we've read something, if we haven't it supposadly means that the client disconnected
    if (bytesRead > 0)
    {
      //put whatever you want to do when you receive data here

      //Queue the next receive
      conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
     }
     else
     {
       //Callback run but no data, close the connection
       //supposadly means a disconnect
       //and we still have to close the socket, even though we throw the event later
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
   catch (SocketException e)
   {
     //Something went terribly wrong
     //which shouldn't have happened
     if (conn.socket != null)
     {
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
 }
编辑:在这种模式中,我忘了在这方面的代码中提到:
//put whatever you want to do when you receive data here

//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
通常,在任何你想要的代码中,我都会将数据包重新组装成消息,然后将它们创建为线程池上的作业。这样,无论消息处理代码正在运行时,来自客户端的下一个块的 BeginReceive 都不会延迟。
接受回调通过调用结束接收完成读取数据套接字。这会填充开始接收函数中提供的缓冲区。一旦你在我留下评论的地方做任何你想做的事情,我们就会调用下一个 BeginReceive如果客户端发送更多数据,该方法将再次运行回调。
现在是真正棘手的部分:当客户端发送数据时,您的接收回调可能只用部分消息调用。重新组装会变得非常复杂。我使用我自己的方法并创建了一种专有协议(protocol)来做到这一点。我省略了它,但如果你要求,我可以添加它。这个处理程序实际上是我写过的最复杂的一段代码。
public bool Send(byte[] message, xConnection conn)
{
  if (conn != null && conn.socket.Connected)
  {
    lock (conn.socket)
    {
    //we use a blocking mode send, no async on the outgoing
    //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
       conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
     }
   }
   else
     return false;
   return true;
 }
上面的send方法实际上使用了一个同步的Send打电话。由于我的应用程序的消息大小和多线程性质,这对我来说很好。如果你想发送到每个客户端,你只需要遍历 _sockets 列表。
您在上面看到的 xConnection 类基本上是一个简单的套接字包装器,用于包含字节缓冲区,在我的实现中还有一些额外的东西。
public class xConnection : xBase
{
  public byte[] buffer;
  public System.Net.Sockets.Socket socket;
}
此处还可以引用 using s 我包括在内,因为当他们不包括在内时我总是很生气。
using System.Net.Sockets;
我希望这会有所帮助。它可能不是最干净的代码,但它可以工作。代码也有一些细微差别,您应该对更改感到厌烦。对于一个,只有一个 BeginAccept随时调用。曾经有一个非常烦人的 .NET 错误,这是几年前的事了,所以我不记得细节了。
此外,在 ReceiveCallback代码,我们在排队下一次接收之前处理从套接字接收到的任何内容。这意味着对于单个套接字,我们实际上只是在 ReceiveCallback 中。在任何时间点一次,我们不需要使用线程同步。但是,如果您重新排序以在拉取数据后立即调用下一个接收,这可能会快一点,您需要确保正确同步线程。
此外,我砍掉了很多代码,但保留了正在发生的事情的本质。这应该是您设计的良好开端。如果您对此还有任何疑问,请发表评论。

关于c# - 如何编写可扩展的基于 TCP/IP 的服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/869744/

有关c# - 如何编写可扩展的基于 TCP/IP 的服务器的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  4. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  5. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  6. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  7. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  8. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  9. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  10. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

随机推荐