草庐IT

c++ - IO 完成端口初始读取和双向数据

coder 2024-02-23 原文

我有以下简化的 IO 完成端口服务器 C++ 代码:

int main(..)
{
    startCompletionPortThreadProc();

    // Await client connection

    sockaddr_in clientAddress;
    int clientAddressSize = sizeof( clientAddress );
    SOCKET acceptSocket = WSAAccept( serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL);  

    // Connected

    CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, 0, 0 );

    // Issue initial read
    read( acceptSocket );
}


DWORD WINAPI completionPortThreadProc( LPVOID param )
{
    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    while( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
    {
        if( WaitForSingleObject( exitEvent, 0 ) == WAIT_OBJECT_0 )
        {
            break;
        }

        if( !perIoData )
            continue;

        if( bytesTransferred == 0 )
        {
            //TODO
        }

        switch( perIoData->operation )
        {
            case OPERATION_READ:
            {
                // Bytes have been received

                if( bytesTransferred < perIoData->WSABuf.len )
                {
                    // Terminate string
                    perIoData->WSABuf.buf[bytesTransferred]   = '\0';
                    perIoData->WSABuf.buf[bytesTransferred+1] = '\0';
                }

                // Add data to message build
                message += std::tstring( (TCHAR*)perIoData->WSABuf.buf );

                // Perform next read
                    perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
                    perIoData->flags = 0; 

                    if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == 0 )
                    {
                        // Part message
                        continue;
                    }

                    if( WSAGetLastError() == WSA_IO_PENDING )
                    {
                        // End of message
//TODO: Process message here
                        continue;
                    }
                }
            }
            break;

            case OPERATION_WRITE:
            {
                perIoData->bytesSent += bytesTransferred;

                if( perIoData->bytesSent < perIoData->bytesToSend )
                {
                    perIoData->WSABuf.buf = (char*)&( perIoData->inOutBuffer[perIoData->bytesSent] );
                    perIoData->WSABuf.len = ( perIoData->bytesToSend - perIoData->bytesSent);
                }
                else
                {
                    perIoData->WSABuf.buf  = (char*)perIoData->inOutBuffer;
                    perIoData->WSABuf.len  = _tcslen( perIoData->inOutBuffer ) * sizeof( TCHAR );
                    perIoData->bytesSent   = 0;
                    perIoData->bytesToSend = perIoData->WSABuf.len;
                }

                if( perIoData->bytesToSend )
                {
                    if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, 0, &( perIoData->overlapped ), NULL ) == 0 )
                        continue;

                    if( WSAGetLastError() == WSA_IO_PENDING )
                        continue;
                }
            }
            break;
        }
    }

    return 0;
}

bool SocketServer::read( SOCKET socket, HANDLE completionPort )
{
    PER_IO_DATA* perIoData = new PER_IO_DATA;
    ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );

    perIoData->socket            = socket;
    perIoData->operation         = OPERATION_READ;
    perIoData->WSABuf.buf        = (char*)perIoData->inOutBuffer; 
    perIoData->WSABuf.len        = sizeof( perIoData->inOutBuffer );
    perIoData->overlapped.hEvent = WSACreateEvent();

    DWORD bytesReceived = 0;
    if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesReceived, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
    {
        int gle = WSAGetLastError();
        if( WSAGetLastError() != WSA_IO_PENDING )
        {
            delete perIoData;
            return false;
        }
    }

    return true;
}

bool SocketServer::write( SOCKET socket, std::tstring& data )
{
    PER_IO_DATA* perIoData = new PER_IO_DATA;
    ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );

    perIoData->socket            = socket; 
    perIoData->operation         = OPERATION_WRITE;
    perIoData->WSABuf.buf        = (char*)data.c_str();
    perIoData->WSABuf.len        = _tcslen( data.c_str() ) * sizeof( TCHAR );
    perIoData->bytesToSend       = perIoData->WSABuf.len;  
    perIoData->overlapped.hEvent = WSACreateEvent();

    DWORD bytesSent = 0;
    if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesSent, 0, &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
    {
        if( WSAGetLastError() != WSA_IO_PENDING )
        {
            delete perIoData;
            return false;
        }
    }

    return true;
}

1) 我遇到的第一个问题是初始阅读。

在客户端连接(接受)时,我发出读取。由于客户端尚未发送任何数据,WSAGetLastError() 为 WSA_IO_PENDING 且读取方法返回。

当客户端随后发送数据时,线程仍然停留在 GetQueuedCompletionStatus 调用中(因为我假设我需要另一个 WSARecv 调用?)。

我是否应该一直循环读取方法直到数据到达?这似乎不合逻辑,我认为通过发出初始读取 GetQueuedCompletionStatus 将在数据到达时完成。

2) 我需要在没有确认的情况下双向读写数据。因此,我还创建了一个带有 IOCP 线程的客户端。是否真的可以使用完成端口来执行此操作,或者是否必须在读取之后进行写入?

很抱歉,我问的是基本问题,但在搜索互联网并构建 IOCP 示例后,我仍然无法回答这些问题。

非常感谢。

最佳答案

On client connection (accept), I issue a read. As the client hasn't sent any data yet, WSAGetLastError() is WSA_IO_PENDING and the read method returns.

这是正常行为。

When the client then sends data, the thread remains stuck in the GetQueuedCompletionStatus call (as I assume I need another WSARecv call?).

不,您不需要再打电话。如果它卡住了,那么您就没有正确地将读取与 I/O 完成端口相关联。

Am I supposed to keep looping the read method until data arrives?

没有。您需要为初始读取调用一次 WSARecv()WSA_IO_PENDING 错误表示读取正在等待数据,并且会在数据实际到达时向 I/O 完成端口发出信号。在该信号实际到达之前,请勿调用 WSARecv()(或任何其他读取函数)。然后您可以再次调用 WSARecv() 以等待更多数据。重复直到套接字断开连接。

I thought by issuing the initial read GetQueuedCompletionStatus would complete when data arrived.

这正是应该发生的事情。

2) I need to read and write data bi-directional without acknowledgements. Therefore I've also created a client with the IOCP thread. Is it actually possible to do this with completion ports

是的。读和写是分开的操作,它们不相互依赖。

does a read have to be followed by a write?

如果您的协议(protocol)不需要,则不需要。

话虽如此,您的代码存在一些问题。

注意一点,WSAAccept() 是同步的,您应该考虑使用 AcceptEx(),这样它就可以使用相同的 I/O 完成端口来报告新的连接。

但更重要的是,当一个挂起的I/O操作失败时,GetQueuedCompletionStatus()返回FALSE,返回的LPOVERLAPPED指针将是非-NULL,并且 GetLastError() 将报告 I/O 操作失败的原因。但是,如果 GetQueuedCompletionStatus() 本身 失败,则返回的 LPOVERLAPPED 指针将为 NULL,并且 GetLastError() 将报告 GetQueuedCompletionStatus() 失败的原因。 documentation 中清楚地解释了这种差异。 ,但是您的 while 循环没有考虑到它。改为使用 do..while 循环并根据 LPOVERLAPPED 指针执行操作:

DWORD WINAPI completionPortThreadProc( LPVOID param )
{
    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    do
    {
        if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
        {
            // I/O success, handle perIoData based on completionKey as needed...
        }
        else if( perIoData )
        {
            // I/O failed, handle perIoData based on completionKey as needed...
        }
        else
        {
            // GetQueuedCompletionStatus() failure...
            break;
        }    
    }
    while( WaitForSingleObject( exitEvent, 0 ) == WAIT_TIMEOUT );

    return 0;
}

附带说明一下,考虑使用 PostQueuedCompletionionStatus() 而不是使用事件对象来指示 completionPortThreadProc() 应该退出的时间。而不是将终止 completionKey 发布到 I/O 完成端口,然后您的循环可以查找该值:

DWORD WINAPI completionPortThreadProc( LPVOID param )
{
    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    do
    {
        if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
        {
            if( completionKey == MyTerminateKey )
                break;

            if( completionKey == MySocketIOKey )
            {
                // I/O success, handle perIoData as needed...
            }
        }
        else if( perIoData )
        {
            // I/O failed, handle perIoData based on completionKey as needed...
        }
        else
        {
            // GetQueuedCompletionStatus() failure...
            break;
        }    
    }
    while( true );

    return 0;
}

CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, MySocketIOKey, 0 );

PostQueuedCompletionStatus( completionPort, 0, MyTerminateKey, NULL );

关于c++ - IO 完成端口初始读取和双向数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37686690/

有关c++ - IO 完成端口初始读取和双向数据的更多相关文章

  1. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  2. ruby-on-rails - 未初始化的常量 Psych::Syck (NameError) - 2

    在我的gem中,我需要yaml并且在我的本地计算机上运行良好。但是在将我的gem推送到ruby​​gems.org之后,当我尝试使用我的gem时,我收到一条错误消息=>"uninitializedconstantPsych::Syck(NameError)"谁能帮我解决这个问题?附言RubyVersion=>ruby1.9.2,GemVersion=>1.6.2,Bundlerversion=>1.0.15 最佳答案 经过几个小时的研究,我发现=>“YAML使用未维护的Syck库,而Psych使用现代的LibYAML”因此,为了解决

  3. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  4. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  5. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  6. ruby-on-rails - 未在 Ruby 中初始化的对象 - 2

    我在Rails工作并有以下类(class):classPlayer当我运行时bundleexecrailsconsole然后尝试:a=Player.new("me",5.0,"UCLA")我回来了:=>#我不知道为什么Player对象不会在这里初始化。关于可能导致此问题的操作/解释的任何建议?谢谢,马里奥格 最佳答案 havenoideawhythePlayerobjectwouldn'tbeinitializedhere它没有初始化很简单,因为你还没有初始化它!您已经覆盖了ActiveRecord::Base初始化方法,但您没有调

  7. ruby - 如何验证 IO.copy_stream 是否成功 - 2

    这里有一个很好的答案解释了如何在Ruby中下载文件而不将其加载到内存中:https://stackoverflow.com/a/29743394/4852737require'open-uri'download=open('http://example.com/image.png')IO.copy_stream(download,'~/image.png')我如何验证下载文件的IO.copy_stream调用是否真的成功——这意味着下载的文件与我打算下载的文件完全相同,而不是下载一半的损坏文件?documentation说IO.copy_stream返回它复制的字节数,但是当我还没有下

  8. ruby-on-rails - ActionController::RoutingError: 未初始化常量 Api::V1::ApiController - 2

    我有用于控制用户任务的Rails5API项目,我有以下错误,但并非总是针对相同的Controller和路由。ActionController::RoutingError:uninitializedconstantApi::V1::ApiController我向您描述了一些我的项目,以更详细地解释错误。应用结构路线scopemodule:'api'donamespace:v1do#=>Loginroutesscopemodule:'login'domatch'login',to:'sessions#login',as:'login',via::postend#=>Teamroutessc

  9. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  10. Ruby 文件 IO 定界符? - 2

    我正在尝试解析一个文本文件,该文件每行包含可变数量的单词和数字,如下所示:foo4.500bar3.001.33foobar如何读取由空格而不是换行符分隔的文件?有什么方法可以设置File("file.txt").foreach方法以使用空格而不是换行符作为分隔符? 最佳答案 接受的答案将slurp文件,这可能是大文本文件的问题。更好的解决方案是IO.foreach.它是惯用的,将按字符流式传输文件:File.foreach(filename,""){|string|putsstring}包含“thisisanexample”结果的

随机推荐