草庐IT

python如何实现GRPC服务,python实现简单的grpc通信

尚拙谨言 2024-03-26 原文

😘引流个人主页:尚拙谨言的博客_CSDN博客-技术实战,学习经验分享,大道至简系列领域博主

grpc是一种基于某种协议实现不同机器间进行通信的服务框架。不同机器可以是不同的服务端、客户端,当服务端实现好某些功能后,提供一个服务接口,供不同客户端进行接口调用,从而让不同客户端都能够“享用”到服务端提供的功能。

在实际业务场景,比如我是做算法的,那么当我的模型训练完成后,要放到线上让别人调用,则经常是以grpc的方式进行实现的。简单流程就是我写个服务端,实现接收客户端传来的数据,并进行模型推理计算,计算结果再返回客户端,那么客户端实际只需要提供数据即可得到它们想要的结果,中间数据的处理过程均由服务端来完成,因此,它还是很方便的有木有~

目录

1. 定义protobuf文件

2. 服务端实现

2. 客户端实现

3. 总结


本文将简单介绍一下如何使用Python来实现一个简单的grpc服务。由于涉及到数据的传输,因此必然涉及到数据格式化问题,就是需要有一种通用的协议将数据格式化,方便在不同机器间都能够高效传输。grpc这里用protobuf(Protocol Buffers)作为通信协议,它是一种高效便捷的数据存储格式,与平台和编程语言无关,因此,只要定义好protobuf协议,用Python也好用Java也好,都可以相互通信,十分方便。

1. 定义protobuf文件

文件命名为ReceiveData.proto,注意,我本人试验过,你并不能创建一个.txt文件后强行将后缀改为.proto,用pycharm的同学可以参考这篇博客:pycharm添加proto格式文件插件,就是你需要在pycharm中添加相应能够兼容.proto格式文件的插件。

创建一个空白的proto文件,添加以下内容:

// 这里我们用的是proto3版本,版本号是一定要指定的哦!
syntax = "proto3";

import "google/protobuf/wrappers.proto";

//这是供java端调用时用到的,咱们python的不用管
//option java_package = "xx.xx.core.grpc.lib.label";
//option java_multiple_files = true;

//package label;

service LabelService {
  /**
  样本集接口
   */
  rpc receiveSample(SampleRequestList) returns (google.protobuf.BoolValue) {}
}
message SampleRequestList {
  repeated SampleRequest reqList = 1;
}

message SampleRequest {
  /**
  标签值
   */
  string labelValue = 1;
  /**
  样本集数据
   */
  string text = 2;
}

我们定义一个简单的接收数据的服务,客户端传labelValue和text数据,服务端接收数据并进行清洗和分词,我们可以打印出来看结果。数据接收成功后向客户端返回BoolValue类型的响应,true或false。

文件定义好后,我们进入到proto文件所在目录,执行

python -m grpc_tools.protoc -I. --python_out=../grpc_file --grpc_python_out=../grpc_file ReceiveData.proto

执行以上脚本后,会在grpc_file文件夹下生成两个.py文件:

2. 服务端实现

我们实现一个服务端,用于接收客户端发来的请求,并将接收到的数据进行清洗、分词等处理,请求格式为List,在proto文件中用repeated关键词修饰。

新建python文件,命名receive.py,编写以下内容:

import logging
import sys

sys.path.append('..')
import time
import datetime
from concurrent import futures

import grpc_file
import jieba
import re
import grpc

from grpc_file import ReceiveData_pb2_grpc
# 这个地方的BoolValue可能会飘红,不用管,用就行了
from google.protobuf.wrappers_pb2 import BoolValue

jieba.setLogLevel(log_level=0)
pattern = re.compile('(?:https?|ftp|file)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]')
logger = logging.getLogger('logger')
# 设置日志等级_
logger.setLevel(logging.INFO)


class ReceiveDataServer:
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def match_url(self, text):
        """
        去除url
        :param text:
        :return:
        """
        clear = pattern.sub('', text)
        return clear

    def seg_sentence(self, data):
        """
        分词
        :param data:
        :return:
        """
        data = self.match_url(data)
        seg_text = jieba.cut(data.replace('\t', '').replace('\n', '').replace(' ', ''))
        context = ' '.join(seg_text)
        return context

    def receiveSample(self, request, context):
        """
        接收数据的grpc服务接口,该函数的名称要和proto文件中定义的接口名保持一致
        :param request:
        :param context:
        :return:
        """
        requestlist = request.reqList
        try:
            for item in requestlist:
                text = item.text.replace('\n', '。').replace('\r', '。')
                text = self.seg_sentence(text)
                print(datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'),
                  '*** 添加标签:{},添加文本:{}... ***'.format(item.labelValue, text[:10]))
            return BoolValue(value=True)
        except:
            return BoolValue(value=False)


def serve(host, port):
    # 启动 rpc 服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
    ReceiveData_pb2_grpc.add_LabelServiceServicer_to_server(ReceiveDataServer(host, port), server)
    server.add_insecure_port('{}:{}'.format(host, port))
    server.start()
    print('Grpc server connect successful!')

    try:
        while True:
            time.sleep(600)
            print(datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
    except KeyboardInterrupt:
        print(datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
        server.stop(0)


if __name__ == '__main__':
    # host和port写上服务端运行的机器ip和端口;
    serve(host='172.xxx.xxx.xxx', port=50001)

 我们运行一下这个py:

python receive.py

只要ip地址正确,发现服务启动成功!

2. 客户端实现

启动Server.py,我们再实现一个客户端,命名为client.py,让它能够连接服务端并发送数据:

import sys

sys.path.append('..')
import grpc
from grpc_file import ReceiveData_pb2, ReceiveData_pb2_grpc


def run():
    # 连接 rpc 服务器,ip和端口号必须和服务端设置的一致
    channel = grpc.insecure_channel('192.xxx.xxx.xx:50001')
    # 调用 rpc 服务
    stub = ReceiveData_pb2_grpc.LabelServiceStub(channel)
    requestlist = ReceiveData_pb2.SampleRequestList()
    for data in range(10):
        label = str(data)
        text = '第{}个标签的数据'.format(label)
        # 由于我们在proto文件中定义的接收数据格式为List,所以这里我们需要先定义一个向List中添加数据的对象
        request = requestlist.reqList.add()
        request.labelValue = label
        request.text = text
    response = stub.receiveSample(requestlist)
    print(response)

if __name__ == '__main__':
    run()

我们再运行一下客户端:

python client.py

数据发送给服务端成功!

 当服务端成功接收数据后,会返回给客户端一个 value: true。现在我们再来看下服务端有什么反应,切换回刚才启服务端所在的终端界面:

 从我们打印的日志中可以看出,服务端成功接收来自客户端发送的数据,而且我们可以看到添加文本后的字符串,已经进行了分词处理,这里为了演示,所以没加入url验证,且只取了前10个字符展示。其实大家可以根据自己的需求,从客户端接收数据后,在服务端代码里添加自己的业务逻辑,实现自己的业务需求,甚至可以做一系列处理后,将数据返回给客户端,只要定义好proto文件的返回值和返回格式即可。例如我们服务端的算法模型接收客户端数据,并对数据进行标签预测和打分,最后将标签和分数返回给客户端,那么proto中相应位置的定义可以如下:

syntax = "proto3";

service Predict {
  rpc predict(LabelRequest) returns (ResultReply) {}
}

message LabelRequest {
  string text = 1;
}

message ResultReply {
  string label = 1;
  string score = 2;
}

在服务端中,接口函数应当做如下返回:

return PredictServer_pb2.ResultReply(label=str(labels), score=str(probs))

在客户端中,应当做如下请求与接收相应:

response = stub.predict(PredictServer_pb2.LabelRequest(text='我热爱自然语言处理算法'))
print("预测类别:{}, 预测得分:{}: ".format(response.label, response.score))

3. 总结

通常我们利用grpc来接收不同客户端的请求,并将请求数据做业务逻辑处理,最后再返回给客户端。就好像通信基站一样,接收四面八方传来的手机通信请求,基站将这些请求做相应的处理,再返回给不同客户端,达到互联的目的。

总的来说,python实现简单的grpc需要以下步骤:定义protobuf——>通过protobuf生成2份方法文件——>定义服务端——>定义客户端。

有关python如何实现GRPC服务,python实现简单的grpc通信的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  4. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  5. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  6. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  7. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  8. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  9. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  10. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

随机推荐