Remote Procedure Calls (RPC) в Go

Aus archium.org
Wechseln zu: Navigation, Suche


Inhaltsverzeichnis

Значение

Что такое RPC?

Remote Procedure Call означает "Удаленный вызов процедуры". Осуществление такой процедуры стало незаменимым во многих приложениях, например, при обмене информацией между клиентами совершенно независимых процессов, работающих в разных программах и даже на удаленных компьютерах. Он часто используется для построения моделей "клиент-сервер". RPC основан на UDP или TCP, а следовательно можно также говорить о коммуникации без установления связи.

Протокол пользовательских датаграмм (UDP) и протокол управления передачей (TCP) относятся к семейству протоколов TCP/IP и регулируют обработку и расположение пакетов данных при обмене данными в Интернете. TCP/IP — это взаимодействие "клиент-сервер", основанное на эксклюзивном соединении от хоста к хосту. Службам назначаются определенные порты, чтобы хост мог обслуживать несколько подключений одновременно. Потоки данных делятся на два уровня:

  1. Сообщение перед отправкой разбивается на меньшие пакеты на уровне данных.
  2. Информация об адресе находится на уровне IP.
TCP
контролирует поток данных и предотвращает потерю пакетов. TCP принимает файл или поток данных от приложений на порту, распределяет пакеты и предоставляет им IP-заголовок. Вместе с заголовком получатель получает инструкции о том, как полностью и в правильном порядке собрать пакеты данных.
UDP
имеет ту же задачу, что и TCP, но функции управления отсутствуют, а также опущена нумерация пакетов. Поэтому он тоньше, легче обрабатывается и, следовательно, быстрее. Он подходит для приложений, которые переносят только отдельные пакеты данных без контекста (например, временные метки) или для быстрой передачи больших объемов данных с приложениями, которые сами берут на себя сложные функции управления (например, сетевая файловая система NFS).

Функциональность

Что делает удаленный вызов процедуры (RPC)?

Связь начинается, когда клиент отправляет запрос известному серверу и ждет ответа. В запросе клиент указывает, какая функция должна выполняться с какими параметрами. Сервер обрабатывает запрос и отправляет ответ обратно клиенту. После получения ответа клиент может продолжить работу.

Примеры RPC с Go

Следующие примеры всегда состоят из двух программ, которые должны выполняться независимо друг от друга, сервера и клиента. Сервер должен всегда запускаться раньше клиента.

Вариант 1-1: Пакет "net/rpc"

[Server] Файл "main.go"

package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	for {
		c, err := l.Accept()
		fmt.Printf("request from %v\n", c.RemoteAddr())
		if err != nil {
			continue
		}
		go rpc.ServeConn(c)
	}
}

Результат:
$ ./rpc_server 
request from 127.0.0.1:47156
request from 127.0.0.1:47158
request from 127.0.0.1:47160
^C

[Client] Файл "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := rpc.Dial("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Результат:
$ ./rpc_client 
request took 405.45µs
7 / 8 = 0.875
$ ./rpc_client 
request took 1.055826ms
7 / 8 = 0.875
$ ./rpc_client 
request took 1.089143ms
7 / 8 = 0.875
$

Вариант 1-2: Пакет "net/rpc" с HTTP-Обработчиком

ServeConn из приведенного выше примера обеспечивает только одно соединение, которое необходимо перезапускать после разрыва. Есть другое решение - Обработчик HTTP, который позволяет настроить постоянно "слушающий" сервер. Он запускается только один раз и обслуживает любое количество запросов. Однако методы слушателя больше не доступны.

[Server] Файл "main.go"

package main

import (
	"errors"
	"log"
	"net"
	"net/http"
	"net/rpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))
	rpc.HandleHTTP()

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	go http.Serve(l, nil)

	useChannelToWait := make(chan int, 1)
	<-useChannelToWait
}

Результат:
$ ./rpc_server 
^C

[Client] Файл "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := rpc.DialHTTP("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Результат:
$ ./rpc_client
request took 1.311427ms
7 / 8 = 0.875
$ ./rpc_client
request took 576.049µs
7 / 8 = 0.875
$ ./rpc_client
request took 1.224976ms
7 / 8 = 0.875
$

Вариант 1-3: Пакет "net/rpc/jsonrpc"

"net/rpc" обменивается данными только между процессами Go, а "net/rpc/jsonrpc" подходит для межъязыковой/межплатформенной связи в соответствии со спецификацией JSON-RPC 2.0.

[Server] Файл "main.go"

package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	for {
		c, err := l.Accept()
		fmt.Printf("request from %v\n", c.RemoteAddr())
		if err != nil {
			continue
		}
		go jsonrpc.ServeConn(c)
	}
}

[Client] Файл "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc/jsonrpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := jsonrpc.Dial("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Вариант 1-4: Пакет "golang.org/x/net/websocket" в комбинации с "net/rpc" или "net/rpc/jsonrpc"

[Server] Файл "main.go"

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/rpc"

	"golang.org/x/net/websocket"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	http.Handle("/ws", websocket.Handler(func(connection *websocket.Conn) {
		fmt.Printf("request from %v\n", connection.RemoteAddr())
		rpc.ServeConn(connection)       // Вариант 1
		//jsonrpc.ServeConn(connection) // Вариант 2
	}))
	http.ListenAndServe("127.0.0.1:17152", nil)

}

[Client] Файл "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"

	"golang.org/x/net/websocket"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	connection, err := websocket.Dial("ws://127.0.0.1:17152/ws", "", "http://127.0.0.1")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	defer connection.Close()

	client := rpc.NewClient(connection)       // Вариант 1
	//client := jsonrpc.NewClient(connection) // Вариант 2

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too

	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Вебсокеты как альтернатива — подходят для встраивания WebAssembly в распределённые системы

Обмен данными между программами Go и приложениями Go WebAssembly через RPC невозможен. Используя минималистскую библиотеку веб-сокетов для Go из https://github.com/nhooyr/websocket, мы можем создать столь же удобную двунаправленную связь между двумя программами Go (и WASM).

Вариант 2-1: Byte-Stream

[Server] Файл "main.go"

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"nhooyr.io/websocket"
)

func main() {
	ReverseFunc := func(s string) (result string) {
		for _, v := range s {
			result = string(v) + result
		}
		return
	}

	hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		var (
			buffer []byte
			mt     websocket.MessageType
		)

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
		defer cancel()

		//Receive
		mt, buffer, err = connection.Read(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)

		//Send
		connection.Write(ctx, websocket.MessageText, []byte(ReverseFunc(string(buffer))))
		fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer)))

		connection.Close(websocket.StatusNormalClosure, "")
	})

	http.ListenAndServe("127.0.0.1:17152", hf)
}

[Client] Файл "main.go" (Партнер WebAssembly должен быть клиентом)

package main

import (
	"context"
	"fmt"
	"time"

	"nhooyr.io/websocket"
)

const MyMessage = "йотурк muihcra"

func main() {
	var (
		buffer []byte
		mt     websocket.MessageType
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	//Send
	connection.Write(ctx, websocket.MessageBinary, []byte(MyMessage))
	fmt.Printf("Message send: %s\n", MyMessage)

	//Receive
	mt, buffer, err = connection.Read(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)

	connection.Close(websocket.StatusNormalClosure, "")
}

Variante 2-2: JSON-Text

[Server] Файл "main.go"

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

func main() {
	ReverseFunc := func(s string) (result string) {
		for _, v := range s {
			result = string(v) + result
		}
		return
	}

	hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		var buffer interface{}

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
		defer cancel()

		//Receive
		err = wsjson.Read(ctx, connection, &buffer)
		fmt.Printf("Message received: %s\n", buffer.(string))

		//Send
		err = wsjson.Write(ctx, connection, ReverseFunc(string(buffer.(string))))
		if err != nil {
			panic(err)
		}
		fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer.(string))))

		connection.Close(websocket.StatusNormalClosure, "")
	})

	http.ListenAndServe("127.0.0.1:17152", hf)
}

[Client] Файл "main.go" (Партнер WebAssembly должен быть клиентом)

package main

import (
	"context"
	"fmt"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage = "йотурк muihcra"

func main() {
	var buffer interface{}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	//Send
	err = wsjson.Write(ctx, connection, MyMessage)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message send: %s\n", MyMessage)

	//Receive
	err = wsjson.Read(ctx, connection, &buffer)
	fmt.Printf("Message received: %s\n", buffer.(string))

	connection.Close(websocket.StatusNormalClosure, "")
}

Практический совет: двусторонняя связь между сервером и клиентом WebAssembly

Cервер веб-сокетов невозможно настроить на виртуальной машине WebAssembly, но, как только соединение через веб-сокет установлено, оно работает в обоих направлениях. Но если мы хотим совершать удаленные вызовы процедур, нам уже нужны оба направления соединений для запроса и ответа. Имеет смысл открыть два соединения через веб-сокеты и зарезервировать их для соответствующего направления запроса.

В следующем примере wsjson.Write с последующим wsjson.Read для запросов на отправку объединены в одну функцию. wsjson.Read получает запросы, передает значение промежуточной функции для оценки запроса и возвращает результат отправителю через wsjson.Write.

[Server] Файл "main.go"

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage = "йотурк muihcra"

func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
	//Send
	err = wsjson.Write(ctx, conn, sendval)
	if err != nil {
		return err, nil
	}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		return err, nil
	}

	return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
	var err error
	var sendval, receiveval interface{}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		log.Fatalln(err)
	}
	sendval = dosth(receiveval)

	//Send
	err = wsjson.Write(ctx, conn, sendval.(string))
	if err != nil {
		log.Fatalln(err)
	}
}

func ReverseFunc(i interface{}) interface{} {
	var result string

	for _, v := range i.(string) {
		result = string(v) + result
	}
	return result
}

func main() {
	hf1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()
		for ; true; time.Sleep(1 * time.Second) {
			fmt.Printf("Message send: %s\n", MyMessage)

			// Send request and wait for answer
			err, buffer := wsjsonSend(ctx, connection, MyMessage)
			if err != nil {
				panic(err)
			}
			fmt.Printf("Message received: %s\n", buffer.(string))
		}
		connection.Close(websocket.StatusNormalClosure, "")
	})

	hf2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()
		for {
			// Receive and answer request
			wsjsonReceive(ctx, connection, ReverseFunc)
		}
		connection.Close(websocket.StatusNormalClosure, "")
	})

	go http.ListenAndServe("127.0.0.1:17152", hf1)
	go http.ListenAndServe("127.0.0.1:17153", hf2)

	time.Sleep(5 * time.Minute) //To give the experiment some time
}

[Client] Файл "main.go" (Партнер WebAssembly должен быть клиентом)

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage2 = "skcor ylbmessabew"

var wg sync.WaitGroup

func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
	//Send
	err = wsjson.Write(ctx, conn, sendval)
	if err != nil {
		return err, nil
	}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		return err, nil
	}

	return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
	var err error
	var sendval, receiveval interface{}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		log.Fatalln(err)
	}
	sendval = dosth(receiveval)

	//Send
	err = wsjson.Write(ctx, conn, sendval.(string))
	if err != nil {
		log.Fatalln(err)
	}
}

func ReverseFunc(i interface{}) interface{} {
	var result string

	for _, v := range i.(string) {
		result = string(v) + result
	}
	return result
}

func main() {
	ctx, cancel := context.WithCancel(context.Background()) //Contexts are safe for simultaneous use by multiple goroutines. (https://golang.org/pkg/context/#pkg-overview)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	connection2, _, err2 := websocket.Dial(ctx, "ws://127.0.0.1:17153", nil)
	if err != nil {
		panic(err2)
	}
	defer connection2.Close(websocket.StatusInternalError, "Websocket: internal Error")

	wg.Add(2)
	// Receive and answer request
	go func() {
		defer wg.Done()
		for {
			wsjsonReceive(ctx, connection, ReverseFunc)
		}
	}()

	// Send request and wait for answer
	go func() {
		defer wg.Done()
		for ; true; time.Sleep(5 * time.Second) {
			fmt.Printf("Message send: %s\n", MyMessage2)
			err, buffer2 := wsjsonSend(ctx, connection2, MyMessage2)
			if err != nil {
				panic(err)
			}
			fmt.Printf("Message received: %s\n", buffer2.(string))

		}
	}()
	wg.Wait()

	connection.Close(websocket.StatusNormalClosure, "")
}