Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const freeMessageList = @import("./client_mailbox.zig").freeMessageList;
const Config = @import("./config.zig");
const resp = @import("./commands/resp.zig");
const ClientHandle = @import("./types.zig").ClientHandle;
const StackWriter = @import("stack_writer.zig").StackWriter;

const log = std.log.scoped(.client);

Expand All @@ -38,6 +39,7 @@ pub const Client = struct {
disconnect_requested: *std.atomic.Value(bool),
server: *Server,
io: std.Io,
parse_buffer: [65536]u8 = undefined,

pub fn init(
allocator: std.mem.Allocator,
Expand Down Expand Up @@ -84,9 +86,10 @@ pub const Client = struct {
var sr = self.connection.reader(self.io, &reader_buffer);
const reader = &sr.interface;

// Create per-command arena for parsing (will be freed after enqueueing)
// Use page_allocator directly as it's thread-safe (multiple clients parse concurrently)
var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
// Per-command arena for parsing.
// Uses a fixed buffer in the Client struct to avoid page_allocator syscalls.
var fba = std.heap.FixedBufferAllocator.init(self.parse_buffer[0..]);
var arena = std.heap.ArenaAllocator.init(fba.allocator());
defer arena.deinit();

while (true) {
Expand Down Expand Up @@ -134,12 +137,25 @@ pub const Client = struct {
while (!self.server.store_mutex.tryLock()) {
std.atomic.spinLoopHint();
}
self.server.processCommandDirect(self, command.getArgs());
var sw = StackWriter.init(self.allocator);
var response_writer = sw.writer();
self.server.processCommandDirect(self, command.getArgs(), &response_writer);
self.server.store_mutex.unlock();

command.deinit();

self.flushMailbox() catch return;
// Write response directly to socket (no mailbox indirection)
const response = sw.slice(&response_writer);
if (response.len > 0) {
var writer_buffer: [1024 * 4]u8 = undefined;
var socket_writer = self.connection.writer(self.io, &writer_buffer);
try socket_writer.interface.writeAll(response);
try socket_writer.interface.flush();
}
sw.deinit();

// Check mailbox for pubsub messages that arrived during command execution
try self.flushMailbox();

if (self.disconnect_requested.load(.acquire)) {
log.debug("client {d}: disconnect after cmd", .{self.client_id});
Expand Down Expand Up @@ -190,7 +206,7 @@ pub const Client = struct {

fn flushMailbox(self: *Client) !void {
const head = self.mailbox.takeAll();
defer freeMessageList(self.allocator, head);
defer self.mailbox.freeNodes(self.allocator, head);

if (head == null) return;

Expand Down
40 changes: 40 additions & 0 deletions src/client_mailbox.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,47 @@ pub const ClientMailbox = struct {
tail: ?*MessageNode = null,
pending_count: usize = 0,
capacity: usize,
node_pool: [4]?*MessageNode = .{null} ** 4,

pub fn init(capacity: usize) ClientMailbox {
return .{
.capacity = capacity,
};
}

pub fn acquireNode(self: *ClientMailbox, allocator: Allocator, bytes: []u8) !*MessageNode {
for (&self.node_pool) |*slot| {
if (slot.*) |node| {
slot.* = null;
node.* = .{ .bytes = bytes, .next = null };
return node;
}
}
const node = try allocator.create(MessageNode);
node.* = .{ .bytes = bytes, .next = null };
return node;
}

pub fn releaseNode(self: *ClientMailbox, allocator: Allocator, node: *MessageNode) void {
allocator.free(node.bytes);
for (&self.node_pool) |*slot| {
if (slot.* == null) {
slot.* = node;
return;
}
}
allocator.destroy(node);
}

pub fn freeNodes(self: *ClientMailbox, allocator: Allocator, head: ?*MessageNode) void {
var current = head;
while (current) |node| {
const next = node.next;
self.releaseNode(allocator, node);
current = next;
}
}

pub fn open(self: *ClientMailbox) void {
self.closed.store(false, .release);
}
Expand Down Expand Up @@ -86,6 +120,12 @@ pub const ClientMailbox = struct {
pub fn deinit(self: *ClientMailbox, allocator: Allocator) void {
self.close();
freeMessageList(allocator, self.takeAll());
for (&self.node_pool) |*slot| {
if (slot.*) |node| {
allocator.destroy(node);
slot.* = null;
}
}
}
};

Expand Down
16 changes: 7 additions & 9 deletions src/commands/list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ pub fn lpush(writer: *Writer, store: *Store, args: []const Value) !void {
const list = try store.getSetList(key);

for (args[2..]) |arg| {
// Duplicate string since arg.asSlice() points to temporary parser buffer
const value_str = try store.allocator.dupe(u8, arg.asSlice());
try list.prepend(.{ .string = value_str });
const pv = try PrimitiveValue.fromSlice(arg.asSlice(), store.allocator);
try list.prepend(pv);
}

try resp.writeInt(writer, list.len());
Expand All @@ -58,9 +57,8 @@ pub fn rpush(writer: *Writer, store: *Store, args: []const Value) !void {
const list = try store.getSetList(key);

for (args[2..]) |arg| {
// Duplicate string since arg.asSlice() points to temporary parser buffer
const value_str = try store.allocator.dupe(u8, arg.asSlice());
try list.append(.{ .string = value_str });
const pv = try PrimitiveValue.fromSlice(arg.asSlice(), store.allocator);
try list.append(pv);
}

try resp.writeInt(writer, list.len());
Expand Down Expand Up @@ -183,9 +181,9 @@ pub fn lset(writer: *Writer, store: *Store, args: []const Value) !void {
return error.KeyNotFound;
};

// Duplicate string since value points to temporary parser buffer
const value_str = try store.allocator.dupe(u8, value);
try list.setByIndex(actual_index, .{ .string = value_str });
// Set value inline if small enough, otherwise duplicate via KV allocator
const pv = try PrimitiveValue.fromSlice(value, store.allocator);
try list.setByIndex(actual_index, pv);

try resp.writeOK(writer);
}
Expand Down
1 change: 1 addition & 0 deletions src/commands/resp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub fn writeDoubleBulkString(writer: *Writer, value: f64) !void {
pub fn writePrimitiveValue(writer: *Writer, value: PrimitiveValue) !void {
switch (value) {
.string => |str| try writeBulkString(writer, str),
.short_string => |ss| try writeBulkString(writer, ss.asSlice()),
.int => |i| try writeInt(writer, i),
}
}
6 changes: 2 additions & 4 deletions src/list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ pub const ZedisList = struct {
while (current) |node| {
current = node.next;
const list_node: *ZedisListNode = @fieldParentPtr("node", node);
// Free string data if present
switch (list_node.data) {
.string => |str| self.allocator.free(str),
.int => {},
.int, .short_string => {},
}
self.allocator.destroy(list_node);
}
Expand Down Expand Up @@ -137,10 +136,9 @@ pub const ZedisList = struct {

pub fn setByIndex(self: *ZedisList, index: usize, value: PrimitiveValue) !void {
const list_node = self.getNodeAt(index) orelse return error.KeyNotFound;
// Free old string value if present
switch (list_node.data) {
.string => |str| self.allocator.free(str),
.int => {},
.int, .short_string => {},
}
list_node.data = value;
}
Expand Down
1 change: 1 addition & 0 deletions src/rdb/zdb.zig
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub const Writer = struct {
switch (value) {
.int => |i| try self.writeIntValue(i),
.string => |s| try self.writeString(s),
.short_string => |ss| try self.writeString(ss.asSlice()),
}

current = node.next;
Expand Down
77 changes: 34 additions & 43 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const aof = @import("./aof/aof.zig");
const Clock = @import("clock.zig");
const ClientHandle = @import("types.zig").ClientHandle;
const invalid_client_slot_index = @import("types.zig").invalid_client_slot_index;
const StackWriter = @import("stack_writer.zig").StackWriter;
const Io = std.Io;
const Stream = Io.net.Stream;

Expand Down Expand Up @@ -354,60 +355,37 @@ fn storeThreadLoop(server: *Server) void {
}

fn processCommand(self: *Server, node: *CommandNode) void {
self.processCommandDirect(node.client, node.args);
}

pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value) void {
var alloc_writer = std.Io.Writer.Allocating.init(self.base_allocator);
defer alloc_writer.deinit();
var sw = StackWriter.init(self.base_allocator);
defer sw.deinit();
var response_writer = sw.writer();
self.processCommandDirect(node.client, node.args, &response_writer);

self.registry.executeCommand(
&alloc_writer.writer,
client,
client.getCurrentStore(),
&self.aof_writer,
args,
) catch |err| {
log.err("Command execution failed: {s}", .{@errorName(err)});
};

const response = alloc_writer.toOwnedSlice() catch return;
const response = sw.slice(&response_writer);
if (response.len == 0) return;
// `response` is now owned by us; we will transfer it to the mailbox node.

const msg_node = self.base_allocator.create(MessageNode) catch {
self.base_allocator.free(response);
return;
};
msg_node.* = .{
.bytes = response,
.next = null,
};

const client = node.client;
const client_slot = client.slot_handle;
if (client_slot.slot_index >= self.client_slots.len) {
log.warn("store thread: stale slot_index={d}", .{client_slot.slot_index});
self.base_allocator.free(msg_node.bytes);
self.base_allocator.destroy(msg_node);
return;
}
if (client_slot.slot_index >= self.client_slots.len) return;
const slot = &self.client_slots[client_slot.slot_index];

const owned = self.base_allocator.dupe(u8, response) catch return;
const msg_node = slot.mailbox.acquireNode(self.base_allocator, owned) catch {
self.base_allocator.free(owned);
return;
};

slot.mailbox.lockAtomic();
defer slot.mailbox.unlockAtomic();

const is_active = slot.state.load(.acquire) == .active and
slot.generation.load(.acquire) == client.slot_handle.generation and
!slot.mailbox.closed.load(.acquire);
if (!is_active) {
log.warn("store thread: slot inactive for client_id={d}", .{client.client_id});
self.base_allocator.free(msg_node.bytes);
self.base_allocator.destroy(msg_node);
slot.mailbox.releaseNode(self.base_allocator, msg_node);
return;
}
if (slot.mailbox.pending_count >= slot.mailbox.capacity) {
self.base_allocator.free(msg_node.bytes);
self.base_allocator.destroy(msg_node);
slot.mailbox.releaseNode(self.base_allocator, msg_node);
return;
}

Expand All @@ -420,6 +398,18 @@ pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value)
slot.mailbox.pending_count += 1;
}

pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value, response_writer: *std.Io.Writer) void {
self.registry.executeCommand(
response_writer,
client,
client.getCurrentStore(),
&self.aof_writer,
args,
) catch |err| {
log.err("Command execution failed: {s}", .{@errorName(err)});
};
}

// The main server loop. It waits for incoming connections and
// handles each client (one thread per connection).
pub fn listen(self: *Server) !void {
Expand Down Expand Up @@ -555,11 +545,12 @@ fn enqueueToHandle(self: *Server, handle: ClientHandle, payload: []const u8) !vo
if (handle.slot_index >= self.client_slots.len) return error.StaleHandle;

const slot = &self.client_slots[handle.slot_index];
const node = try self.createMessageNode(payload);
errdefer {
self.base_allocator.free(node.bytes);
self.base_allocator.destroy(node);
}
const owned = try self.base_allocator.dupe(u8, payload);
const node = slot.mailbox.acquireNode(self.base_allocator, owned) catch |err| {
self.base_allocator.free(owned);
return err;
};
errdefer slot.mailbox.releaseNode(self.base_allocator, node);

slot.mailbox.lockAtomic();
defer slot.mailbox.unlockAtomic();
Expand Down
Loading
Loading