diff --git a/.github/workflows/go-tests.yml b/.github/workflows/go-tests.yml new file mode 100644 index 0000000..8260ea3 --- /dev/null +++ b/.github/workflows/go-tests.yml @@ -0,0 +1,27 @@ +name: Go Tests + +on: + push: + branches: + - main + - tests + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Install dependencies + run: go get github.com/stretchr/testify/assert + + - name: Run Go tests + run: go test -v ./... \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..38688f9 --- /dev/null +++ b/README.md @@ -0,0 +1,176 @@ +# FastRpc + +[![Go Reference](https://pkg.go.dev/badge/github.com/AmanTrance/FastRpc.svg)](https://pkg.go.dev/github.com/AmanTrance/FastRpc) +[![Tests](https://github.com/AmanTrance/FastRpc/actions/workflows/go-test.yml/badge.svg)](https://github.com/AmanTrance/FastRpc/actions/workflows/go-test.yml) + +**FastRpc** is a simple, high-performance, TCP-based binary RPC (Remote Procedure Call) framework for Go. It is designed for extremely low-latency, synchronous, server-to-server communication where a full-featured framework like gRPC is overkill and text-based protocols like JSON-RPC are too slow. + +## Why FastRpc? + +In many systems, you need a service to synchronously call another service and get a response *fast*. The problem is that many solutions are either too complex or too slow: + +* **HTTP/JSON-RPC:** Easy to use, but the overhead of text parsing (JSON) and HTTP handshakes is high, making sub-10ms response times difficult. +* **gRPC:** Extremely fast and robust, but requires `.proto` files, code generation, and a more complex setup. +* **Polling/Queues (like RabbitMQ):** This is an *asynchronous* pattern. It's great for background jobs, but not for a synchronous "call-and-wait" request. + +FastRpc solves the "call-and-wait" problem by providing: +1. **A Simple Binary Protocol:** No text parsing. It sends raw bytes over a custom header protocol. +2. **Persistent Connection Pooling:** Clients (Slaves) hold a pool of "on-hold" TCP connections. When you make a call, it uses an existing connection, skipping the slow TCP handshake. +3. **Lightweight API:** The entire framework is just a few Go files with a minimal, easy-to-understand API. + +## Features + +* **`RpcMaster` (Server):** + * Concurrently handles thousands of client connections, one goroutine per client. + * Simple `RegisterRPC` method to map a string name to a handler function. + * Graceful `Close()` method. +* **`RpcSlave` (Client):** + * Manages a pool of persistent TCP connections for high throughput. + * Simple `CallForBuffer` method that feels like a local function call. + * Automatically re-uses connections from the pool. +* **Safe & Robust:** + * Concurrent-safe server and client using `sync.Mutex`. + * Properly handles stream I/O using loops to ensure all bytes are read/written. + * Automatically discards unread data to prevent connection poisoning. + +## Installation + +```sh +go get github.com/AmanTrance/FastRpc +```` + +## Quick Start + +Here is a complete, minimal example. + +### 1\. Server (`server.go`) + +```go +package main + +import ( + "context" + "log" + "net" + + fastrpc "github.com/AmanTrance/FastRpc" +) + +func main() { + master, _ := fastrpc.NewMaster() + + // Register a "ping" RPC + master.RegisterRPC("ping", "ping-pong", "", "", func(i *fastrpc.IOOperator) error { + return i.WriteIOFromBuffer([]byte("pong")) + }) + + log.Println("Starting RPC server on :10000") + // Start blocks and listens for connections + master.Start(context.Background(), net.IPv4(127, 0, 0, 1), 10000) +} +``` + +### 2\. Client (`client.go`) + +```go +package main + +import ( + "fmt" + "log" + "net" + + fastrpc "github.com/AmanTrance/FastRpc" +) + +func main() { + // Connect to the master, creating a pool of 5 connections + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), 10000, 5) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer slave.DeInitialize() + + // Call the "ping" RPC + data, err := slave.CallForBuffer("ping", nil) + if err != nil { + log.Fatalf("RPC call failed: %v", err) + } + + fmt.Printf("Server responded: %s\n", string(data)) // Prints: Server responded: pong +} +``` + +## API Reference + +### `RpcMaster` + +```go +// Creates a new, ready-to-configure RpcMaster. +// Automatically registers a 'capabilities' RPC at ID 0. +func NewMaster() (*RpcMaster, error) + +// Registers a new function to be called by clients. +func (r *RpcMaster) RegisterRPC(name string, description string, incomingType string, returningType string, rpc func(*IOOperator) error) + +// Starts the server. This is a blocking call. +func (r *RpcMaster) Start(ctx context.Context, ip net.IP, port int) error + +// Closes the master's listening socket, stopping it from accepting new connections. +func (r *RpcMaster) Close() error +``` + +### `RpcSlave` + +```go +// Creates a new client and connects to the master, filling the connection pool. +func NewSlave(masterIP net.IP, masterPort int, poolSize int) (*RpcSlave, error) + +// Gets the list of available RPCs from the master. +func (r *RpcSlave) GetMasterCapabilities() ([]MasterCapabilitiesDTO, error) + +// Calls an RPC by name and waits for a response. This is the primary method. +func (r *RpcSlave) CallForBuffer(method string, buf []byte) ([]byte, error) + +// Closes all connections in the pool and shuts down the slave. +func (r *RpcSlave) DeInitialize() +``` + +### `IOOperator` + +(This is the object passed into your RPC handler function) + +```go +// Returns the number of bytes the client sent that have not been read yet. +func (i *IOOperator) ReadDataLeft() uint64 + +// Reads a specific number of bytes from the client's request payload. +func (i *IOOperator) ReadIOStream(count int) ([]byte, error) + +// Writes a byte slice as the response. +// This will send a 9-byte header followed by the data. +func (i *IOOperator) WriteIOFromBuffer(buf []byte) error + +// Writes data from a reader (e.g., a file) as the response. +func (i *IOOperator) WriteIOFromReader(reader io.Reader, count int, chunkSize int) error + +// Sends an empty, successful response. +func (i *IOOperator) WriteNothing() error + +// Sends an error response. The client will receive this as an error. +func (i *IOOperator) WriteError(message string) error +``` + +## Running Tests + +To run the full test suite, including concurrency and data integrity tests: + +```sh +go test -v ./... +``` + +## License + +This project is licensed under the **GNU General Public License v3.0**. See the [LICENSE](https://www.google.com/search?q=LICENSE) file for details. + +``` \ No newline at end of file diff --git a/client.go b/client.go index c660fe9..f1481ab 100644 --- a/client.go +++ b/client.go @@ -121,13 +121,18 @@ func (r *RpcSlave) CallForBuffer(method string, buf []byte) ([]byte, error) { r.mutex.Lock() defer r.mutex.Unlock() + rpcID, ok := r.capabilitiesMap[method] + if !ok { + return nil, errors.New("unknown rpc method " + method) + } + connection := <-r.connectionPool defer func() { r.connectionPool <- connection }() var headersBuffer []byte = make([]byte, 12) - binary.BigEndian.PutUint32(headersBuffer[:4], r.capabilitiesMap[method]) + binary.BigEndian.PutUint32(headersBuffer[:4], rpcID) binary.BigEndian.PutUint64(headersBuffer[4:], uint64(len(buf))) err := writeSpecifiedBytes(connection, headersBuffer, 12) if err != nil { diff --git a/examples/client_concurrent/main.go b/examples/client_concurrent/main.go new file mode 100644 index 0000000..213f612 --- /dev/null +++ b/examples/client_concurrent/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "fmt" + "log" + "net" + "strconv" + "sync" + "time" + + fastrpc "github.com/AmanTrance/FastRpc" +) + +func main() { + ip := net.IPv4(127, 0, 0, 1) + port := 10000 + poolSize := 20 + numCalls := 1000 + + log.Printf("Connecting to master at %s:%d with pool size %d\n", ip, port, poolSize) + slave, err := fastrpc.NewSlave(ip, port, poolSize) + if err != nil { + log.Fatalf("Failed to create slave: %v", err) + } + defer slave.DeInitialize() + + var wg sync.WaitGroup + wg.Add(numCalls) + + log.Printf("Dispatching %d concurrent calls...\n", numCalls) + startTime := time.Now() + + for i := range numCalls { + go func(callIndex int) { + defer wg.Done() + + payload := []byte("Hello from call " + strconv.Itoa(callIndex)) + + data, err := slave.CallForBuffer("echo", payload) + if err != nil { + log.Printf("Call %d failed: %v\n", callIndex, err) + return + } + + if string(data) != string(payload) { + log.Printf("Call %d: Data mismatch!\n", callIndex) + return + } + }(i) + } + + wg.Wait() + duration := time.Since(startTime) + + fmt.Println("\n--- All calls complete ---") + fmt.Printf("Total time: %v\n", duration) + fmt.Printf("Total calls: %d\n", numCalls) + fmt.Printf("Avg. calls/sec: %.2f\n", float64(numCalls)/duration.Seconds()) +} diff --git a/examples/client_simple/main.go b/examples/client_simple/main.go new file mode 100644 index 0000000..cfa6753 --- /dev/null +++ b/examples/client_simple/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "log" + "net" + + fastrpc "github.com/AmanTrance/FastRpc" +) + +func main() { + ip := net.IPv4(127, 0, 0, 1) + port := 10000 + poolSize := 2 + + log.Printf("Connecting to master at %s:%d with pool size %d\n", ip, port, poolSize) + slave, err := fastrpc.NewSlave(ip, port, poolSize) + if err != nil { + log.Fatalf("Failed to create slave: %v", err) + } + defer slave.DeInitialize() + + log.Println("Calling RPC: 'ping'...") + data, err := slave.CallForBuffer("ping", nil) + if err != nil { + log.Fatalf("RPC call 'ping' failed: %v", err) + } + + fmt.Printf("Server replied: %s\n", string(data)) + + log.Println("Calling RPC: 'echo'...") + payload := []byte("Hello from the simple client!") + data, err = slave.CallForBuffer("echo", payload) + if err != nil { + log.Fatalf("RPC call 'echo' failed: %v", err) + } + + fmt.Printf("Server echoed: %s\n", string(data)) +} diff --git a/examples/main.go b/examples/main.go deleted file mode 100644 index 372be3b..0000000 --- a/examples/main.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "net" - "time" - - fastrpc "github.com/AmanTrance/FastRpc" -) - -func main() { - master, err := fastrpc.NewMaster() - if err != nil { - log.Default().Fatal(err.Error()) - } - - master.RegisterRPC("rpc1", "hello world", "utf-8", "utf-8", func(i *fastrpc.IOOperator) error { - return i.WriteIOFromBuffer([]byte("hello world")) - }) - - master.RegisterRPC("rpc2", "data", "utf-8", "utf-8", func(i *fastrpc.IOOperator) error { - return i.WriteIOFromBuffer([]byte("some random data or some specific encoding based data")) - }) - - go func() { - err = master.Start(context.Background(), net.IPv4(127, 0, 0, 1), 10000) - if err != nil { - log.Default().Fatal(err.Error()) - } - }() - - time.Sleep(time.Second * 2) - - slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), 10000, 2) - if err != nil { - log.Default().Fatal(err.Error()) - } - - c, err := slave.GetMasterCapabilities() - if err != nil { - log.Default().Fatal(err.Error()) - } - - fmt.Printf("%v\n", c) - - data, err := slave.CallForBuffer("rpc1", nil) - if err != nil { - log.Default().Fatal(err.Error()) - } - - println(string(data)) - - data, err = slave.CallForBuffer("rpc2", nil) - if err != nil { - log.Default().Fatal(err.Error()) - } - - println(string(data)) -} diff --git a/examples/server/main.go b/examples/server/main.go new file mode 100644 index 0000000..ccc0469 --- /dev/null +++ b/examples/server/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "log" + "net" + + fastrpc "github.com/AmanTrance/FastRpc" +) + +func main() { + master, err := fastrpc.NewMaster() + if err != nil { + log.Fatalf("Failed to create master: %v", err) + } + + master.RegisterRPC("ping", "Simple ping-pong check", "nil", "string", func(i *fastrpc.IOOperator) error { + log.Println("[Server] Received PING") + return i.WriteIOFromBuffer([]byte("pong")) + }) + + master.RegisterRPC("echo", "Echoes back any data sent", "binary", "binary", func(i *fastrpc.IOOperator) error { + log.Printf("[Server] Received ECHO with %d bytes\n", i.ReadDataLeft()) + data, err := i.ReadIOStream(int(i.ReadDataLeft())) + if err != nil { + return err + } + return i.WriteIOFromBuffer(data) + }) + + master.RegisterRPC("large_data", "Returns a 10MB payload", "nil", "binary", func(i *fastrpc.IOOperator) error { + log.Println("[Server] Received LARGE_DATA request") + const dataSize = 10 * 1024 * 1024 + payload := make([]byte, dataSize) + rand.Read(payload) + + log.Println("[Server] Sending 10MB payload...") + return i.WriteIOFromBuffer(payload) + }) + + master.RegisterRPC("force_error", "A test function that always fails", "nil", "nil", func(i *fastrpc.IOOperator) error { + log.Println("[Server] Received FORCE_ERROR, returning an error") + return errors.New("a server-side error was forced") + }) + + ip := net.IPv4(127, 0, 0, 1) + port := 10000 + fmt.Printf("Starting FastRpc Master on %s:%d...\n", ip, port) + + err = master.Start(context.Background(), ip, port) + if err != nil { + log.Fatalf("Master failed to start: %v", err) + } +} diff --git a/fastrpc_test.go b/fastrpc_test.go new file mode 100644 index 0000000..61a5074 --- /dev/null +++ b/fastrpc_test.go @@ -0,0 +1,287 @@ +package fastrpc_test + +import ( + "bytes" + "context" + "crypto/rand" + "errors" + "net" + "strconv" + "sync" + "testing" + "time" + + fastrpc "github.com/AmanTrance/FastRpc" + "github.com/stretchr/testify/assert" +) + +func setupMaster(t *testing.T) (master *fastrpc.RpcMaster, port int, teardown func()) { + assertions := assert.New(t) + + master, err := fastrpc.NewMaster() + if !assertions.NoError(err, "Failed to create new master") { + t.FailNow() + } + + master.RegisterRPC("ping", "returns pong", "text", "text", func(i *fastrpc.IOOperator) error { + return i.WriteIOFromBuffer([]byte("pong")) + }) + + master.RegisterRPC("echo", "returns input", "binary", "binary", func(i *fastrpc.IOOperator) error { + data, err := i.ReadIOStream(int(i.ReadDataLeft())) + if err != nil { + return err + } + return i.WriteIOFromBuffer(data) + }) + + master.RegisterRPC("discard", "reads nothing, returns nothing", "binary", "text", func(i *fastrpc.IOOperator) error { + return i.WriteIOFromBuffer([]byte("discarded")) + }) + + master.RegisterRPC("force_error", "returns an error", "", "", func(i *fastrpc.IOOperator) error { + return errors.New("this is a forced server error") + }) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if !assertions.NoError(err) { + t.FailNow() + } + + port = listener.Addr().(*net.TCPAddr).Port + listener.Close() + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + master.Start(ctx, net.IPv4(127, 0, 0, 1), port) + }() + + time.Sleep(50 * time.Millisecond) + + teardown = func() { + cancel() + master.Close() + } + + return master, port, teardown +} + +func TestBasicCall(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + data, err := slave.CallForBuffer("ping", nil) + if !assertions.NoError(err) { + return + } + assertions.Equal("pong", string(data)) + + payload := []byte("hello world") + data, err = slave.CallForBuffer("echo", payload) + if !assertions.NoError(err) { + return + } + assertions.Equal(payload, data) +} + +func TestSequentialCallsOnSameConnection(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + for i := range 100 { + payload := []byte("hello " + strconv.Itoa(i)) + data, err := slave.CallForBuffer("echo", payload) + + if !assertions.NoError(err, "Call failed at iteration %d", i) { + return + } + assertions.Equal(payload, data, "Data mismatch at iteration %d", i) + } +} + +func TestConcurrentCalls(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 20) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + numCalls := 1000 + var wg sync.WaitGroup + wg.Add(numCalls) + + for i := range numCalls { + callIndex := i + go func() { + defer wg.Done() + payload := []byte("concurrent hello " + strconv.Itoa(callIndex)) + data, err := slave.CallForBuffer("echo", payload) + + if !assert.NoError(t, err) { + return + } + assert.Equal(t, payload, data) + }() + } + wg.Wait() +} + +func TestLargeData(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + const dataSize = 10 * 1024 * 1024 + payload := make([]byte, dataSize) + _, err = rand.Read(payload) + if !assertions.NoError(err, "Failed to generate random payload") { + return + } + + data, err := slave.CallForBuffer("echo", payload) + if !assertions.NoError(err) { + return + } + + assertions.Equal(len(payload), len(data), "Returned data size is wrong") + assertions.True(bytes.Equal(payload, data), "Returned data does not match sent data") +} + +func TestMultipleSlaves(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + numSlaves := 10 + numCallsPerSlave := 100 + var wg sync.WaitGroup + wg.Add(numSlaves * numCallsPerSlave) + + for i := range numSlaves { + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 5) + if !assertions.NoError(err) { + t.Logf("Failed to create slave %d", i) + wg.Add(-numCallsPerSlave) + continue + } + defer slave.DeInitialize() + + go func(slaveIndex int) { + for j := range numCallsPerSlave { + payload := []byte("slave " + strconv.Itoa(slaveIndex) + " call " + strconv.Itoa(j)) + go func(p []byte) { + defer wg.Done() + data, err := slave.CallForBuffer("echo", p) + assert.NoError(t, err) + assert.Equal(t, p, data) + }(payload) + } + }(i) + } + wg.Wait() +} + +func TestUnknownRPC(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + data, err := slave.CallForBuffer("non_existent_rpc", nil) + + if !assertions.Error(err, "An error is expected for an unknown RPC") { + return + } + assertions.Nil(data, "Data should be nil when an error occurs") + assertions.ErrorContains(err, "unknown rpc method non_existent_rpc") +} + +func TestServerDiscardLogic(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + payload := make([]byte, 1024*1024) + data, err := slave.CallForBuffer("discard", payload) + if !assertions.NoError(err) { + return + } + assertions.Equal("discarded", string(data)) + + data, err = slave.CallForBuffer("ping", nil) + if !assertions.NoError(err, "The second call failed, server discard logic is broken") { + return + } + assertions.Equal("pong", string(data), "The second call returned wrong data") +} + +func TestServerRepliesWithServerError(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 1) + if !assertions.NoError(err) { + return + } + defer slave.DeInitialize() + + data, err := slave.CallForBuffer("force_error", nil) + + if !assertions.Error(err, "Expected an error from the server") { + return + } + assertions.Nil(data, "Data should be nil on error") + assertions.ErrorContains(err, "this is a forced server error") +} + +func TestDeInitialize(t *testing.T) { + assertions := assert.New(t) + _, port, teardown := setupMaster(t) + defer teardown() + + slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), port, 5) + if !assertions.NoError(err) { + return + } + + slave.DeInitialize() + + _, err = slave.CallForBuffer("ping", nil) + assertions.Error(err, "Call should fail after DeInitialize") +} diff --git a/go.mod b/go.mod index 9e6ebe8..f49eebc 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,11 @@ module github.com/AmanTrance/FastRpc go 1.24.6 + +require github.com/stretchr/testify v1.11.1 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c4c1710 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=