支持 PingPong、Oneway、Streaming。
目前 Kitex 支持的消息類型、編解碼協(xié)議和傳輸協(xié)議
消息類型 | 編碼協(xié)議 | 傳輸協(xié)議 |
---|---|---|
PingPong | Thrift / Protobuf | TTHeader / HTTP2(gRPC) |
Oneway | Thrift | TTHeader |
Streaming | Protobuf | HTTP2(gRPC) |
目前 Thrift 支持 PingPong 和 Oneway。Kitex 計(jì)劃支持 Thrift Streaming。
IDL 定義 :
namespace go echo
struct Request {
1: string Msg
}
struct Response {
1: string Msg
}
service EchoService {
Response Echo(1: Request req); // pingpong method
oneway void VisitOneway(1: Request req); // oneway method
}
生成的代碼組織結(jié)構(gòu) :
.
└── kitex_gen
└── echo
├── echo.go
├── echoservice
│ ├── client.go
│ ├── echoservice.go
│ ├── invoker.go
│ └── server.go
└── k-echo.go
Server 的處理代碼形如 :
package main
import (
"xx/echo"
"xx/echo/echoservice"
)
type handler struct {}
func (handler) Echo(ctx context.Context, req *echo.Request) (r *echo.Response, err error) {
//...
return &echo.Response{ Msg: "world" }
}
func (handler) VisitOneway(ctx context.Context, req *echo.Request) (err error) {
//...
return nil
}
func main() {
svr, err := echoservice.NewServer(handler{})
if err != nil {
panic(err)
}
svr.Run()
}
Client 側(cè)代碼 :
package main
import (
"xx/echo"
"xx/echo/echoservice"
)
func main() {
cli, err := echoservice.NewClient("destServiceName")
if err != nil {
panic(err)
}
req := echo.NewRequest()
req.Msg = "hello"
resp, err := cli.Echo(req)
if err != nil {
panic(err)
}
// resp.Msg == "world"
}
Client 側(cè)代碼 :
package main
import (
"xx/echo"
"xx/echo/echoservice"
)
func main() {
cli, err := echoservice.NewClient("destServiceName")
if err != nil {
panic(err)
}
req := echo.NewRequest()
req.Msg = "hello"
err = cli.VisitOneway(req)
if err != nil {
panic(err)
}
// no response return
}
Kitex 支持兩種承載 Protobuf 負(fù)載的協(xié)議:
以下給出 Streaming 的使用示例。
IDL 定義 :
syntax = "proto3";
option go_package = "echo";
package echo;
message Request {
string msg = 1;
}
message Response {
string msg = 1;
}
service EchoService {
rpc ClientSideStreaming(stream Request) returns (Response) {} // 客戶端側(cè) streaming
rpc ServerSideStreaming(Request) returns (stream Response) {} // 服務(wù)端側(cè) streaming
rpc BidiSideStreaming(stream Request) returns (stream Response) {} // 雙向流
}
生成的代碼組織結(jié)構(gòu) :
.
└── kitex_gen
└── echo
├── echo.pb.go
└── echoservice
├── client.go
├── echoservice.go
├── invoker.go
└── server.go
Server 側(cè)代碼 :
package main
import (
"sync"
"xx/echo"
"xx/echo/echoservice"
}
type handler struct{}
func (handler) ClientSideStreaming(stream echo.EchoService_ClientSideStreamingServer) (err error) {
for {
req, err := stream.Recv()
if err != nil {
return err
}
}
}
func (handler) ServerSideStreaming(req *echo.Request, stream echo.EchoService_ServerSideStreamingServer) (err error) {
_ = req
for {
resp := &echo.Response{Msg: "world"}
if err := stream.Send(resp); err != nil {
return err
}
}
}
func (handler) BidiSideStreaming(stream echo.EchoService_BidiSideStreamingServer) (err error) {
var once sync.Once
go func() {
for {
req, err2 := stream.Recv()
log.Println("received:", req.GetMsg())
if err2 != nil {
once.Do(func() {
err = err2
})
break
}
}
}()
for {
resp := &echo.Response{Msg: "world"}
if err2 := stream.Send(resp); err2 != nil {
once.Do(func() {
err = err2
})
return
}
}
return
}
func main() {
svr, err := echoservice.NewServer(handler{})
if err != nil {
panic(err)
}
svr.Run()
}
客戶端流
package main
import (
"xx/echo"
"xx/echo/echoservice"
}
func main() {
cli, err := echoservice.NewClient("destServiceName")
if err != nil {
panic(err)
}
cliStream, err := cli.ClientSideStreaming(context.Background())
if err != nil {
panic(err)
}
for {
req := &echo.Request{Msg: "hello"}
if err := cliStream.Send(req); err != nil {
panic(err)
}
}
}
服務(wù)器端流
package main
import (
"xx/echo"
"xx/echo/echoservice"
}
func main() {
cli, err := echoservice.NewClient("destServiceName")
if err != nil {
panic(err)
}
req := &echo.Request{Msg: "hello"}
svrStream, err := cli.ServerSideStreaming(context.Background(), req)
if err != nil {
panic(err)
}
for {
resp, err := svrStream.Recv()
if err != nil {
panic(err)
}
// resp.Msg == "world"
}
}
雙向流
package main
import (
"xx/echo"
"xx/echo/echoservice"
}
func main() {
cli, err := echoservice.NewClient("destServiceName")
if err != nil {
panic(err)
}
req := &echo.Request{Msg: "hello"}
bidiStream, err := cli.BidiSideStreaming(context.Background())
if err != nil {
panic(err)
}
go func() {
for {
req := &echo.Request{Msg: "hello"}
err := bidiStream.Send(req)
if err != nil {
panic(err)
}
}
}()
for {
resp, err := bidiStream.Recv()
if err != nil {
panic(err)
}
// resp.Msg == "world"
}
}
更多建議: