上一篇我们介绍了metadata的使用方法,但是我们在每个方法内部都需要设置相同重复的metadata,比如调用时间戳,调用链等;能不能把这些相同的重复性设置,统一放在一个地方,方便后面修改和维护,答案就是拦截器-Interceptor.
Interceptor的使用func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
fmt.Println("---unaryInterceptor---")
// 解析请求携带的信息
str, _ := json.Marshal(req)
fmt.Printf("req: %s\n", str)
fmt.Printf("Method: %s\n", info.FullMethod)
defer func() {
trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
grpc.SetTrailer(ctx, trailer)
}()
// 解析请求的metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.DataLoss, "无法获取元数据")
}
if t, ok := md["timestamp"]; ok {
fmt.Println("timestamp from metadata:")
for i, e := range t {
fmt.Printf("%d.%s\n", i, e)
}
}
// 创建携带metadata的Header
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
grpc.SendHeader(ctx, header)
// 方法调用
m, err := handler(ctx, req)
if err != nil {
fmt.Printf("RPC failed with error %v", err)
}
return m, err
}
func (s *server) UnaryEcho(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
fmt.Println("---UnaryEcho---")
fmt.Printf("已接受到的请求:%v,发送响应\n", in)
return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}
context中解析metadata
Header里面的metadata
defer时,设置Trailer里面的metadata
// unaryInterceptor is an example unary interceptor.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Printf("---unaryInterceptor---\n")
// 创建metadata到context中.
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
ctx = metadata.NewOutgoingContext(ctx, md)
reqStr, _ := json.Marshal(req)
fmt.Printf("RPC: %s,req:%s\n", method, reqStr)
var header, trailer metadata.MD
opts = append(opts, grpc.Header(&header), grpc.Trailer(&trailer))
err := invoker(ctx, method, req, reply, cc, opts...)
if t, ok := header["timestamp"]; ok {
fmt.Printf("timestamp from header:\n")
for i, e := range t {
fmt.Printf("%d.%s\n", i, e)
}
} else {
log.Fatal("需要timestamp,但header中不存在timestamp")
}
if l, ok := header["location"]; ok {
fmt.Printf("location from header:\n")
for i, e := range l {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("需要location,但是header中不存在location")
}
if t, ok := trailer["timestamp"]; ok {
fmt.Printf("timestamp from trailer:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("需要timestamp,但header中不存在timestamp")
}
return err
}
func unaryCallWithMetadata(c pb.GreeterClient) {
fmt.Println("--- unaryCall ---")
// 使用metadata的上下文创建RPC
r, err := c.UnaryEcho(context.Background(), &pb.HelloRequest{Name: "unaryCall"})
if err != nil {
log.Fatalf("调用UnaryEcho失败:%v", err)
}
fmt.Println("response:")
fmt.Printf(" - %s\n", r.Message)
}
1.创建metadata并且放入context中
2.打印请求方法名和请求方法参数
3.定义用于存放服务端返回header, trailer
4.调用业务处理handle
5.解析header, trailer
Interceptor之后,我们业务代码变的很干净,只用关心业务层面的逻辑Interceptor的使用func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fmt.Printf("--- streamInterceptor ---\n")
// 调用完成时设置SetTrailer
defer func() {
trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
ss.SetTrailer(trailer)
}()
// 从Stream的Context中解析出metadata
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return status.Errorf(codes.DataLoss, "ServerStreamingEcho: 无法获取metadata")
}
if t, ok := md["timestamp"]; ok {
fmt.Printf("timestamp from metadata:\n")
for i, e := range t {
fmt.Printf("%d.%s\n", i, e)
}
}
// 设置Header里面的metadata
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
ss.SendHeader(header)
err := handler(srv, ss)
if err != nil {
fmt.Printf("RPC failed with error %v", err)
}
return err
}
func (s *server) BidirectionalStreamingEcho(stream pb.Greeter_BidirectionalStreamingEchoServer) error {
fmt.Printf("--- BidirectionalStreamingEcho ---\n")
// Read requests and send responses.
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
fmt.Printf("request received %v, sending echo\n", in)
if err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()}); err != nil {
return err
}
}
}
stream模式下调用与普通调用修改逻辑基本一直,可以将所有的metadata操作都放入Interceptor中func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
fmt.Printf("---streamInterceptor---\n")
// 创建metadata到context中.
md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
ctx = metadata.NewOutgoingContext(ctx, md)
// 执行具体业务
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
// 解析 header
header, err := s.Header()
if err != nil {
log.Fatalf("无法从stream中获取header: %v", err)
}
if t, ok := header["timestamp"]; ok {
fmt.Printf("timestamp from header:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("需要timestamp,但header中不存在timestamp")
}
if l, ok := header["location"]; ok {
fmt.Printf("location from header:\n")
for i, e := range l {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("需要location,但是header中不存在location")
}
return s, nil
}
func bidirectionalWithMetadata(c pb.GreeterClient) {
fmt.Printf("--- bidirectional ---\n")
// Make RPC using the context with the metadata.
stream, err := c.BidirectionalStreamingEcho(context.Background())
if err != nil {
log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
}
go func() {
// Send all requests to the server.
for i := 0; i < 10; i++ {
if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
log.Fatalf("failed to send streaming: %v\n", err)
}
}
stream.CloseSend()
}()
// Read all the responses.
var rpcStatus error
fmt.Printf("response:\n")
for {
r, err := stream.Recv()
if err != nil {
rpcStatus = err
break
}
fmt.Printf(" - %s\n", r.Message)
}
if rpcStatus != io.EOF {
log.Fatalf("failed to finish server streaming: %v", rpcStatus)
}
// 解析trailer
trailer := stream.Trailer()
if t, ok := trailer["timestamp"]; ok {
fmt.Printf("timestamp from trailer:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("需要timestamp,但header中不存在timestamp")
}
}
stream.Trailer是不能放入Interceptor中的,只能写在对应的handle中,否则会报error:RPC failed with error rpc error: code = Canceled desc = context canceled
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po