草庐IT

GRPC Interceptor的使用

申_9a33 2023-03-28 原文

Interceptor 使用

上一篇我们介绍了metadata的使用方法,但是我们在每个方法内部都需要设置相同重复的metadata,比如调用时间戳,调用链等;能不能把这些相同的重复性设置,统一放在一个地方,方便后面修改和维护,答案就是拦截器-Interceptor.

1.普通调用 Interceptor的使用

1.1 服务端修改后代码

  • 服务端拦截器代码
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    fmt.Println("---unaryInterceptor---")

    // 解析请求携带的信息
    str, _ := json.Marshal(req)
    fmt.Printf("req: %s\n", str)
    fmt.Printf("Method: %s\n", info.FullMethod)

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        grpc.SetTrailer(ctx, trailer)
    }()

    // 解析请求的metadata
    md, ok := metadata.FromIncomingContext(ctx)

    if !ok {
        return nil, status.Errorf(codes.DataLoss, "无法获取元数据")
    }

    if t, ok := md["timestamp"]; ok {
        fmt.Println("timestamp from metadata:")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    // 创建携带metadata的Header
    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    grpc.SendHeader(ctx, header)

    // 方法调用
    m, err := handler(ctx, req)
    if err != nil {
        fmt.Printf("RPC failed with error %v", err)
    }
    return m, err
}
  • 服务端Handle代码
func (s *server) UnaryEcho(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("---UnaryEcho---")

    fmt.Printf("已接受到的请求:%v,发送响应\n", in)

    return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}

    1. 在拦截器里面我们可以打印出调用方法名和调用方法时,请求的参数
    1. context中解析metadata
    1. 设置Header里面的metadata
    1. 调用业务处理handle
    1. defer时,设置Trailer里面的metadata

1.2客户端修改后代码

  • 客户端拦截器代码
// unaryInterceptor is an example unary interceptor.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    fmt.Printf("---unaryInterceptor---\n")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx = metadata.NewOutgoingContext(ctx, md)

    reqStr, _ := json.Marshal(req)
    fmt.Printf("RPC: %s,req:%s\n", method, reqStr)

    var header, trailer metadata.MD
    opts = append(opts, grpc.Header(&header), grpc.Trailer(&trailer))

    err := invoker(ctx, method, req, reply, cc, opts...)

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    return err
}
  • 客户端Handle代码
func unaryCallWithMetadata(c pb.GreeterClient) {
    fmt.Println("--- unaryCall ---")

    // 使用metadata的上下文创建RPC

    r, err := c.UnaryEcho(context.Background(), &pb.HelloRequest{Name: "unaryCall"})
    if err != nil {
        log.Fatalf("调用UnaryEcho失败:%v", err)
    }

    fmt.Println("response:")
    fmt.Printf(" - %s\n", r.Message)
}
  • 1.创建metadata并且放入context

  • 2.打印请求方法名和请求方法参数

  • 3.定义用于存放服务端返回header, trailer

  • 4.调用业务处理handle

  • 5.解析header, trailer

总结

  • 可以看到用了Interceptor之后,我们业务代码变的很干净,只用关心业务层面的逻辑
  • 再拦截器里我们可以加入公共逻辑,log,错误处理,以及recover

2.stream调用 Interceptor的使用

2.1 服务端修改后代码

拦截器代码

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    fmt.Printf("--- streamInterceptor ---\n")

    // 调用完成时设置SetTrailer
    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ss.SetTrailer(trailer)
    }()

    // 从Stream的Context中解析出metadata
    md, ok := metadata.FromIncomingContext(ss.Context())
    if !ok {
        return status.Errorf(codes.DataLoss, "ServerStreamingEcho: 无法获取metadata")
    }
    if t, ok := md["timestamp"]; ok {
        fmt.Printf("timestamp from metadata:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    // 设置Header里面的metadata
    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    ss.SendHeader(header)

    err := handler(srv, ss)
    if err != nil {
        fmt.Printf("RPC failed with error %v", err)
    }
    return err
}

服务端双向流Handle代码

func (s *server) BidirectionalStreamingEcho(stream pb.Greeter_BidirectionalStreamingEchoServer) error {
    fmt.Printf("--- BidirectionalStreamingEcho ---\n")

    // Read requests and send responses.
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        fmt.Printf("request received %v, sending echo\n", in)
        if err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()}); err != nil {
            return err
        }
    }
}
  • 1.stream模式下调用与普通调用修改逻辑基本一直,可以将所有的metadata操作都放入Interceptor

2.2 客户端修改后代码

客户端拦截器代码

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    fmt.Printf("---streamInterceptor---\n")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx = metadata.NewOutgoingContext(ctx, md)

    // 执行具体业务
    s, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }

    // 解析 header
    header, err := s.Header()
    if err != nil {
        log.Fatalf("无法从stream中获取header: %v", err)
    }

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }
    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }

    return s, nil
}

客户端双向流Handle代码

func bidirectionalWithMetadata(c pb.GreeterClient) {
    fmt.Printf("--- bidirectional ---\n")

    // Make RPC using the context with the metadata.
    stream, err := c.BidirectionalStreamingEcho(context.Background())
    if err != nil {
        log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
    }

    go func() {
        // Send all requests to the server.
        for i := 0; i < 10; i++ {
            if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
                log.Fatalf("failed to send streaming: %v\n", err)
            }
        }
        stream.CloseSend()
    }()

    // Read all the responses.
    var rpcStatus error
    fmt.Printf("response:\n")
    for {
        r, err := stream.Recv()
        if err != nil {
            rpcStatus = err
            break
        }
        fmt.Printf(" - %s\n", r.Message)
    }
    if rpcStatus != io.EOF {
        log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    }

    // 解析trailer
    trailer := stream.Trailer()

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }
}
    1. stream.Trailer是不能放入Interceptor中的,只能写在对应的handle中,否则会报error:RPC failed with error rpc error: code = Canceled desc = context canceled

源码

有关GRPC Interceptor的使用的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐