Remote Procedure Calls (RPC) in Go

Aus archium.org
Wechseln zu: Navigation, Suche


Bedeutung

Was ist dieses RPC eigentlich?

Remote Procedure Call bedeutet soviel wie "Aufruf einer fernen Prozedur." Die Implementierung einer solchen Prozedur ist in vielen Anwendungen nicht mehr wegzudenken. Zum Beispiel immer dann, wenn ein Informationsaustausch zwischen Clients vollkommen eigenständiger Prozesse, die in verschiedenen Programmen und sogar auf entfernten Rechnern laufen, stattfinden soll. Es wird häufig genutzt, um Client-Server-Modelle aufzubauen. RPC basiert auf UDP oder TCP. Daher kann man auch von einer verbindungslosen Kommunikation sprechen.

Das User Datagram Protocol (UDP) und das Transmission Control Protocol (TCP) gehören zur TCP/IP-Protokollfamilie und regeln den Umgang mit und die Anordnung von Datenpaketen bei der Kommunikation im Internet. TCP/IP ist eine Client-Server Kommunikation, sie basiert auf einer exklusiven Verbindung von Host zu Host. Damit ein Host mehrere Verbindungen gleichzeitig bedienen kann, werde den Services bestimmte Ports zugeordnet. Die Datenströme sind in zwei Schichten aufgeteilt:

  1. In der Datenschicht wird eine Nachricht vor dem Versand in kleinere Pakete zerlegt.
  2. In der IP-Schicht stehen die Adressinformationen.[1]
TCP
steuert den Datenfluß und verhindert den Paketverlust. TCP nimmt ein File bzw. den Datenstrom von den Anwendungen am Port entgegen, verteilt die Pakete und stellt ihnen einen IP-Header bei. Der Empfänger erhält mit dem Header eine Anleitung, um die Datenpakete wieder in der richtigen Reihenfolge und vollständig zusammenzusetzen.
UDP
hat die selbe Aufgabe wie TCP, aber es fehlen die Kontrollfunktionen und auch die Nummerierung der Pakete entfällt. Es ist dadurch schlanker, einfacher zu verarbeiten und deswegen auch schneller. Es eignet sich für Anwendungen, die nur einzelne Datenpakete ohne Zusammenhang transportieren (z.B. Zeitmarken) oder für den schnellen Transport großer Datenmengen mit Anwendungen, die die komplexen Kontrollfunktionen selber auf eigene Weise übernehmen (z.B. Network File System NFS).

Funktionsweise

Was macht so ein Remote Procedure Call?

Die Kommunikation beginnt, indem der Client eine Anfrage an einen bekannten Server schickt und auf die Antwort wartet. In der Anfrage gibt der Client an, welche Funktion mit welchen Parametern ausgeführt werden soll. Der Server bearbeitet die Anfrage und schickt die Antwort an den Client zurück. Nach Empfang der Nachricht kann der Client seine Verarbeitung fortführen.[2]

Beispiele für RPC mit Go

Nachfolgende Beispiele bestehen immer aus zwei voneinander unabhängig auszuführenden Programmen, dem Server und dem Client. Der Server ist immer vor dem Client zu starten.

Variante 1-1: Package "net/rpc"

[Server] Datei "main.go"

package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	for {
		c, err := l.Accept()
		fmt.Printf("request from %v\n", c.RemoteAddr())
		if err != nil {
			continue
		}
		go rpc.ServeConn(c)
	}
}

Ergebnis
$ ./rpc_server 
request from 127.0.0.1:47156
request from 127.0.0.1:47158
request from 127.0.0.1:47160
^C


[Client] Datei "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := rpc.Dial("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Ergebnis
$ ./rpc_client 
request took 405.45µs
7 / 8 = 0.875
$ ./rpc_client 
request took 1.055826ms
7 / 8 = 0.875
$ ./rpc_client 
request took 1.089143ms
7 / 8 = 0.875
$

Variante 1-2: Package "net/rpc" mit HTTP-Handler

ServeConn aus dem oberen Beispiel liefert eine einzelne Verbindung, die nach Beendigung neu gestartet werden muß. Demgegenüber ermöglicht der HTTP-Handler den Aufbau eines dauerhaft "lauschenden" Servers. Er wird nur einmal gestartet und bedient beliebig viele Requests. Allerdings stehen die Methoden des Listeners nicht mehr zur Verfügung.

[Server] Datei "main.go"

package main

import (
	"errors"
	"log"
	"net"
	"net/http"
	"net/rpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))
	rpc.HandleHTTP()

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	go http.Serve(l, nil)

	useChannelToWait := make(chan int, 1)
	<-useChannelToWait
}

Ergebnis
$ ./rpc_server 
^C

[Client] Datei "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := rpc.DialHTTP("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Ergebnis
$ ./rpc_client
request took 1.311427ms
7 / 8 = 0.875
$ ./rpc_client
request took 576.049µs
7 / 8 = 0.875
$ ./rpc_client
request took 1.224976ms
7 / 8 = 0.875
$

Variante 1-3: Package "net/rpc/jsonrpc"

"net/rpc" kommuniziert nur von Go-Prozessen zu Go-Prozessen. Für cross-language- / cross-platform-Kommunikation entsprechend der JSON-RPC 2.0 Specification eignet sich stattdessen "net/rpc/jsonrpc".

[Server] Datei "main.go"

package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	l, e := net.Listen("tcp", "127.0.0.1:17152")

	defer l.Close()

	if e != nil {
		log.Fatal("listen error:", e)
	}

	for {
		c, err := l.Accept()
		fmt.Printf("request from %v\n", c.RemoteAddr())
		if err != nil {
			continue
		}
		go jsonrpc.ServeConn(c)
	}
}


[Client] Datei "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc/jsonrpc"
	"time"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	client, err := jsonrpc.Dial("tcp", "127.0.0.1:17152")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Variante 1-4: Package "golang.org/x/net/websocket" in Kombination mit "net/rpc" oder "net/rpc/jsonrpc"

[Server] Datei "main.go"

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/rpc"

	"golang.org/x/net/websocket"
)

type QuotientServer struct {
	Dividend, Divisor int
}

type ServerMethods int

func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
	if args.Divisor == 0 {
		return errors.New("division by zero")
	} else {
		*result = float64(args.Dividend) / float64(args.Divisor)
		return nil
	}
}
func main() {
	rpc.Register(new(ServerMethods))

	http.Handle("/ws", websocket.Handler(func(connection *websocket.Conn) {
		fmt.Printf("request from %v\n", connection.RemoteAddr())
		rpc.ServeConn(connection)       // variant 1
		//jsonrpc.ServeConn(connection) // variant 2
	}))
	http.ListenAndServe("127.0.0.1:17152", nil)

}


[Client] Datei "main.go"

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"

	"golang.org/x/net/websocket"
)

type quotientClient struct {
	Dividend, Divisor int
}

func main() {
	t0 := time.Now()
	var (
		result float64
		args   quotientClient
	)

	args.Dividend = 7
	args.Divisor = 8

	connection, err := websocket.Dial("ws://127.0.0.1:17152/ws", "", "http://127.0.0.1")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	defer connection.Close()

	client := rpc.NewClient(connection)       // variant 1
	//client := jsonrpc.NewClient(connection) // variant 2

	err = client.Call("ServerMethods.Divide", &args, &result)
	//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too

	if err != nil {
		log.Fatal("division error:", err)
	}
	fmt.Printf("request took %v\n", time.Since(t0))
	fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}

Websockets als Alternative – geeignet zur Einbettung von WebAssembly in verteilte Systeme

Die Kommunikation zwischen Go-Programmen und Go-WebAssembly-Anwendungen über RPC ist nicht zulässig.[3]Mit Hilfe der minimalistischen Websocket-Library für Go von https://github.com/nhooyr/websocket können wir eine ähnlich komfortable, bidirektionale Kommunikation zwischen zwei Go-Programmen (auch WASM!) aufbauen.

Variante 2-1: Byte-Stream

[Server] Datei "main.go"

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"nhooyr.io/websocket"
)

func main() {
	ReverseFunc := func(s string) (result string) {
		for _, v := range s {
			result = string(v) + result
		}
		return
	}

	hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		var (
			buffer []byte
			mt     websocket.MessageType
		)

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
		defer cancel()

		//Receive
		mt, buffer, err = connection.Read(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)

		//Send
		connection.Write(ctx, websocket.MessageText, []byte(ReverseFunc(string(buffer))))
		fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer)))

		connection.Close(websocket.StatusNormalClosure, "")
	})

	http.ListenAndServe("127.0.0.1:17152", hf)
}

[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)

package main

import (
	"context"
	"fmt"
	"time"

	"nhooyr.io/websocket"
)

const MyMessage = "looc tsi muihcra"

func main() {
	var (
		buffer []byte
		mt     websocket.MessageType
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	//Send
	connection.Write(ctx, websocket.MessageBinary, []byte(MyMessage))
	fmt.Printf("Message send: %s\n", MyMessage)

	//Receive
	mt, buffer, err = connection.Read(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)

	connection.Close(websocket.StatusNormalClosure, "")
}

Variante 2-2: JSON-Text

[Server] Datei "main.go"

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

func main() {
	ReverseFunc := func(s string) (result string) {
		for _, v := range s {
			result = string(v) + result
		}
		return
	}

	hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		var buffer interface{}

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
		defer cancel()

		//Receive
		err = wsjson.Read(ctx, connection, &buffer)
		fmt.Printf("Message received: %s\n", buffer.(string))

		//Send
		err = wsjson.Write(ctx, connection, ReverseFunc(string(buffer.(string))))
		if err != nil {
			panic(err)
		}
		fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer.(string))))

		connection.Close(websocket.StatusNormalClosure, "")
	})

	http.ListenAndServe("127.0.0.1:17152", hf)
}

[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)

package main

import (
	"context"
	"fmt"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage = "looc tsi muihcra"

func main() {
	var buffer interface{}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	//Send
	err = wsjson.Write(ctx, connection, MyMessage)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message send: %s\n", MyMessage)

	//Receive
	err = wsjson.Read(ctx, connection, &buffer)
	fmt.Printf("Message received: %s\n", buffer.(string))

	connection.Close(websocket.StatusNormalClosure, "")
}

Tip für die Praxis: Bidirektionale Kommunikation zwischen Server und WebAssembly-Client

Es ist zwar nicht möglich, in der virtuellen WebAssembly-Maschine einen Websocket-Server einzurichten, aber ist eine Websocket-Verbindung erst einmal aufgebaut, funktioniert sie in beide Richtungen. Wenn wir aber Remote-Procedure-Calls machen wollen, benötigen wir bereits beide Richtungen der Verbindungen für Anfrage und Antwort. Es bietet sich an, zwei Websocket-Verbindungen zu eröffnen und für die jeweilige Richtung eines Requests zu reservieren.

Im nachfolgenden Beispiel wurden wsjson.Write mit nachfolgendem wsjson.Read für Send-Requests zu einer Funktion zusammengefaßt. Im Gegenzug empfängt wsjson.Read Anfragen, übergibt den Wert an eine zwischengeschaltete Funktion zur Auswertung des Requests und liefert das Resultat über wsjson.Write zurück an den Absender.

[Server] Datei "main.go"

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage = "looc tsi muihcra"

func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
	//Send
	err = wsjson.Write(ctx, conn, sendval)
	if err != nil {
		return err, nil
	}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		return err, nil
	}

	return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
	var err error
	var sendval, receiveval interface{}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		log.Fatalln(err)
	}
	sendval = dosth(receiveval)

	//Send
	err = wsjson.Write(ctx, conn, sendval.(string))
	if err != nil {
		log.Fatalln(err)
	}
}

func ReverseFunc(i interface{}) interface{} {
	var result string

	for _, v := range i.(string) {
		result = string(v) + result
	}
	return result
}

func main() {
	hf1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()
		for ; true; time.Sleep(1 * time.Second) {
			fmt.Printf("Message send: %s\n", MyMessage)

			// Send request and wait for answer
			err, buffer := wsjsonSend(ctx, connection, MyMessage)
			if err != nil {
				panic(err)
			}
			fmt.Printf("Message received: %s\n", buffer.(string))
		}
		connection.Close(websocket.StatusNormalClosure, "")
	})

	hf2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
			CompressionMode: websocket.CompressionDisabled,
			OriginPatterns:  []string{"*"},
		})

		if err != nil {
			panic(err)
		}
		defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()
		for {
			// Receive and answer request
			wsjsonReceive(ctx, connection, ReverseFunc)
		}
		connection.Close(websocket.StatusNormalClosure, "")
	})

	go http.ListenAndServe("127.0.0.1:17152", hf1)
	go http.ListenAndServe("127.0.0.1:17153", hf2)

	time.Sleep(5 * time.Minute) //To give the experiment some time
}

[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

const MyMessage2 = "skcor ylbmessabew"

var wg sync.WaitGroup

func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
	//Send
	err = wsjson.Write(ctx, conn, sendval)
	if err != nil {
		return err, nil
	}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		return err, nil
	}

	return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
	var err error
	var sendval, receiveval interface{}

	//Receive
	err = wsjson.Read(ctx, conn, &receiveval)
	if err != nil {
		log.Fatalln(err)
	}
	sendval = dosth(receiveval)

	//Send
	err = wsjson.Write(ctx, conn, sendval.(string))
	if err != nil {
		log.Fatalln(err)
	}
}

func ReverseFunc(i interface{}) interface{} {
	var result string

	for _, v := range i.(string) {
		result = string(v) + result
	}
	return result
}

func main() {
	ctx, cancel := context.WithCancel(context.Background()) //Contexts are safe for simultaneous use by multiple goroutines. (https://golang.org/pkg/context/#pkg-overview)
	defer cancel()

	connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
	if err != nil {
		panic(err)
	}
	defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")

	connection2, _, err2 := websocket.Dial(ctx, "ws://127.0.0.1:17153", nil)
	if err != nil {
		panic(err2)
	}
	defer connection2.Close(websocket.StatusInternalError, "Websocket: internal Error")

	wg.Add(2)
	// Receive and answer request
	go func() {
		defer wg.Done()
		for {
			wsjsonReceive(ctx, connection, ReverseFunc)
		}
	}()

	// Send request and wait for answer
	go func() {
		defer wg.Done()
		for ; true; time.Sleep(5 * time.Second) {
			fmt.Printf("Message send: %s\n", MyMessage2)
			err, buffer2 := wsjsonSend(ctx, connection2, MyMessage2)
			if err != nil {
				panic(err)
			}
			fmt.Printf("Message received: %s\n", buffer2.(string))

		}
	}()
	wg.Wait()

	connection.Close(websocket.StatusNormalClosure, "")
}

Anmerkungen