Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/go-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Go Tests

on:
push:
branches:
- main
- tests
workflow_dispatch:

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Check out code
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

- name: Install dependencies
run: go get github.com/stretchr/testify/assert

- name: Run Go tests
run: go test -v ./...
176 changes: 176 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# FastRpc

[![Go Reference](https://pkg.go.dev/badge/github.com/AmanTrance/FastRpc.svg)](https://pkg.go.dev/github.com/AmanTrance/FastRpc)
[![Tests](https://github.com/AmanTrance/FastRpc/actions/workflows/go-test.yml/badge.svg)](https://github.com/AmanTrance/FastRpc/actions/workflows/go-test.yml)

**FastRpc** is a simple, high-performance, TCP-based binary RPC (Remote Procedure Call) framework for Go. It is designed for extremely low-latency, synchronous, server-to-server communication where a full-featured framework like gRPC is overkill and text-based protocols like JSON-RPC are too slow.

## Why FastRpc?

In many systems, you need a service to synchronously call another service and get a response *fast*. The problem is that many solutions are either too complex or too slow:

* **HTTP/JSON-RPC:** Easy to use, but the overhead of text parsing (JSON) and HTTP handshakes is high, making sub-10ms response times difficult.
* **gRPC:** Extremely fast and robust, but requires `.proto` files, code generation, and a more complex setup.
* **Polling/Queues (like RabbitMQ):** This is an *asynchronous* pattern. It's great for background jobs, but not for a synchronous "call-and-wait" request.

FastRpc solves the "call-and-wait" problem by providing:
1. **A Simple Binary Protocol:** No text parsing. It sends raw bytes over a custom header protocol.
2. **Persistent Connection Pooling:** Clients (Slaves) hold a pool of "on-hold" TCP connections. When you make a call, it uses an existing connection, skipping the slow TCP handshake.
3. **Lightweight API:** The entire framework is just a few Go files with a minimal, easy-to-understand API.

## Features

* **`RpcMaster` (Server):**
* Concurrently handles thousands of client connections, one goroutine per client.
* Simple `RegisterRPC` method to map a string name to a handler function.
* Graceful `Close()` method.
* **`RpcSlave` (Client):**
* Manages a pool of persistent TCP connections for high throughput.
* Simple `CallForBuffer` method that feels like a local function call.
* Automatically re-uses connections from the pool.
* **Safe & Robust:**
* Concurrent-safe server and client using `sync.Mutex`.
* Properly handles stream I/O using loops to ensure all bytes are read/written.
* Automatically discards unread data to prevent connection poisoning.

## Installation

```sh
go get github.com/AmanTrance/FastRpc
````

## Quick Start

Here is a complete, minimal example.

### 1\. Server (`server.go`)

```go
package main

import (
"context"
"log"
"net"

fastrpc "github.com/AmanTrance/FastRpc"
)

func main() {
master, _ := fastrpc.NewMaster()

// Register a "ping" RPC
master.RegisterRPC("ping", "ping-pong", "", "", func(i *fastrpc.IOOperator) error {
return i.WriteIOFromBuffer([]byte("pong"))
})

log.Println("Starting RPC server on :10000")
// Start blocks and listens for connections
master.Start(context.Background(), net.IPv4(127, 0, 0, 1), 10000)
}
```

### 2\. Client (`client.go`)

```go
package main

import (
"fmt"
"log"
"net"

fastrpc "github.com/AmanTrance/FastRpc"
)

func main() {
// Connect to the master, creating a pool of 5 connections
slave, err := fastrpc.NewSlave(net.IPv4(127, 0, 0, 1), 10000, 5)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer slave.DeInitialize()

// Call the "ping" RPC
data, err := slave.CallForBuffer("ping", nil)
if err != nil {
log.Fatalf("RPC call failed: %v", err)
}

fmt.Printf("Server responded: %s\n", string(data)) // Prints: Server responded: pong
}
```

## API Reference

### `RpcMaster`

```go
// Creates a new, ready-to-configure RpcMaster.
// Automatically registers a 'capabilities' RPC at ID 0.
func NewMaster() (*RpcMaster, error)

// Registers a new function to be called by clients.
func (r *RpcMaster) RegisterRPC(name string, description string, incomingType string, returningType string, rpc func(*IOOperator) error)

// Starts the server. This is a blocking call.
func (r *RpcMaster) Start(ctx context.Context, ip net.IP, port int) error

// Closes the master's listening socket, stopping it from accepting new connections.
func (r *RpcMaster) Close() error
```

### `RpcSlave`

```go
// Creates a new client and connects to the master, filling the connection pool.
func NewSlave(masterIP net.IP, masterPort int, poolSize int) (*RpcSlave, error)

// Gets the list of available RPCs from the master.
func (r *RpcSlave) GetMasterCapabilities() ([]MasterCapabilitiesDTO, error)

// Calls an RPC by name and waits for a response. This is the primary method.
func (r *RpcSlave) CallForBuffer(method string, buf []byte) ([]byte, error)

// Closes all connections in the pool and shuts down the slave.
func (r *RpcSlave) DeInitialize()
```

### `IOOperator`

(This is the object passed into your RPC handler function)

```go
// Returns the number of bytes the client sent that have not been read yet.
func (i *IOOperator) ReadDataLeft() uint64

// Reads a specific number of bytes from the client's request payload.
func (i *IOOperator) ReadIOStream(count int) ([]byte, error)

// Writes a byte slice as the response.
// This will send a 9-byte header followed by the data.
func (i *IOOperator) WriteIOFromBuffer(buf []byte) error

// Writes data from a reader (e.g., a file) as the response.
func (i *IOOperator) WriteIOFromReader(reader io.Reader, count int, chunkSize int) error

// Sends an empty, successful response.
func (i *IOOperator) WriteNothing() error

// Sends an error response. The client will receive this as an error.
func (i *IOOperator) WriteError(message string) error
```

## Running Tests

To run the full test suite, including concurrency and data integrity tests:

```sh
go test -v ./...
```

## License

This project is licensed under the **GNU General Public License v3.0**. See the [LICENSE](https://www.google.com/search?q=LICENSE) file for details.

```
7 changes: 6 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,18 @@ func (r *RpcSlave) CallForBuffer(method string, buf []byte) ([]byte, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

rpcID, ok := r.capabilitiesMap[method]
if !ok {
return nil, errors.New("unknown rpc method " + method)
}

connection := <-r.connectionPool
defer func() {
r.connectionPool <- connection
}()

var headersBuffer []byte = make([]byte, 12)
binary.BigEndian.PutUint32(headersBuffer[:4], r.capabilitiesMap[method])
binary.BigEndian.PutUint32(headersBuffer[:4], rpcID)
binary.BigEndian.PutUint64(headersBuffer[4:], uint64(len(buf)))
err := writeSpecifiedBytes(connection, headersBuffer, 12)
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions examples/client_concurrent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"fmt"
"log"
"net"
"strconv"
"sync"
"time"

fastrpc "github.com/AmanTrance/FastRpc"
)

func main() {
ip := net.IPv4(127, 0, 0, 1)
port := 10000
poolSize := 20
numCalls := 1000

log.Printf("Connecting to master at %s:%d with pool size %d\n", ip, port, poolSize)
slave, err := fastrpc.NewSlave(ip, port, poolSize)
if err != nil {
log.Fatalf("Failed to create slave: %v", err)
}
defer slave.DeInitialize()

var wg sync.WaitGroup
wg.Add(numCalls)

log.Printf("Dispatching %d concurrent calls...\n", numCalls)
startTime := time.Now()

for i := range numCalls {
go func(callIndex int) {
defer wg.Done()

payload := []byte("Hello from call " + strconv.Itoa(callIndex))

data, err := slave.CallForBuffer("echo", payload)
if err != nil {
log.Printf("Call %d failed: %v\n", callIndex, err)
return
}

if string(data) != string(payload) {
log.Printf("Call %d: Data mismatch!\n", callIndex)
return
}
}(i)
}

wg.Wait()
duration := time.Since(startTime)

fmt.Println("\n--- All calls complete ---")
fmt.Printf("Total time: %v\n", duration)
fmt.Printf("Total calls: %d\n", numCalls)
fmt.Printf("Avg. calls/sec: %.2f\n", float64(numCalls)/duration.Seconds())
}
39 changes: 39 additions & 0 deletions examples/client_simple/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"fmt"
"log"
"net"

fastrpc "github.com/AmanTrance/FastRpc"
)

func main() {
ip := net.IPv4(127, 0, 0, 1)
port := 10000
poolSize := 2

log.Printf("Connecting to master at %s:%d with pool size %d\n", ip, port, poolSize)
slave, err := fastrpc.NewSlave(ip, port, poolSize)
if err != nil {
log.Fatalf("Failed to create slave: %v", err)
}
defer slave.DeInitialize()

log.Println("Calling RPC: 'ping'...")
data, err := slave.CallForBuffer("ping", nil)
if err != nil {
log.Fatalf("RPC call 'ping' failed: %v", err)
}

fmt.Printf("Server replied: %s\n", string(data))

log.Println("Calling RPC: 'echo'...")
payload := []byte("Hello from the simple client!")
data, err = slave.CallForBuffer("echo", payload)
if err != nil {
log.Fatalf("RPC call 'echo' failed: %v", err)
}

fmt.Printf("Server echoed: %s\n", string(data))
}
61 changes: 0 additions & 61 deletions examples/main.go

This file was deleted.

Loading