草庐IT

c - TCP server 可以同时处理两个不同的client写请求而不会互相阻塞

coder 2023-09-20 原文

我正在尝试编写一个可以处理两个不同客户端的 TCP 服务器。我有一个请求者和提供者客户。提供程序是多线程的,可以向服务器添加和删除新服务。每次添加或删除新服务时,它都应将其发送到服务器,服务器将打印更新。请求者客户端允许用户输入服务,然后检查服务器以查看该服务是否存在。

我遇到的问题是 recv() 函数。我在我的程序中调用了两次,一次是从生产者客户端读取,另一次是从请求者读取。问题是服务器只收到一条消息然后卡住。它应该在每次线程运行时更新。问题似乎正在发生,因为第二个 recv() 调用阻止了它,因为它正在等待请求者。我试图通过使用非阻塞事件标志 (MSG_DONTWAIT) 使第二个 recv() 调用成为非阻塞,但这并没有解决我的问题。

如何编写一个 TCP 服务器来处理两个不同的客户端写入请求并防止它们相互阻塞?我的代码如下。

客户 1- 提供者

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080

int SIZE = 100;
int counter = 0;
int semaphore = 1; //set to false

struct values 
       {
       int serviceArray[100]; 
       int portArray[100];  
       }input;

void callServer()
{
struct sockaddr_in address; 
    int sock = 0, valread; 
    struct sockaddr_in serv_addr; 

    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    { 
        printf("\n Socket creation error \n"); 

    } 

    memset(&serv_addr, '0', sizeof(serv_addr)); 

    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(PORT); 

    // Convert IPv4 and IPv6 addresses from text to binary form 
    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)  
    { 
        printf("\nInvalid address/ Address not supported \n"); 

    } 

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) 
    { 
        printf("\nConnection Failed \n"); 

    } 
    send(sock , &input , sizeof(input) , 0 ); 


} 






int arraySearch(int number){
    int i=0; 
    for(i=0; i< 100; i++){
        if(input.serviceArray[i] == number)
        {
            return 0; //is found
        }

    }

    return 1;
}


void *createService()
{

    while(counter < SIZE)
    {
        sleep(2);
        if(semaphore ==1)
        {

            int randomValue = rand() % SIZE;

            if(arraySearch(randomValue) == 1)
            {
                input.serviceArray[counter] = randomValue;
                input.portArray[counter] = randomValue + PORT;
                printf("Thread 1 is adding service number: %d and port number: %d\n", input.serviceArray[counter], input.portArray[counter]);
                semaphore = 0;//unlock
                counter = counter + 1;
                callServer();

            }
        }
    }
}
void *removeService()
{

    while(counter < SIZE)
    {
        sleep(4);
        if(semaphore ==0)
        {
            printf("Thread 2 is removing service number: %d and port number: %d\n", input.serviceArray[counter - 1], input.portArray[counter - 1]);
            input.serviceArray[counter -1] = 0;
            input.portArray[counter - 1] = 0;
            semaphore = 1; //lock
            callServer();

        }
    }
}



int main(void)
{

    //create threads
        pthread_t thread_id1, thread_id2;  


        pthread_create(&thread_id1, NULL, createService, NULL);
        pthread_create(&thread_id2, NULL, removeService, NULL); 


        pthread_join(thread_id1, NULL);  
        pthread_join(thread_id2, NULL); 



}

客户 2 - 请求者

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>
#define PORT 8080

void callServer(int serviceNum)
{
struct sockaddr_in address; 
    int sock = 0, valread; 
    struct sockaddr_in serv_addr; 

    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    { 
        printf("\n Socket creation error \n"); 

    } 

    memset(&serv_addr, '0', sizeof(serv_addr)); 

    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(PORT); 

    // Convert IPv4 and IPv6 addresses from text to binary form 
    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)  
    { 
        printf("\nInvalid address/ Address not supported \n"); 

    } 

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) 
    { 
        printf("\nConnection Failed \n"); 

    } 
    send(sock , &serviceNum , sizeof(serviceNum) , 0 ); 


} 


void main(){
int serviceNum;

printf("Which service would you like to run?\n");
scanf("%d",&serviceNum);
printf("You entered: %d", serviceNum);
callServer(serviceNum);

}

服务器

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 int sockfd; 
 int serviceNum;

    struct sockaddr_in servaddr, cliaddr; 

struct values {
       int serviceArray[100]; 
       int portArray[100];
       }input;






int main()
{

    int server_fd, server_fd2, new_socket, valread, valread2; 

    struct sockaddr_in address; 

    int opt = 1; 

    int addrlen = sizeof(address); 





    // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 


while(1){
   valread = recv( new_socket , &input, sizeof(input), 0); 




   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }

recv( new_socket , &serviceNum, sizeof(serviceNum), MSG_DONTWAIT)

    printf("The service number you passed is %d", serviceNum);

    }


} 

编辑 - 这是对服务器的更新并且是多线程的。我仍然遇到阻塞问题。

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 int sockfd; 
 int serviceNum;

    struct sockaddr_in servaddr, cliaddr; 

struct values {
       int serviceArray[100]; 
       int portArray[100];
       }input;

int server_fd, server_fd2, new_socket, valread, valread2; 

    struct sockaddr_in address; 

    int opt = 1; 

    int addrlen = sizeof(address);     



void *producer()
{
     // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 
    while(1)
    {
   valread = read( new_socket , &input, sizeof(input));   
   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }
}
}

void *requestor()
{
     // Creating socket file descriptor 

    if ((server_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd2, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons(8090); 





    if (bind(server_fd2, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd2, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd2, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 
    while(1)
    {
    valread2 = read( new_socket , &serviceNum, sizeof(serviceNum)); 
    printf("The service number you passed is %d", serviceNum);
    }
}


int main()
{

//create threads
        pthread_t thread_id1, thread_id2;  


        pthread_create(&thread_id1, NULL, producer, NULL);
        pthread_create(&thread_id2, NULL, requestor, NULL); 


        pthread_join(thread_id1, NULL);  
        pthread_join(thread_id2, NULL); 







} 

最佳答案

默认情况下,套接字以阻塞 模式运行。因此,当您在一个套接字上调用 recv() 时,它会在等待数据到达时阻塞同一线程上的其他套接字,这是有道理的。

对于您尝试执行的操作,您需要将服务器代码更改为:

  • 让每个已接受的套接字处于阻塞模式,并在它们各自的工作线程或分支进程中对它们进行操作。

  • 将每个接受的套接字切换到非阻塞模式(fctrl(FIONBIO)等),然后使用select()(e)poll() 或其他类似机制在一个线程中一起监视套接字,然后在它告诉您各个套接字何时有您需要处理的事件时作出 react (即,不要' 从套接字读取数据,直到确实有可读取的数据,等等)。

  • (仅限 Windows)通过重叠 I/O 操作异步使用每个接受的套接字。让操作系统在每个套接字上有事件时通知您。

关于c - TCP server 可以同时处理两个不同的client写请求而不会互相阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54695706/

有关c - TCP server 可以同时处理两个不同的client写请求而不会互相阻塞的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  3. ruby - 我可以使用 Ruby 从 CSV 中删除列吗? - 2

    查看Ruby的CSV库的文档,我非常确定这是可能且简单的。我只需要使用Ruby删除CSV文件的前三列,但我没有成功运行它。 最佳答案 csv_table=CSV.read(file_path_in,:headers=>true)csv_table.delete("header_name")csv_table.to_csv#=>ThenewCSVinstringformat检查CSV::Table文档:http://ruby-doc.org/stdlib-1.9.2/libdoc/csv/rdoc/CSV/Table.html

  4. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  5. ruby - Highline 询问方法不会使用同一行 - 2

    设置:狂欢ruby1.9.2高线(1.6.13)描述:我已经相当习惯在其他一些项目中使用highline,但已经有几个月没有使用它了。现在,在Ruby1.9.2上全新安装时,它似乎不允许在同一行回答提示。所以以前我会看到类似的东西:require"highline/import"ask"Whatisyourfavoritecolor?"并得到:Whatisyourfavoritecolor?|现在我看到类似的东西:Whatisyourfavoritecolor?|竖线(|)符号是我的终端光标。知道为什么会发生这种变化吗? 最佳答案

  6. ruby - 我可以使用 aws-sdk-ruby 在 AWS S3 上使用事务性文件删除/上传吗? - 2

    我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的

  7. ruby-on-rails - 项目升级后 Pow 不会更改 ruby​​ 版本 - 2

    我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby​​版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby​​版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘

  8. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  9. ruby - 有人可以帮助解释类创建的 post_initialize 回调吗 (Sandi Metz) - 2

    我正在阅读SandiMetz的POODR,并且遇到了一个我不太了解的编码原则。这是代码:classBicycleattr_reader:size,:chain,:tire_sizedefinitialize(args={})@size=args[:size]||1@chain=args[:chain]||2@tire_size=args[:tire_size]||3post_initialize(args)endendclassMountainBike此代码将为其各自的属性输出1,2,3,4,5。我不明白的是查找方法。当一辆山地自行车被实例化时,因为它没有自己的initialize方法

  10. ruby - 是否可以覆盖 gemfile 进行本地开发? - 2

    我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI

随机推荐