Kratos gRPC

2022-04-25 10:27 更新

gRPC

transporter/grpc 中基于谷歌的 grpc 框架實現(xiàn)了?Transporter?,用以注冊 grpc 到 ?kratos.Server()? 中。

server

配置

  • ?Network() ?

配置服務(wù)端的 network 協(xié)議,如 tcp

  • ?Address() ?

配置服務(wù)端監(jiān)聽的地址

  • ?Timeout() ?

配置服務(wù)端的超時設(shè)置

  • ?Logger() ?

配置服務(wù)端使用日志

  • ?Middleware() ?

配置服務(wù)端的 kratos 中間件

  • ?UnaryInterceptor() ?

配置服務(wù)端使用的 grpc 攔截器

  • ?Options() ?

配置一些額外的 grpc.ServerOption

主要的實現(xiàn)細節(jié)

  • ?NewServer() ?

func NewServer(opts ...ServerOption) *Server {
  // grpc server 默認配置
    srv := &Server{
        network: "tcp",
        address: ":0",
        timeout: 1 * time.Second,
        health:  health.NewServer(),
        log:     log.NewHelper(log.DefaultLogger),
    }
  // 遞歸 opts
    for _, o := range opts {
        o(srv)
    }
  // kratos middleware 轉(zhuǎn)換成 grpc 攔截器,并處理一些細節(jié)
    var ints = []grpc.UnaryServerInterceptor{
        srv.unaryServerInterceptor(),
    }

    if len(srv.ints) > 0 {
        ints = append(ints, srv.ints...)
    }

  // 將 UnaryInterceptor 轉(zhuǎn)換成 ServerOption
    var grpcOpts = []grpc.ServerOption{
        grpc.ChainUnaryInterceptor(ints...),
    }
    if len(srv.grpcOpts) > 0 {
        grpcOpts = append(grpcOpts, srv.grpcOpts...)
    }
  // 創(chuàng)建 grpc server
    srv.Server = grpc.NewServer(grpcOpts...)
  // 創(chuàng)建 metadata server
    srv.metadata = apimd.NewServer(srv.Server)
    // 內(nèi)部注冊
    grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
    apimd.RegisterMetadataServer(srv.Server, srv.metadata)
    reflection.Register(srv.Server)
    return srv
}

  • ?unaryServerInterceptor() ?

func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // 把兩個 ctx 合并成一個
        ctx, cancel := ic.Merge(ctx, s.ctx)
        defer cancel()
    // 從 ctx 中取出 metadata
        md, _ := grpcmd.FromIncomingContext(ctx)
    // 把一些信息綁定到 ctx 上
        ctx = transport.NewServerContext(ctx, &Transport{
            endpoint:  s.endpoint.String(),
            operation: info.FullMethod,
            header:    headerCarrier(md),
        })
    // ctx 超時設(shè)置
        if s.timeout > 0 {
            ctx, cancel = context.WithTimeout(ctx, s.timeout)
            defer cancel()
        }
    // 中間件處理
        h := func(ctx context.Context, req interface{}) (interface{}, error) {
            return handler(ctx, req)
        }
        if len(s.middleware) > 0 {
            h = middleware.Chain(s.middleware...)(h)
        }
        return h(ctx, req)
    }
}

使用方式

簡單列舉了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 倉庫中查看。

  • 注冊 grpc server

gs := grpc.NewServer()
app := kratos.New(
    kratos.Name("kratos"),
    kratos.Version("v1.0.0"),
    kratos.Server(gs),
)

  • grpc server 中使用 kratos middleware

grpcSrv := grpc.NewServer(
    grpc.Address(":9000"),
    grpc.Middleware(
        logging.Server(),
    ),
)

  • middleware 中處理 grpc 請求

if info, ok := transport.FromServerContext(ctx); ok {
  kind = info.Kind().String()
  operation = info.Operation()
}

client

配置

  • ?WithEndpoint() ?

配置客戶端使用的對端連接地址,如果不使用服務(wù)發(fā)現(xiàn)則為ip:port,如果使用服務(wù)發(fā)現(xiàn)則格式為discovery://\<authority>/\<serviceName>

  • ?WithTimeout() ?

配置客戶端的請求默認超時時間,如果有鏈路超時優(yōu)先使用鏈路超時時間

  • ?WithMiddleware() ?

配置客戶端使用的 kratos 中間件

  • ?WithDiscovery() ?

配置客戶端使用的服務(wù)發(fā)現(xiàn)

  • ?WithUnaryInterceptor() ?

配置客戶端使用的 grpc 原生攔截器

  • ?WithOptions() ?

配置一些額外的 grpc.ClientOption

主要的實現(xiàn)細節(jié)

  • ?dial() ?

func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
    // 默認配置
  options := clientOptions{
        timeout: 500 * time.Millisecond,
    }
  // 遍歷 opts
    for _, o := range opts {
        o(&options)
    }
  // 將 kratos 中間件轉(zhuǎn)化成 grpc 攔截器
    var ints = []grpc.UnaryClientInterceptor{
        unaryClientInterceptor(options.middleware, options.timeout),
    }
    if len(options.ints) > 0 {
        ints = append(ints, options.ints...)
    }
    var grpcOpts = []grpc.DialOption{
    // 負載均衡
        grpc.WithBalancerName(roundrobin.Name),
        grpc.WithChainUnaryInterceptor(ints...),
    }
    if options.discovery != nil {
    // 如果存在服務(wù)發(fā)現(xiàn)配置,就配置 grpc 的 Resolvers
        grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
    }
    if insecure {
    // 跳過證書驗證
        grpcOpts = append(grpcOpts, grpc.WithInsecure())
    }
    if len(options.grpcOpts) > 0 {
        grpcOpts = append(grpcOpts, options.grpcOpts...)
    }
    return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    // 把一些信息綁定到 ctx 上
        ctx = transport.NewClientContext(ctx, &Transport{
            endpoint:  cc.Target(),
            operation: method,
            header:    headerCarrier{},
        })
        if timeout > 0 {
      // timeout 如果大于 0,就重新設(shè)置一下 ctx 的超時時間
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, timeout)
            defer cancel()
        }
    // 中間件處理
        h := func(ctx context.Context, req interface{}) (interface{}, error) {
            if tr, ok := transport.FromClientContext(ctx); ok {
                keys := tr.Header().Keys()
                keyvals := make([]string, 0, len(keys))
                for _, k := range keys {
                    keyvals = append(keyvals, k, tr.Header().Get(k))
                }
                ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
            }
            return reply, invoker(ctx, method, req, reply, cc, opts...)
        }
        if len(ms) > 0 {
            h = middleware.Chain(ms...)(h)
        }
        _, err := h(ctx, req)
        return err
    }
}

使用方式

  • 創(chuàng)建客戶端連接

conn, err := grpc.DialInsecure(
    context.Background(),
    grpc.WithEndpoint("127.0.0.1:9000"),
)

  • 使用中間件

conn, err := grpc.DialInsecure(
    context.Background(),
    transport.WithEndpoint("127.0.0.1:9000"),
    transport.WithMiddleware(
          recovery.Recovery(),
    ),
)

  • 使用服務(wù)發(fā)現(xiàn)

conn, err := grpc.DialInsecure(
    context.Background(),
    grpc.WithEndpoint("discovery:///helloworld"),
    grpc.WithDiscovery(r),
)


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號