Remote Procedure Calls (RPC) in Go
Inhaltsverzeichnis
- 1 Bedeutung
- 2 Funktionsweise
- 3 Beispiele für RPC mit Go
- 4 Websockets als Alternative – geeignet zur Einbettung von WebAssembly in verteilte Systeme
- 5 Anmerkungen
Bedeutung
Was ist dieses RPC eigentlich?
Remote Procedure Call bedeutet soviel wie "Aufruf einer fernen Prozedur." Die Implementierung einer solchen Prozedur ist in vielen Anwendungen nicht mehr wegzudenken. Zum Beispiel immer dann, wenn ein Informationsaustausch zwischen Clients vollkommen eigenständiger Prozesse, die in verschiedenen Programmen und sogar auf entfernten Rechnern laufen, stattfinden soll. Es wird häufig genutzt, um Client-Server-Modelle aufzubauen. RPC basiert auf UDP oder TCP. Daher kann man auch von einer verbindungslosen Kommunikation sprechen.
Das User Datagram Protocol (UDP) und das Transmission Control Protocol (TCP) gehören zur TCP/IP-Protokollfamilie und regeln den Umgang mit und die Anordnung von Datenpaketen bei der Kommunikation im Internet. TCP/IP ist eine Client-Server Kommunikation, sie basiert auf einer exklusiven Verbindung von Host zu Host. Damit ein Host mehrere Verbindungen gleichzeitig bedienen kann, werde den Services bestimmte Ports zugeordnet. Die Datenströme sind in zwei Schichten aufgeteilt:
- In der Datenschicht wird eine Nachricht vor dem Versand in kleinere Pakete zerlegt.
- In der IP-Schicht stehen die Adressinformationen.[1]
- TCP
- steuert den Datenfluß und verhindert den Paketverlust. TCP nimmt ein File bzw. den Datenstrom von den Anwendungen am Port entgegen, verteilt die Pakete und stellt ihnen einen IP-Header bei. Der Empfänger erhält mit dem Header eine Anleitung, um die Datenpakete wieder in der richtigen Reihenfolge und vollständig zusammenzusetzen.
- UDP
- hat die selbe Aufgabe wie TCP, aber es fehlen die Kontrollfunktionen und auch die Nummerierung der Pakete entfällt. Es ist dadurch schlanker, einfacher zu verarbeiten und deswegen auch schneller. Es eignet sich für Anwendungen, die nur einzelne Datenpakete ohne Zusammenhang transportieren (z.B. Zeitmarken) oder für den schnellen Transport großer Datenmengen mit Anwendungen, die die komplexen Kontrollfunktionen selber auf eigene Weise übernehmen (z.B. Network File System NFS).
Funktionsweise
Was macht so ein Remote Procedure Call?
Die Kommunikation beginnt, indem der Client eine Anfrage an einen bekannten Server schickt und auf die Antwort wartet. In der Anfrage gibt der Client an, welche Funktion mit welchen Parametern ausgeführt werden soll. Der Server bearbeitet die Anfrage und schickt die Antwort an den Client zurück. Nach Empfang der Nachricht kann der Client seine Verarbeitung fortführen.[2]
Beispiele für RPC mit Go
Nachfolgende Beispiele bestehen immer aus zwei voneinander unabhängig auszuführenden Programmen, dem Server und dem Client. Der Server ist immer vor dem Client zu starten.
Variante 1-1: Package "net/rpc"
[Server] Datei "main.go"
package main
import (
"errors"
"fmt"
"log"
"net"
"net/rpc"
)
type QuotientServer struct {
Dividend, Divisor int
}
type ServerMethods int
func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
if args.Divisor == 0 {
return errors.New("division by zero")
} else {
*result = float64(args.Dividend) / float64(args.Divisor)
return nil
}
}
func main() {
rpc.Register(new(ServerMethods))
l, e := net.Listen("tcp", "127.0.0.1:17152")
defer l.Close()
if e != nil {
log.Fatal("listen error:", e)
}
for {
c, err := l.Accept()
fmt.Printf("request from %v\n", c.RemoteAddr())
if err != nil {
continue
}
go rpc.ServeConn(c)
}
}
Ergebnis
$ ./rpc_server
request from 127.0.0.1:47156
request from 127.0.0.1:47158
request from 127.0.0.1:47160
^C
[Client] Datei "main.go"
package main
import (
"fmt"
"log"
"net/rpc"
"time"
)
type quotientClient struct {
Dividend, Divisor int
}
func main() {
t0 := time.Now()
var (
result float64
args quotientClient
)
args.Dividend = 7
args.Divisor = 8
client, err := rpc.Dial("tcp", "127.0.0.1:17152")
if err != nil {
log.Fatal("dial error:", err)
}
err = client.Call("ServerMethods.Divide", &args, &result)
//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
if err != nil {
log.Fatal("division error:", err)
}
fmt.Printf("request took %v\n", time.Since(t0))
fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}
Ergebnis
$ ./rpc_client
request took 405.45µs
7 / 8 = 0.875
$ ./rpc_client
request took 1.055826ms
7 / 8 = 0.875
$ ./rpc_client
request took 1.089143ms
7 / 8 = 0.875
$
Variante 1-2: Package "net/rpc" mit HTTP-Handler
ServeConn aus dem oberen Beispiel liefert eine einzelne Verbindung, die nach Beendigung neu gestartet werden muß. Demgegenüber ermöglicht der HTTP-Handler den Aufbau eines dauerhaft "lauschenden" Servers. Er wird nur einmal gestartet und bedient beliebig viele Requests. Allerdings stehen die Methoden des Listeners nicht mehr zur Verfügung.
[Server] Datei "main.go"
package main
import (
"errors"
"log"
"net"
"net/http"
"net/rpc"
)
type QuotientServer struct {
Dividend, Divisor int
}
type ServerMethods int
func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
if args.Divisor == 0 {
return errors.New("division by zero")
} else {
*result = float64(args.Dividend) / float64(args.Divisor)
return nil
}
}
func main() {
rpc.Register(new(ServerMethods))
rpc.HandleHTTP()
l, e := net.Listen("tcp", "127.0.0.1:17152")
defer l.Close()
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
useChannelToWait := make(chan int, 1)
<-useChannelToWait
}
Ergebnis
$ ./rpc_server
^C
[Client] Datei "main.go"
package main
import (
"fmt"
"log"
"net/rpc"
"time"
)
type quotientClient struct {
Dividend, Divisor int
}
func main() {
t0 := time.Now()
var (
result float64
args quotientClient
)
args.Dividend = 7
args.Divisor = 8
client, err := rpc.DialHTTP("tcp", "127.0.0.1:17152")
if err != nil {
log.Fatal("dial error:", err)
}
err = client.Call("ServerMethods.Divide", &args, &result)
//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
if err != nil {
log.Fatal("division error:", err)
}
fmt.Printf("request took %v\n", time.Since(t0))
fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}
Ergebnis
$ ./rpc_client
request took 1.311427ms
7 / 8 = 0.875
$ ./rpc_client
request took 576.049µs
7 / 8 = 0.875
$ ./rpc_client
request took 1.224976ms
7 / 8 = 0.875
$
Variante 1-3: Package "net/rpc/jsonrpc"
"net/rpc" kommuniziert nur von Go-Prozessen zu Go-Prozessen. Für cross-language- / cross-platform-Kommunikation entsprechend der JSON-RPC 2.0 Specification eignet sich stattdessen "net/rpc/jsonrpc".
[Server] Datei "main.go"
package main
import (
"errors"
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
type QuotientServer struct {
Dividend, Divisor int
}
type ServerMethods int
func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
if args.Divisor == 0 {
return errors.New("division by zero")
} else {
*result = float64(args.Dividend) / float64(args.Divisor)
return nil
}
}
func main() {
rpc.Register(new(ServerMethods))
l, e := net.Listen("tcp", "127.0.0.1:17152")
defer l.Close()
if e != nil {
log.Fatal("listen error:", e)
}
for {
c, err := l.Accept()
fmt.Printf("request from %v\n", c.RemoteAddr())
if err != nil {
continue
}
go jsonrpc.ServeConn(c)
}
}
[Client] Datei "main.go"
package main
import (
"fmt"
"log"
"net/rpc/jsonrpc"
"time"
)
type quotientClient struct {
Dividend, Divisor int
}
func main() {
t0 := time.Now()
var (
result float64
args quotientClient
)
args.Dividend = 7
args.Divisor = 8
client, err := jsonrpc.Dial("tcp", "127.0.0.1:17152")
if err != nil {
log.Fatal("dial error:", err)
}
err = client.Call("ServerMethods.Divide", &args, &result)
//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
if err != nil {
log.Fatal("division error:", err)
}
fmt.Printf("request took %v\n", time.Since(t0))
fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}
Variante 1-4: Package "golang.org/x/net/websocket" in Kombination mit "net/rpc" oder "net/rpc/jsonrpc"
[Server] Datei "main.go"
package main
import (
"errors"
"fmt"
"net/http"
"net/rpc"
"golang.org/x/net/websocket"
)
type QuotientServer struct {
Dividend, Divisor int
}
type ServerMethods int
func (rcv *ServerMethods) Divide(args QuotientServer, result *float64) error {
if args.Divisor == 0 {
return errors.New("division by zero")
} else {
*result = float64(args.Dividend) / float64(args.Divisor)
return nil
}
}
func main() {
rpc.Register(new(ServerMethods))
http.Handle("/ws", websocket.Handler(func(connection *websocket.Conn) {
fmt.Printf("request from %v\n", connection.RemoteAddr())
rpc.ServeConn(connection) // variant 1
//jsonrpc.ServeConn(connection) // variant 2
}))
http.ListenAndServe("127.0.0.1:17152", nil)
}
[Client] Datei "main.go"
package main
import (
"fmt"
"log"
"net/rpc"
"time"
"golang.org/x/net/websocket"
)
type quotientClient struct {
Dividend, Divisor int
}
func main() {
t0 := time.Now()
var (
result float64
args quotientClient
)
args.Dividend = 7
args.Divisor = 8
connection, err := websocket.Dial("ws://127.0.0.1:17152/ws", "", "http://127.0.0.1")
if err != nil {
log.Fatal("dial error:", err)
}
defer connection.Close()
client := rpc.NewClient(connection) // variant 1
//client := jsonrpc.NewClient(connection) // variant 2
err = client.Call("ServerMethods.Divide", &args, &result)
//err = client.Call("ServerMethods.Divide", args, &result) // Valid, too
if err != nil {
log.Fatal("division error:", err)
}
fmt.Printf("request took %v\n", time.Since(t0))
fmt.Printf("%d / %d = %0.3f\n", args.Dividend, args.Divisor, result)
}
Websockets als Alternative – geeignet zur Einbettung von WebAssembly in verteilte Systeme
Die Kommunikation zwischen Go-Programmen und Go-WebAssembly-Anwendungen über RPC ist nicht zulässig.[3]Mit Hilfe der minimalistischen Websocket-Library für Go von https://github.com/nhooyr/websocket können wir eine ähnlich komfortable, bidirektionale Kommunikation zwischen zwei Go-Programmen (auch WASM!) aufbauen.
Variante 2-1: Byte-Stream
[Server] Datei "main.go"
package main
import (
"context"
"fmt"
"net/http"
"time"
"nhooyr.io/websocket"
)
func main() {
ReverseFunc := func(s string) (result string) {
for _, v := range s {
result = string(v) + result
}
return
}
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
buffer []byte
mt websocket.MessageType
)
connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
OriginPatterns: []string{"*"},
})
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
defer cancel()
//Receive
mt, buffer, err = connection.Read(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)
//Send
connection.Write(ctx, websocket.MessageText, []byte(ReverseFunc(string(buffer))))
fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer)))
connection.Close(websocket.StatusNormalClosure, "")
})
http.ListenAndServe("127.0.0.1:17152", hf)
}
[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)
package main
import (
"context"
"fmt"
"time"
"nhooyr.io/websocket"
)
const MyMessage = "looc tsi muihcra"
func main() {
var (
buffer []byte
mt websocket.MessageType
)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
//Send
connection.Write(ctx, websocket.MessageBinary, []byte(MyMessage))
fmt.Printf("Message send: %s\n", MyMessage)
//Receive
mt, buffer, err = connection.Read(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Message received: %s, message type %d\n", string(buffer), mt)
connection.Close(websocket.StatusNormalClosure, "")
}
Variante 2-2: JSON-Text
[Server] Datei "main.go"
package main
import (
"context"
"fmt"
"net/http"
"time"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
func main() {
ReverseFunc := func(s string) (result string) {
for _, v := range s {
result = string(v) + result
}
return
}
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var buffer interface{}
connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
OriginPatterns: []string{"*"},
})
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
ctx, cancel := context.WithTimeout(r.Context(), time.Second*10)
defer cancel()
//Receive
err = wsjson.Read(ctx, connection, &buffer)
fmt.Printf("Message received: %s\n", buffer.(string))
//Send
err = wsjson.Write(ctx, connection, ReverseFunc(string(buffer.(string))))
if err != nil {
panic(err)
}
fmt.Printf("Message send: %s\n", ReverseFunc(string(buffer.(string))))
connection.Close(websocket.StatusNormalClosure, "")
})
http.ListenAndServe("127.0.0.1:17152", hf)
}
[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)
package main
import (
"context"
"fmt"
"time"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
const MyMessage = "looc tsi muihcra"
func main() {
var buffer interface{}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
//Send
err = wsjson.Write(ctx, connection, MyMessage)
if err != nil {
panic(err)
}
fmt.Printf("Message send: %s\n", MyMessage)
//Receive
err = wsjson.Read(ctx, connection, &buffer)
fmt.Printf("Message received: %s\n", buffer.(string))
connection.Close(websocket.StatusNormalClosure, "")
}
Tip für die Praxis: Bidirektionale Kommunikation zwischen Server und WebAssembly-Client
Es ist zwar nicht möglich, in der virtuellen WebAssembly-Maschine einen Websocket-Server einzurichten, aber ist eine Websocket-Verbindung erst einmal aufgebaut, funktioniert sie in beide Richtungen. Wenn wir aber Remote-Procedure-Calls machen wollen, benötigen wir bereits beide Richtungen der Verbindungen für Anfrage und Antwort. Es bietet sich an, zwei Websocket-Verbindungen zu eröffnen und für die jeweilige Richtung eines Requests zu reservieren.
Im nachfolgenden Beispiel wurden wsjson.Write mit nachfolgendem wsjson.Read für Send-Requests zu einer Funktion zusammengefaßt. Im Gegenzug empfängt wsjson.Read Anfragen, übergibt den Wert an eine zwischengeschaltete Funktion zur Auswertung des Requests und liefert das Resultat über wsjson.Write zurück an den Absender.
[Server] Datei "main.go"
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
const MyMessage = "looc tsi muihcra"
func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
//Send
err = wsjson.Write(ctx, conn, sendval)
if err != nil {
return err, nil
}
//Receive
err = wsjson.Read(ctx, conn, &receiveval)
if err != nil {
return err, nil
}
return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
var err error
var sendval, receiveval interface{}
//Receive
err = wsjson.Read(ctx, conn, &receiveval)
if err != nil {
log.Fatalln(err)
}
sendval = dosth(receiveval)
//Send
err = wsjson.Write(ctx, conn, sendval.(string))
if err != nil {
log.Fatalln(err)
}
}
func ReverseFunc(i interface{}) interface{} {
var result string
for _, v := range i.(string) {
result = string(v) + result
}
return result
}
func main() {
hf1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
OriginPatterns: []string{"*"},
})
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
for ; true; time.Sleep(1 * time.Second) {
fmt.Printf("Message send: %s\n", MyMessage)
// Send request and wait for answer
err, buffer := wsjsonSend(ctx, connection, MyMessage)
if err != nil {
panic(err)
}
fmt.Printf("Message received: %s\n", buffer.(string))
}
connection.Close(websocket.StatusNormalClosure, "")
})
hf2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
connection, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
OriginPatterns: []string{"*"},
})
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
for {
// Receive and answer request
wsjsonReceive(ctx, connection, ReverseFunc)
}
connection.Close(websocket.StatusNormalClosure, "")
})
go http.ListenAndServe("127.0.0.1:17152", hf1)
go http.ListenAndServe("127.0.0.1:17153", hf2)
time.Sleep(5 * time.Minute) //To give the experiment some time
}
[Client] Datei "main.go" (Der WebAssembly-Partner muß der Client sein)
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
const MyMessage2 = "skcor ylbmessabew"
var wg sync.WaitGroup
func wsjsonSend(ctx context.Context, conn *websocket.Conn, sendval interface{}) (err error, receiveval interface{}) {
//Send
err = wsjson.Write(ctx, conn, sendval)
if err != nil {
return err, nil
}
//Receive
err = wsjson.Read(ctx, conn, &receiveval)
if err != nil {
return err, nil
}
return nil, receiveval
}
func wsjsonReceive(ctx context.Context, conn *websocket.Conn, dosth func(interface{}) interface{}) {
var err error
var sendval, receiveval interface{}
//Receive
err = wsjson.Read(ctx, conn, &receiveval)
if err != nil {
log.Fatalln(err)
}
sendval = dosth(receiveval)
//Send
err = wsjson.Write(ctx, conn, sendval.(string))
if err != nil {
log.Fatalln(err)
}
}
func ReverseFunc(i interface{}) interface{} {
var result string
for _, v := range i.(string) {
result = string(v) + result
}
return result
}
func main() {
ctx, cancel := context.WithCancel(context.Background()) //Contexts are safe for simultaneous use by multiple goroutines. (https://golang.org/pkg/context/#pkg-overview)
defer cancel()
connection, _, err := websocket.Dial(ctx, "ws://127.0.0.1:17152", nil)
if err != nil {
panic(err)
}
defer connection.Close(websocket.StatusInternalError, "Websocket: internal Error")
connection2, _, err2 := websocket.Dial(ctx, "ws://127.0.0.1:17153", nil)
if err != nil {
panic(err2)
}
defer connection2.Close(websocket.StatusInternalError, "Websocket: internal Error")
wg.Add(2)
// Receive and answer request
go func() {
defer wg.Done()
for {
wsjsonReceive(ctx, connection, ReverseFunc)
}
}()
// Send request and wait for answer
go func() {
defer wg.Done()
for ; true; time.Sleep(5 * time.Second) {
fmt.Printf("Message send: %s\n", MyMessage2)
err, buffer2 := wsjsonSend(ctx, connection2, MyMessage2)
if err != nil {
panic(err)
}
fmt.Printf("Message received: %s\n", buffer2.(string))
}
}()
wg.Wait()
connection.Close(websocket.StatusNormalClosure, "")
}
Anmerkungen
- ↑ Siehe sehr anschaulich https://www.elektronik-kompendium.de zu TCP/IP, https://www.elektronik-kompendium.de zu TCP und https://www.elektronik-kompendium.de zu UDP, Stand 2025
- ↑ https://de.wikipedia.org/wiki/Remote_Procedure_Call
- ↑ https://stackoverflow.com/questions/55750947/websockets-over-webassembly-generated-by-golang