transporter/grpc 中基于谷歌的 grpc 框架實現(xiàn)了?Transporter
?,用以注冊 grpc 到 ?kratos.Server()
? 中。
Network()
?配置服務(wù)端的 network 協(xié)議,如 tcp
Address()
?配置服務(wù)端監(jiān)聽的地址
Timeout()
?配置服務(wù)端的超時設(shè)置
Logger()
?配置服務(wù)端使用日志
Middleware()
?配置服務(wù)端的 kratos 中間件
UnaryInterceptor()
?配置服務(wù)端使用的 grpc 攔截器
Options()
?配置一些額外的 grpc.ServerOption
NewServer()
?func NewServer(opts ...ServerOption) *Server {
// grpc server 默認配置
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
log: log.NewHelper(log.DefaultLogger),
}
// 遞歸 opts
for _, o := range opts {
o(srv)
}
// kratos middleware 轉(zhuǎn)換成 grpc 攔截器,并處理一些細節(jié)
var ints = []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(),
}
if len(srv.ints) > 0 {
ints = append(ints, srv.ints...)
}
// 將 UnaryInterceptor 轉(zhuǎn)換成 ServerOption
var grpcOpts = []grpc.ServerOption{
grpc.ChainUnaryInterceptor(ints...),
}
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
// 創(chuàng)建 grpc server
srv.Server = grpc.NewServer(grpcOpts...)
// 創(chuàng)建 metadata server
srv.metadata = apimd.NewServer(srv.Server)
// 內(nèi)部注冊
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
apimd.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
return srv
}
unaryServerInterceptor()
?func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 把兩個 ctx 合并成一個
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
// 從 ctx 中取出 metadata
md, _ := grpcmd.FromIncomingContext(ctx)
// 把一些信息綁定到 ctx 上
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
header: headerCarrier(md),
})
// ctx 超時設(shè)置
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// 中間件處理
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if len(s.middleware) > 0 {
h = middleware.Chain(s.middleware...)(h)
}
return h(ctx, req)
}
}
簡單列舉了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 倉庫中查看。
gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
logging.Server(),
),
)
if info, ok := transport.FromServerContext(ctx); ok {
kind = info.Kind().String()
operation = info.Operation()
}
WithEndpoint()
?配置客戶端使用的對端連接地址,如果不使用服務(wù)發(fā)現(xiàn)則為ip:port,如果使用服務(wù)發(fā)現(xiàn)則格式為discovery://\<authority>/\<serviceName>
WithTimeout()
?配置客戶端的請求默認超時時間,如果有鏈路超時優(yōu)先使用鏈路超時時間
WithMiddleware()
?配置客戶端使用的 kratos 中間件
WithDiscovery()
?配置客戶端使用的服務(wù)發(fā)現(xiàn)
WithUnaryInterceptor()
?配置客戶端使用的 grpc 原生攔截器
WithOptions()
?配置一些額外的 grpc.ClientOption
dial()
?func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// 默認配置
options := clientOptions{
timeout: 500 * time.Millisecond,
}
// 遍歷 opts
for _, o := range opts {
o(&options)
}
// 將 kratos 中間件轉(zhuǎn)化成 grpc 攔截器
var ints = []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
var grpcOpts = []grpc.DialOption{
// 負載均衡
grpc.WithBalancerName(roundrobin.Name),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
// 如果存在服務(wù)發(fā)現(xiàn)配置,就配置 grpc 的 Resolvers
grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
}
if insecure {
// 跳過證書驗證
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 把一些信息綁定到 ctx 上
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
header: headerCarrier{},
})
if timeout > 0 {
// timeout 如果大于 0,就重新設(shè)置一下 ctx 的超時時間
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// 中間件處理
h := func(ctx context.Context, req interface{}) (interface{}, error) {
if tr, ok := transport.FromClientContext(ctx); ok {
keys := tr.Header().Keys()
keyvals := make([]string, 0, len(keys))
for _, k := range keys {
keyvals = append(keyvals, k, tr.Header().Get(k))
}
ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}
_, err := h(ctx, req)
return err
}
}
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)
conn, err := grpc.DialInsecure(
context.Background(),
transport.WithEndpoint("127.0.0.1:9000"),
transport.WithMiddleware(
recovery.Recovery(),
),
)
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)
更多建議: