diff --git a/src/client.zig b/src/client.zig index 24d8fe1..81f84eb 100644 --- a/src/client.zig +++ b/src/client.zig @@ -19,6 +19,7 @@ const freeMessageList = @import("./client_mailbox.zig").freeMessageList; const Config = @import("./config.zig"); const resp = @import("./commands/resp.zig"); const ClientHandle = @import("./types.zig").ClientHandle; +const StackWriter = @import("stack_writer.zig").StackWriter; const log = std.log.scoped(.client); @@ -38,6 +39,7 @@ pub const Client = struct { disconnect_requested: *std.atomic.Value(bool), server: *Server, io: std.Io, + parse_buffer: [65536]u8 = undefined, pub fn init( allocator: std.mem.Allocator, @@ -84,9 +86,10 @@ pub const Client = struct { var sr = self.connection.reader(self.io, &reader_buffer); const reader = &sr.interface; - // Create per-command arena for parsing (will be freed after enqueueing) - // Use page_allocator directly as it's thread-safe (multiple clients parse concurrently) - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); + // Per-command arena for parsing. + // Uses a fixed buffer in the Client struct to avoid page_allocator syscalls. + var fba = std.heap.FixedBufferAllocator.init(self.parse_buffer[0..]); + var arena = std.heap.ArenaAllocator.init(fba.allocator()); defer arena.deinit(); while (true) { @@ -134,12 +137,25 @@ pub const Client = struct { while (!self.server.store_mutex.tryLock()) { std.atomic.spinLoopHint(); } - self.server.processCommandDirect(self, command.getArgs()); + var sw = StackWriter.init(self.allocator); + var response_writer = sw.writer(); + self.server.processCommandDirect(self, command.getArgs(), &response_writer); self.server.store_mutex.unlock(); command.deinit(); - self.flushMailbox() catch return; + // Write response directly to socket (no mailbox indirection) + const response = sw.slice(&response_writer); + if (response.len > 0) { + var writer_buffer: [1024 * 4]u8 = undefined; + var socket_writer = self.connection.writer(self.io, &writer_buffer); + try socket_writer.interface.writeAll(response); + try socket_writer.interface.flush(); + } + sw.deinit(); + + // Check mailbox for pubsub messages that arrived during command execution + try self.flushMailbox(); if (self.disconnect_requested.load(.acquire)) { log.debug("client {d}: disconnect after cmd", .{self.client_id}); @@ -190,7 +206,7 @@ pub const Client = struct { fn flushMailbox(self: *Client) !void { const head = self.mailbox.takeAll(); - defer freeMessageList(self.allocator, head); + defer self.mailbox.freeNodes(self.allocator, head); if (head == null) return; diff --git a/src/client_mailbox.zig b/src/client_mailbox.zig index 1750763..a2eb34f 100644 --- a/src/client_mailbox.zig +++ b/src/client_mailbox.zig @@ -15,6 +15,7 @@ pub const ClientMailbox = struct { tail: ?*MessageNode = null, pending_count: usize = 0, capacity: usize, + node_pool: [4]?*MessageNode = .{null} ** 4, pub fn init(capacity: usize) ClientMailbox { return .{ @@ -22,6 +23,39 @@ pub const ClientMailbox = struct { }; } + pub fn acquireNode(self: *ClientMailbox, allocator: Allocator, bytes: []u8) !*MessageNode { + for (&self.node_pool) |*slot| { + if (slot.*) |node| { + slot.* = null; + node.* = .{ .bytes = bytes, .next = null }; + return node; + } + } + const node = try allocator.create(MessageNode); + node.* = .{ .bytes = bytes, .next = null }; + return node; + } + + pub fn releaseNode(self: *ClientMailbox, allocator: Allocator, node: *MessageNode) void { + allocator.free(node.bytes); + for (&self.node_pool) |*slot| { + if (slot.* == null) { + slot.* = node; + return; + } + } + allocator.destroy(node); + } + + pub fn freeNodes(self: *ClientMailbox, allocator: Allocator, head: ?*MessageNode) void { + var current = head; + while (current) |node| { + const next = node.next; + self.releaseNode(allocator, node); + current = next; + } + } + pub fn open(self: *ClientMailbox) void { self.closed.store(false, .release); } @@ -86,6 +120,12 @@ pub const ClientMailbox = struct { pub fn deinit(self: *ClientMailbox, allocator: Allocator) void { self.close(); freeMessageList(allocator, self.takeAll()); + for (&self.node_pool) |*slot| { + if (slot.*) |node| { + allocator.destroy(node); + slot.* = null; + } + } } }; diff --git a/src/commands/list.zig b/src/commands/list.zig index 3c30b93..f1a2770 100644 --- a/src/commands/list.zig +++ b/src/commands/list.zig @@ -44,9 +44,8 @@ pub fn lpush(writer: *Writer, store: *Store, args: []const Value) !void { const list = try store.getSetList(key); for (args[2..]) |arg| { - // Duplicate string since arg.asSlice() points to temporary parser buffer - const value_str = try store.allocator.dupe(u8, arg.asSlice()); - try list.prepend(.{ .string = value_str }); + const pv = try PrimitiveValue.fromSlice(arg.asSlice(), store.allocator); + try list.prepend(pv); } try resp.writeInt(writer, list.len()); @@ -58,9 +57,8 @@ pub fn rpush(writer: *Writer, store: *Store, args: []const Value) !void { const list = try store.getSetList(key); for (args[2..]) |arg| { - // Duplicate string since arg.asSlice() points to temporary parser buffer - const value_str = try store.allocator.dupe(u8, arg.asSlice()); - try list.append(.{ .string = value_str }); + const pv = try PrimitiveValue.fromSlice(arg.asSlice(), store.allocator); + try list.append(pv); } try resp.writeInt(writer, list.len()); @@ -183,9 +181,9 @@ pub fn lset(writer: *Writer, store: *Store, args: []const Value) !void { return error.KeyNotFound; }; - // Duplicate string since value points to temporary parser buffer - const value_str = try store.allocator.dupe(u8, value); - try list.setByIndex(actual_index, .{ .string = value_str }); + // Set value inline if small enough, otherwise duplicate via KV allocator + const pv = try PrimitiveValue.fromSlice(value, store.allocator); + try list.setByIndex(actual_index, pv); try resp.writeOK(writer); } diff --git a/src/commands/resp.zig b/src/commands/resp.zig index d39e6e1..0aa3c75 100644 --- a/src/commands/resp.zig +++ b/src/commands/resp.zig @@ -107,6 +107,7 @@ pub fn writeDoubleBulkString(writer: *Writer, value: f64) !void { pub fn writePrimitiveValue(writer: *Writer, value: PrimitiveValue) !void { switch (value) { .string => |str| try writeBulkString(writer, str), + .short_string => |ss| try writeBulkString(writer, ss.asSlice()), .int => |i| try writeInt(writer, i), } } diff --git a/src/list.zig b/src/list.zig index cb88fbc..dc2b932 100644 --- a/src/list.zig +++ b/src/list.zig @@ -22,10 +22,9 @@ pub const ZedisList = struct { while (current) |node| { current = node.next; const list_node: *ZedisListNode = @fieldParentPtr("node", node); - // Free string data if present switch (list_node.data) { .string => |str| self.allocator.free(str), - .int => {}, + .int, .short_string => {}, } self.allocator.destroy(list_node); } @@ -137,10 +136,9 @@ pub const ZedisList = struct { pub fn setByIndex(self: *ZedisList, index: usize, value: PrimitiveValue) !void { const list_node = self.getNodeAt(index) orelse return error.KeyNotFound; - // Free old string value if present switch (list_node.data) { .string => |str| self.allocator.free(str), - .int => {}, + .int, .short_string => {}, } list_node.data = value; } diff --git a/src/rdb/zdb.zig b/src/rdb/zdb.zig index 17ec4a2..043a0a3 100644 --- a/src/rdb/zdb.zig +++ b/src/rdb/zdb.zig @@ -221,6 +221,7 @@ pub const Writer = struct { switch (value) { .int => |i| try self.writeIntValue(i), .string => |s| try self.writeString(s), + .short_string => |ss| try self.writeString(ss.asSlice()), } current = node.next; diff --git a/src/server.zig b/src/server.zig index d9ae807..f82bba5 100644 --- a/src/server.zig +++ b/src/server.zig @@ -16,6 +16,7 @@ const aof = @import("./aof/aof.zig"); const Clock = @import("clock.zig"); const ClientHandle = @import("types.zig").ClientHandle; const invalid_client_slot_index = @import("types.zig").invalid_client_slot_index; +const StackWriter = @import("stack_writer.zig").StackWriter; const Io = std.Io; const Stream = Io.net.Stream; @@ -354,45 +355,25 @@ fn storeThreadLoop(server: *Server) void { } fn processCommand(self: *Server, node: *CommandNode) void { - self.processCommandDirect(node.client, node.args); -} - -pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value) void { - var alloc_writer = std.Io.Writer.Allocating.init(self.base_allocator); - defer alloc_writer.deinit(); + var sw = StackWriter.init(self.base_allocator); + defer sw.deinit(); + var response_writer = sw.writer(); + self.processCommandDirect(node.client, node.args, &response_writer); - self.registry.executeCommand( - &alloc_writer.writer, - client, - client.getCurrentStore(), - &self.aof_writer, - args, - ) catch |err| { - log.err("Command execution failed: {s}", .{@errorName(err)}); - }; - - const response = alloc_writer.toOwnedSlice() catch return; + const response = sw.slice(&response_writer); if (response.len == 0) return; - // `response` is now owned by us; we will transfer it to the mailbox node. - - const msg_node = self.base_allocator.create(MessageNode) catch { - self.base_allocator.free(response); - return; - }; - msg_node.* = .{ - .bytes = response, - .next = null, - }; + const client = node.client; const client_slot = client.slot_handle; - if (client_slot.slot_index >= self.client_slots.len) { - log.warn("store thread: stale slot_index={d}", .{client_slot.slot_index}); - self.base_allocator.free(msg_node.bytes); - self.base_allocator.destroy(msg_node); - return; - } + if (client_slot.slot_index >= self.client_slots.len) return; const slot = &self.client_slots[client_slot.slot_index]; + const owned = self.base_allocator.dupe(u8, response) catch return; + const msg_node = slot.mailbox.acquireNode(self.base_allocator, owned) catch { + self.base_allocator.free(owned); + return; + }; + slot.mailbox.lockAtomic(); defer slot.mailbox.unlockAtomic(); @@ -400,14 +381,11 @@ pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value) slot.generation.load(.acquire) == client.slot_handle.generation and !slot.mailbox.closed.load(.acquire); if (!is_active) { - log.warn("store thread: slot inactive for client_id={d}", .{client.client_id}); - self.base_allocator.free(msg_node.bytes); - self.base_allocator.destroy(msg_node); + slot.mailbox.releaseNode(self.base_allocator, msg_node); return; } if (slot.mailbox.pending_count >= slot.mailbox.capacity) { - self.base_allocator.free(msg_node.bytes); - self.base_allocator.destroy(msg_node); + slot.mailbox.releaseNode(self.base_allocator, msg_node); return; } @@ -420,6 +398,18 @@ pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value) slot.mailbox.pending_count += 1; } +pub fn processCommandDirect(self: *Server, client: *Client, args: []const Value, response_writer: *std.Io.Writer) void { + self.registry.executeCommand( + response_writer, + client, + client.getCurrentStore(), + &self.aof_writer, + args, + ) catch |err| { + log.err("Command execution failed: {s}", .{@errorName(err)}); + }; +} + // The main server loop. It waits for incoming connections and // handles each client (one thread per connection). pub fn listen(self: *Server) !void { @@ -555,11 +545,12 @@ fn enqueueToHandle(self: *Server, handle: ClientHandle, payload: []const u8) !vo if (handle.slot_index >= self.client_slots.len) return error.StaleHandle; const slot = &self.client_slots[handle.slot_index]; - const node = try self.createMessageNode(payload); - errdefer { - self.base_allocator.free(node.bytes); - self.base_allocator.destroy(node); - } + const owned = try self.base_allocator.dupe(u8, payload); + const node = slot.mailbox.acquireNode(self.base_allocator, owned) catch |err| { + self.base_allocator.free(owned); + return err; + }; + errdefer slot.mailbox.releaseNode(self.base_allocator, node); slot.mailbox.lockAtomic(); defer slot.mailbox.unlockAtomic(); diff --git a/src/stack_writer.zig b/src/stack_writer.zig new file mode 100644 index 0000000..b557da6 --- /dev/null +++ b/src/stack_writer.zig @@ -0,0 +1,83 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const Writer = std.Io.Writer; + +const default_capacity = 16384; + +pub const StackWriter = struct { + buf: [default_capacity]u8 = undefined, + overflow: ?std.ArrayListUnmanaged(u8) = null, + heap_allocator: Allocator, + + pub fn init(heap_allocator: Allocator) StackWriter { + return .{ .heap_allocator = heap_allocator }; + } + + pub fn writer(self: *StackWriter) Writer { + return .{ + .vtable = &.{ .drain = drain }, + .buffer = &self.buf, + }; + } + + pub fn deinit(self: *StackWriter) void { + if (self.overflow) |*list| { + list.deinit(self.heap_allocator); + } + } + + fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize { + const self: *StackWriter = @ptrCast(@alignCast(w.buffer.ptr)); + + const existing = Writer.buffered(w); + const new_total = Writer.countSplat(data, splat); + + if (self.overflow == null) { + var list: std.ArrayListUnmanaged(u8) = .empty; + list.ensureTotalCapacity(self.heap_allocator, existing.len + new_total) catch return error.WriteFailed; + if (existing.len > 0) list.appendSliceAssumeCapacity(existing); + for (data[0 .. data.len - 1]) |buf| { + list.appendSliceAssumeCapacity(buf); + } + const pattern = data[data.len - 1]; + for (0..splat) |_| { + list.appendSliceAssumeCapacity(pattern); + } + self.overflow = list; + w.buffer = list.items; + w.end = list.items.len; + return new_total; + } + + var list = &self.overflow.?; + list.ensureTotalCapacity(self.heap_allocator, list.items.len + new_total) catch return error.WriteFailed; + for (data[0 .. data.len - 1]) |buf| { + list.appendSliceAssumeCapacity(buf); + } + const pattern = data[data.len - 1]; + for (0..splat) |_| { + list.appendSliceAssumeCapacity(pattern); + } + w.buffer = list.items; + w.end = list.items.len; + return new_total; + } + + pub fn slice(self: *const StackWriter, w: *const Writer) []const u8 { + if (self.overflow) |*list| { + return list.items; + } + return Writer.buffered(w); + } + + pub fn toOwnedSlice(self: *StackWriter, w: *const Writer) ![]u8 { + if (self.overflow) |*list| { + const result = try list.toOwnedSlice(self.heap_allocator); + self.overflow = null; + return result; + } + const buffered = Writer.buffered(w); + if (buffered.len == 0) return buffered; + return try self.heap_allocator.dupe(u8, buffered); + } +}; diff --git a/src/store.zig b/src/store.zig index 5b48945..553804b 100644 --- a/src/store.zig +++ b/src/store.zig @@ -33,22 +33,8 @@ pub const ValueType = enum(u8) { } }; -pub const ShortString = struct { - data: [23]u8, - len: u8, - - pub fn fromSlice(str: []const u8) ShortString { - assert(str.len <= 23); - var ss: ShortString = .{ .data = undefined, .len = @intCast(str.len) }; - @memcpy(ss.data[0..str.len], str); - @memset(ss.data[str.len..], 0); - return ss; - } - - pub fn asSlice(self: *const ShortString) []const u8 { - return self.data[0..self.len]; - } -}; +pub const ShortString = @import("types.zig").ShortString; +pub const PrimitiveValue = @import("types.zig").PrimitiveValue; pub const ZedisValue = union(ValueType) { string: []const u8, diff --git a/src/types.zig b/src/types.zig index b6ae2d4..5c07a53 100644 --- a/src/types.zig +++ b/src/types.zig @@ -1,8 +1,35 @@ const std = @import("std"); +const assert = std.debug.assert; + +pub const ShortString = struct { + data: [23]u8, + len: u8, + + pub fn fromSlice(str: []const u8) ShortString { + assert(str.len <= 23); + var ss: ShortString = .{ .data = undefined, .len = @intCast(str.len) }; + @memcpy(ss.data[0..str.len], str); + @memset(ss.data[str.len..], 0); + return ss; + } + + pub fn asSlice(self: *const ShortString) []const u8 { + return self.data[0..self.len]; + } +}; pub const PrimitiveValue = union(enum) { string: []const u8, + short_string: ShortString, int: i64, + + pub fn fromSlice(str: []const u8, allocator: std.mem.Allocator) !PrimitiveValue { + if (str.len <= 23) { + return .{ .short_string = ShortString.fromSlice(str) }; + } + const duped = try allocator.dupe(u8, str); + return .{ .string = duped }; + } }; pub const invalid_client_slot_index: u32 = std.math.maxInt(u32);