Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@ pub fn build(b: *std.Build) void {
check_step.dependOn(&check_exe.step);

// --- Tests ---
const test_filters = b.option([][]const u8, "test-filter", "Filter tests (e.g. -Dtest-filter=string)") orelse &[0][]const u8{};

const test_mod = b.createModule(.{
.root_source_file = b.path("src/unit_tests.zig"),
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
.optimize = .Debug,
});
test_mod.addImport("zio", zio_mod);

const unit_tests = b.addTest(.{
.name = "test-unit",
.root_module = test_mod,
.filters = b.args orelse &.{},
.filters = test_filters,
});

const run_unit_tests = b.addRunArtifact(unit_tests);
Expand Down
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
// internet connectivity.
.dependencies = .{
.zio = .{
.url = "git+https://github.com/lalinsky/zio?ref=v0.12.0#9f622147033da64b5d51402a1f06fe3e640f4f5c",
.hash = "zio-0.12.0-xHbVVD79GwD7y4FMkB5DroWGWxCELLiOPM3G-tuLGa8C",
.url = "git+https://github.com/lalinsky/zio?ref=v0.14.0#aa860381beb42bc077e24ef304827cbdc23fcef9",
.hash = "zio-0.14.0-xHbVVPxFHgBCop4CvfF4u360kzWlHD4FS8Ud-RIkPcZ7",
},
},
.paths = .{
Expand Down
108 changes: 81 additions & 27 deletions src/aof/aof.zig
Original file line number Diff line number Diff line change
@@ -1,66 +1,111 @@
const std = @import("std");
const Server = @import("../server.zig");
const Parser = @import("../parser.zig");
const Command = @import("../parser.zig").Command;
const Registry = @import("../commands/registry.zig").CommandRegistry;
const Client = @import("../client.zig").Client;
const Store = @import("../store.zig").Store;
const resp = @import("../commands/resp.zig");
const Value = @import("../parser.zig").Value;
const Io = std.Io;
const File = Io.File;
const Dir = Io.Dir;
const builtin = @import("builtin");
const posix = std.posix;
const Clock = @import("../clock.zig");

const DEFAULT_NAME = "test.aof";

// TODO: AOF Rewrite
// Get the state of the store at the time of rewrite, and create
// the necessary commands to replicate it.
const Allocator = std.mem.Allocator;

pub const Writer = struct {
enabled: bool,
file_writer: ?Io.File.Writer,
write_buffer: ?[]u8,
io: Io,
filename: []const u8,
allocator: Allocator,
fsync_policy: FsyncPolicy,
last_fsync_ms: i64 = 0,

pub const FsyncPolicy = enum { always, everysec, no };

// take path when ready to
pub fn init(io: Io, enabled: bool) !Writer {
var fw: File.Writer = undefined;
pub fn init(alloc: Allocator, io: Io, enabled: bool, filename: []const u8, work_dir: []const u8, buffer_size: usize, fsync_policy: FsyncPolicy) !Writer {
var file_writer: ?Io.File.Writer = null;
var write_buffer: ?[]u8 = null;
if (enabled) {
const file = Dir.cwd().openFile(io, DEFAULT_NAME, .{ .mode = .write_only }) catch
try Dir.cwd().createFile(io, DEFAULT_NAME, .{});
fw = file.writer(io, &.{});
Dir.cwd().createDir(io, work_dir, .default_dir) catch |err| switch (err) {
error.PathAlreadyExists => {},
else => return err,
};
const dir = try Dir.cwd().openDir(io, work_dir, .{});

const file = dir.openFile(io, filename, .{ .mode = .write_only }) catch
try dir.createFile(io, filename, .{});
const buf = try alloc.alloc(u8, buffer_size);
errdefer alloc.free(buf);
var fw = file.writer(io, buf);
const length = try file.length(io);
try fw.seekTo(length);
file_writer = fw;
write_buffer = buf;
}
return .{
.enabled = enabled,
.file_writer = if (enabled) fw else null,
.file_writer = file_writer,
.write_buffer = write_buffer,
.io = io,
.filename = filename,
.allocator = alloc,
.fsync_policy = fsync_policy,
};
}

pub fn deinit(self: *Writer, io: Io) void {
pub fn deinit(self: *Writer) void {
if (self.write_buffer) |buf| {
self.allocator.free(buf);
}
if (self.file_writer) |fw| {
fw.file.close(io);
fw.file.close(self.io);
}
}

/// Encode command args as RESP and write to the AOF file.
/// Called outside the store_mutex critical section — zio makes file I/O async,
/// suspending the coroutine instead of blocking the OS thread.
pub fn writeCommand(self: *Writer, args: []const Value) void {
if (!self.enabled) return;
resp.writeListLen(self.writer(), args.len) catch return;
for (args) |arg| {
resp.writeBulkString(self.writer(), arg.asSlice()) catch return;
}

switch (self.fsync_policy) {
.always => {
_ = self.file_writer.?.interface.flush() catch {};
self.file_writer.?.file.sync(self.io) catch {};
},
.everysec => {
const now_ms = Io.Timestamp.now(self.io, .awake).toMilliseconds();
if (now_ms - self.last_fsync_ms >= 1000) {
_ = self.file_writer.?.interface.flush() catch {};
self.file_writer.?.file.sync(self.io) catch {};
self.last_fsync_ms = now_ms;
}
},
.no => {},
}
}

// only to be called if enabled
pub fn writer(self: *Writer) *Io.Writer {
return &self.file_writer.?.interface;
}
};

pub const Reader = struct {
file_reader: Io.File.Reader,
allocator: std.mem.Allocator,
allocator: Allocator,
store: *Store,
registry: *Registry,
reader_buffer: [8192]u8 = undefined,
io: Io,

// take path when ready to
pub fn init(allocator: std.mem.Allocator, store: *Store, registry: *Registry, io: Io) !Reader {
const file = try Dir.cwd().openFile(io, DEFAULT_NAME, .{ .mode = .read_only });
pub fn init(allocator: Allocator, store: *Store, registry: *Registry, io: Io, work_dir: []const u8, filename: []const u8) !Reader {
const dir = try Dir.cwd().openDir(io, work_dir, .{});
const file = try dir.openFile(io, filename, .{ .mode = .read_only });
var result = Reader{
.file_reader = undefined,
.allocator = allocator,
Expand Down Expand Up @@ -150,16 +195,25 @@ test "aof writing test" {
var dummy_client: Client = undefined;
dummy_client.authenticated = true;

// Execute command (mutates store, no AOF write)
try registry.executeCommand(&discarding.writer, &dummy_client, &store, cmd.getArgs());
try testing.expect(std.mem.eql(u8, store.get("t").?.value.short_string.asSlice(), "test"));

// Write AOF separately (simulating what server.writeAof does)
var aof_writer: Writer = undefined;
aof_writer.file_writer = test_file.writer(testing.io, &.{});
aof_writer.enabled = true;
aof_writer.file_writer = test_file.writer(testing.io, &.{});
aof_writer.io = testing.io;
aof_writer.filename = &.{};
aof_writer.allocator = testing.allocator;
aof_writer.write_buffer = null;
aof_writer.writeCommand(cmd.getArgs());

try registry.executeCommand(&discarding.writer, &dummy_client, &store, &aof_writer, cmd.getArgs());
_ = aof_writer.file_writer.?.interface.flush() catch {};

var file_reader_buffer: [8192]u8 = undefined;
var file_reader = test_file.reader(testing.io, &file_reader_buffer);

try testing.expect(std.mem.eql(u8, store.get("t").?.value.short_string.asSlice(), "test"));
const buf = try testing.allocator.alloc(u8, 1024);
defer testing.allocator.free(buf);
file_reader.interface.readSliceAll(buf) catch |e| {
Expand Down
4 changes: 4 additions & 0 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ pub const Client = struct {
self.server.processCommandDirect(self, command.getArgs(), &response_writer);
self.server.store_mutex.unlock();

// AOF write after mutex release: zio makes file I/O async,
// suspending this coroutine instead of blocking the OS thread.
self.server.writeAof(command.getArgSlice(0) orelse "", command.getArgs());

command.deinit();

// Write response directly to socket (no mailbox indirection)
Expand Down
Loading
Loading