Go 語言 玩轉(zhuǎn) RPC

2023-03-22 15:02 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch4-rpc/ch4-03-netrpc-hack.html


4.3 玩轉(zhuǎn) RPC

在不同的場景中 RPC 有著不同的需求,因此開源的社區(qū)就誕生了各種 RPC 框架。本節(jié)我們將嘗試 Go 內(nèi)置 RPC 框架在一些比較特殊場景的用法。

4.3.1 客戶端 RPC 的實現(xiàn)原理

Go 語言的 RPC 庫最簡單的使用方式是通過 Client.Call 方法進行同步阻塞調(diào)用,該方法的實現(xiàn)如下:

func (client *Client) Call(
    serviceMethod string, args interface{},
    reply interface{},
) error {
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}

首先通過 Client.Go 方法進行一次異步調(diào)用,返回一個表示這次調(diào)用的 Call 結(jié)構(gòu)體。然后等待 Call 結(jié)構(gòu)體的 Done 管道返回調(diào)用結(jié)果。

我們也可以通過 Client.Go 方法異步調(diào)用前面的 HelloService 服務(wù):

func doClientWork(client *rpc.Client) {
    helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)

    // do some thing

    helloCall = <-helloCall.Done
    if err := helloCall.Error; err != nil {
        log.Fatal(err)
    }

    args := helloCall.Args.(string)
    reply := helloCall.Reply.(*string)
    fmt.Println(args, *reply)
}

在異步調(diào)用命令發(fā)出后,一般會執(zhí)行其他的任務(wù),因此異步調(diào)用的輸入?yún)?shù)和返回值可以通過返回的 Call 變量進行獲取。

執(zhí)行異步調(diào)用的 Client.Go 方法實現(xiàn)如下:

func (client *Client) Go(
    serviceMethod string, args interface{},
    reply interface{},
    done chan *Call,
) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    call.Done = make(chan *Call, 10) // buffered.

    client.send(call)
    return call
}

首先是構(gòu)造一個表示當前調(diào)用的 call 變量,然后通過 client.send 將 call 的完整參數(shù)發(fā)送到 RPC 框架。client.send 方法調(diào)用是線程安全的,因此可以從多個 Goroutine 同時向同一個 RPC 連接發(fā)送調(diào)用指令。

當調(diào)用完成或者發(fā)生錯誤時,將調(diào)用 call.done 方法通知完成:

func (call *Call) done() {
    select {
    case call.Done <- call:
        // ok
    default:
        // We don't want to block here. It is the caller's responsibility to make
        // sure the channel has enough buffer space. See comment in Go().
    }
}

從 Call.done 方法的實現(xiàn)可以得知 call.Done 管道會將處理后的 call 返回。

4.3.2 基于 RPC 實現(xiàn) Watch 功能

在很多系統(tǒng)中都提供了 Watch 監(jiān)視功能的接口,當系統(tǒng)滿足某種條件時 Watch 方法返回監(jiān)控的結(jié)果。在這里我們可以嘗試通過 RPC 框架實現(xiàn)一個基本的 Watch 功能。如前文所描述,因為 client.send 是線程安全的,我們也可以通過在不同的 Goroutine 中同時并發(fā)阻塞調(diào)用 RPC 方法。通過在一個獨立的 Goroutine 中調(diào)用 Watch 函數(shù)進行監(jiān)控。

為了便于演示,我們計劃通過 RPC 構(gòu)造一個簡單的內(nèi)存 KV 數(shù)據(jù)庫。首先定義服務(wù)如下:

type KVStoreService struct {
    m      map[string]string
    filter map[string]func(key string)
    mu     sync.Mutex
}

func NewKVStoreService() *KVStoreService {
    return &KVStoreService{
        m:      make(map[string]string),
        filter: make(map[string]func(key string)),
    }
}

其中 m 成員是一個 map 類型,用于存儲 KV 數(shù)據(jù)。filter 成員對應(yīng)每個 Watch 調(diào)用時定義的過濾器函數(shù)列表。而 mu 成員為互斥鎖,用于在多個 Goroutine 訪問或修改時對其它成員提供保護。

然后就是 Get 和 Set 方法:

func (p *KVStoreService) Get(key string, value *string) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if v, ok := p.m[key]; ok {
        *value = v
        return nil
    }

    return fmt.Errorf("not found")
}

func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    key, value := kv[0], kv[1]

    if oldValue := p.m[key]; oldValue != value {
        for _, fn := range p.filter {
            fn(key)
        }
    }

    p.m[key] = value
    return nil
}

在 Set 方法中,輸入?yún)?shù)是 key 和 value 組成的數(shù)組,用一個匿名的空結(jié)構(gòu)體表示忽略了輸出參數(shù)。當修改某個 key 對應(yīng)的值時會調(diào)用每一個過濾器函數(shù)。

而過濾器列表在 Watch 方法中提供:

func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
    id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
    ch := make(chan string, 10) // buffered

    p.mu.Lock()
    p.filter[id] = func(key string) { ch <- key }
    p.mu.Unlock()

    select {
    case <-time.After(time.Duration(timeoutSecond) * time.Second):
        return fmt.Errorf("timeout")
    case key := <-ch:
        *keyChanged = key
        return nil
    }

    return nil
}

Watch 方法的輸入?yún)?shù)是超時的秒數(shù)。當有 key 變化時將 key 作為返回值返回。如果超過時間后依然沒有 key 被修改,則返回超時的錯誤。Watch 的實現(xiàn)中,用唯一的 id 表示每個 Watch 調(diào)用,然后根據(jù) id 將自身對應(yīng)的過濾器函數(shù)注冊到 p.filter 列表。

KVStoreService 服務(wù)的注冊和啟動過程我們不再贅述。下面我們看看如何從客戶端使用 Watch 方法:

func doClientWork(client *rpc.Client) {
    go func() {
        var keyChanged string
        err := client.Call("KVStoreService.Watch", 30, &keyChanged)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("watch:", keyChanged)
    } ()

    err := client.Call(
        "KVStoreService.Set", [2]string{"abc", "abc-value"},
        new(struct{}),
    )
    if err != nil {
        log.Fatal(err)
    }

    time.Sleep(time.Second*3)
}

首先啟動一個獨立的 Goroutine 監(jiān)控 key 的變化。同步的 watch 調(diào)用會阻塞,直到有 key 發(fā)生變化或者超時。然后在通過 Set 方法修改 KV 值時,服務(wù)器會將變化的 key 通過 Watch 方法返回。這樣我們就可以實現(xiàn)對某些狀態(tài)的監(jiān)控。

4.3.3 反向 RPC

通常的 RPC 是基于 C/S 結(jié)構(gòu),RPC 的服務(wù)端對應(yīng)網(wǎng)絡(luò)的服務(wù)器,RPC 的客戶端也對應(yīng)網(wǎng)絡(luò)客戶端。但是對于一些特殊場景,比如在公司內(nèi)網(wǎng)提供一個 RPC 服務(wù),但是在外網(wǎng)無法連接到內(nèi)網(wǎng)的服務(wù)器。這種時候我們可以參考類似反向代理的技術(shù),首先從內(nèi)網(wǎng)主動連接到外網(wǎng)的 TCP 服務(wù)器,然后基于 TCP 連接向外網(wǎng)提供 RPC 服務(wù)。

以下是啟動反向 RPC 服務(wù)的代碼:

func main() {
    rpc.Register(new(HelloService))

    for {
        conn, _ := net.Dial("tcp", "localhost:1234")
        if conn == nil {
            time.Sleep(time.Second)
            continue
        }

        rpc.ServeConn(conn)
        conn.Close()
    }
}

反向 RPC 的內(nèi)網(wǎng)服務(wù)將不再主動提供 TCP 監(jiān)聽服務(wù),而是首先主動連接到對方的 TCP 服務(wù)器。然后基于每個建立的 TCP 連接向?qū)Ψ教峁?RPC 服務(wù)。

而 RPC 客戶端則需要在一個公共的地址提供一個 TCP 服務(wù),用于接受 RPC 服務(wù)器的連接請求:

func main() {
    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    clientChan := make(chan *rpc.Client)

    go func() {
        for {
            conn, err := listener.Accept()
            if err != nil {
                log.Fatal("Accept error:", err)
            }

            clientChan <- rpc.NewClient(conn)
        }
    }()

    doClientWork(clientChan)
}

當每個連接建立后,基于網(wǎng)絡(luò)連接構(gòu)造 RPC 客戶端對象并發(fā)送到 clientChan 管道。

客戶端執(zhí)行 RPC 調(diào)用的操作在 doClientWork 函數(shù)完成:

func doClientWork(clientChan <-chan *rpc.Client) {
    client := <-clientChan
    defer client.Close()

    var reply string
    err := client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

首先從管道去取一個 RPC 客戶端對象,并且通過 defer 語句指定在函數(shù)退出前關(guān)閉客戶端。然后是執(zhí)行正常的 RPC 調(diào)用。

4.3.4 上下文信息

基于上下文我們可以針對不同客戶端提供定制化的 RPC 服務(wù)。我們可以通過為每個連接提供獨立的 RPC 服務(wù)來實現(xiàn)對上下文特性的支持。

首先改造 HelloService,里面增加了對應(yīng)連接的 conn 成員:

type HelloService struct {
    conn net.Conn
}

然后為每個連接啟動獨立的 RPC 服務(wù):

func main() {
    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go func() {
            defer conn.Close()

            p := rpc.NewServer()
            p.Register(&HelloService{conn: conn})
            p.ServeConn(conn)
        } ()
    }
}

Hello 方法中就可以根據(jù) conn 成員識別不同連接的 RPC 調(diào)用:

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
    return nil
}

基于上下文信息,我們可以方便地為 RPC 服務(wù)增加簡單的登陸狀態(tài)的驗證:

type HelloService struct {
    conn    net.Conn
    isLogin bool
}

func (p *HelloService) Login(request string, reply *string) error {
    if request != "user:password" {
        return fmt.Errorf("auth failed")
    }
    log.Println("login ok")
    p.isLogin = true
    return nil
}

func (p *HelloService) Hello(request string, reply *string) error {
    if !p.isLogin {
        return fmt.Errorf("please login")
    }
    *reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
    return nil
}

這樣可以要求在客戶端連接 RPC 服務(wù)時,首先要執(zhí)行登陸操作,登陸成功后才能正常執(zhí)行其他的服務(wù)。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號