Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 3: Actor System

Actor Model Implementation

Core Actor Structure

/// State carried by a single actor: identity, execution bookkeeping,
/// fixed-capacity memory regions, behavior callback, and its supervision,
/// monitor and link relationships.
pub const Actor = struct {
    // Identity
    /// Identifier of this actor.
    id: ActorId,
    /// Optional name; `null` when the actor has none.
    name: ?[]const u8,

    // Execution state
    /// Current lifecycle state (see `ActorState`).
    state: ActorState,
    /// Remaining reduction budget; signed so it can drop to or below zero.
    reductions: i32,
    /// Number of messages dispatched so far. `MessageHandler.handle`
    /// increments this field; it starts at zero.
    messages_processed: u64 = 0,

    // Memory (Tiger Style - fixed limits)
    /// Incoming messages, bounded by `MAX_MAILBOX_SIZE`.
    mailbox: BoundedQueue(Message, MAX_MAILBOX_SIZE),
    /// Per-actor heap, parameterized by `ACTOR_HEAP_SIZE`.
    heap: FixedAllocator(ACTOR_HEAP_SIZE),
    /// Per-actor stack bytes, 16-byte aligned.
    stack: [ACTOR_STACK_SIZE]u8 align(16),

    // Behavior
    /// Callback invoked with the actor and a message.
    behavior: *const fn (*Actor, Message) Error!void,

    // Supervision
    /// Supervising actor, or `null` when there is none.
    supervisor: ?ActorId,
    /// Child actors, bounded by `MAX_CHILDREN`.
    children: BoundedArray(ActorId, MAX_CHILDREN),
    /// Restart counter.
    restart_count: u32,

    // Monitoring/Links
    /// Monitors registered on this actor, bounded by `MAX_MONITORS`.
    monitors: BoundedArray(Monitor, MAX_MONITORS),
    /// Actors linked to this one, bounded by `MAX_LINKS`.
    links: BoundedArray(ActorId, MAX_LINKS),
    /// Exit-trapping flag written by `ProcessLinking.trapExit`; off by default.
    trap_exit: bool = false,

    /// Lifecycle states an actor can be in.
    pub const ActorState = enum {
        initializing,
        running,
        suspended,
        migrating,
        terminated,
    };
};

Message Handling

/// Message dispatch and selective receive for an `Actor`.
pub const MessageHandler = struct {
    /// Routes `msg` to the handler matching its payload variant, then bumps
    /// the actor's processed-message counter. A handler error is propagated
    /// and the counter is left untouched.
    pub fn handle(self: *Actor, msg: Message) Error!void {
        switch (msg.payload) {
            .compute => |request| try self.handleCompute(request),
            .data => |body| try self.handleData(body),
            .control => |command| try self.handleControl(command),
            .system => |signal| try self.handleSystem(signal),
        }

        // Only reached when the handler above succeeded.
        self.messages_processed += 1;
    }

    /// Selective receive: scans the mailbox for the first message accepted by
    /// `matchesPattern`, removes it from the mailbox and returns it.
    ///
    /// When nothing matches, the actor yields and the scan is repeated.
    /// `timeout` is added to `std.time.nanoTimestamp()`, so it is expressed in
    /// nanoseconds; `null` waits indefinitely. Returns `error.Timeout` once
    /// the clock has passed the deadline, or any error from `yield`.
    pub fn receive(
        self: *Actor,
        comptime pattern: []const MessageType,
        timeout: ?u64,
    ) !Message {
        const expires_at = if (timeout) |duration|
            std.time.nanoTimestamp() + duration
        else
            null;

        while (true) {
            // Look for the first mailbox entry the pattern accepts.
            var cursor = self.mailbox.iterator();
            while (cursor.next()) |candidate| {
                if (!matchesPattern(candidate, pattern)) continue;

                self.mailbox.remove(candidate);
                return candidate;
            }

            // Nothing matched on this pass: give up if the deadline passed.
            if (expires_at) |limit| {
                if (std.time.nanoTimestamp() > limit) return error.Timeout;
            }

            // Hand control back to the scheduler before scanning again.
            try self.yield();
        }
    }
};

Actor Lifecycle

/// Creation and termination of actors.
pub const ActorLifecycle = struct {
    /// Allocates an actor from `system`, initializes it, registers it,
    /// enqueues it on the scheduler and sends it the `.init` system message.
    ///
    /// Returns the new actor's id. Any error from allocation, registration,
    /// enqueueing or the initial send is propagated.
    pub fn spawn(
        system: *ActorSystem,
        behavior: ActorBehavior,
        options: SpawnOptions,
    ) !ActorId {
        // Allocate actor from pool
        const actor = try system.allocateActor();

        // Initialize actor
        actor.* = Actor{
            .id = system.generateId(),
            .behavior = behavior,
            .state = .initializing,
            .reductions = INITIAL_REDUCTIONS,
            .supervisor = options.supervisor,
            // ... other fields
        };

        // Register with system. If registration fails nothing else refers to
        // the actor yet, so hand the slot back to the pool instead of leaking
        // it. (Presumably a failed `register` leaves no entry behind —
        // TODO confirm.)
        system.registry.register(actor) catch |err| {
            system.releaseActor(actor);
            return err;
        };

        // NOTE(review): if either step below fails, the actor stays
        // registered (and possibly enqueued). Rolling that back needs
        // registry/scheduler removal APIs that are not visible here.

        // Add to run queue
        try system.scheduler.enqueue(actor);

        // Send init message
        try actor.send(.{ .system = .init });

        return actor.id;
    }

    /// Terminates `self`: sends an `.exit` system message to every linked
    /// actor and a `.down` system message to every monitor, then releases the
    /// heap and mailbox, marks the actor `.terminated` and returns it to the
    /// pool.
    ///
    /// The first notification that fails aborts the remaining notifications
    /// and its error is returned, but the cleanup still runs. The actor must
    /// therefore not be used, or terminated again, after this call —
    /// regardless of the result.
    pub fn terminate(
        self: *Actor,
        reason: TerminationReason,
    ) !void {
        // Cleanup is deferred so that it also runs when a `try` below returns
        // early; otherwise a single failed send would leave the actor's
        // resources allocated and its slot never returned to the pool.
        defer {
            // Clean up resources
            self.heap.deinit();
            self.mailbox.deinit();

            // Update state
            self.state = .terminated;

            // Return to pool
            self.system.releaseActor(self);
        }

        // Notify linked actors
        for (self.links.items) |linked| {
            try linked.send(.{
                .system = .{ .exit = .{
                    .from = self.id,
                    .reason = reason,
                } },
            });
        }

        // Notify monitors
        for (self.monitors.items) |monitor| {
            try monitor.send(.{
                .system = .{ .down = .{
                    .actor = self.id,
                    .reason = reason,
                } },
            });
        }
    }
};

Scheduling

Work-Stealing Scheduler

/// Scheduler with one work queue per worker, a shared overflow queue, and a
/// configurable strategy for choosing which worker to steal from.
pub const WorkStealingScheduler = struct {
    /// Per-worker queues, indexed by worker id.
    queues: [MAX_WORKERS]WorkQueue,

    /// Global queue for overflow.
    global_queue: WorkQueue,

    /// How a victim is chosen when stealing; defaults to `.random`.
    steal_strategy: StealStrategy = .random,

    /// Picks the next actor for `worker_id` to run.
    ///
    /// Tries the worker's own queue first, then the global queue, and finally
    /// steals from another worker. `worker_id` indexes `queues`, so it must
    /// be below `MAX_WORKERS`.
    ///
    /// The receiver is this struct's own type: the methods read `queues`,
    /// `global_queue` and `steal_strategy`, which are fields declared here.
    pub fn schedule(self: *WorkStealingScheduler, worker_id: u32) !*Actor {
        const queue = &self.queues[worker_id];

        // Fast path: local queue
        if (queue.pop()) |actor| {
            return actor;
        }

        // Try global queue
        if (self.global_queue.pop()) |actor| {
            return actor;
        }

        // Steal from other workers
        return self.steal(worker_id);
    }

    /// Chooses a victim queue according to `steal_strategy` and steals from it.
    fn steal(self: *WorkStealingScheduler, thief: u32) !*Actor {
        const victim = switch (self.steal_strategy) {
            .random => self.randomVictim(thief),
            .round_robin => self.nextVictim(thief),
            // NOTE(review): unlike the other strategies this one is not told
            // who the thief is — confirm it cannot select the thief's own
            // (empty) queue.
            .least_loaded => self.leastLoadedVictim(),
        };

        return self.queues[victim].steal();
    }
};

Reduction Counting

/// Charges actors for the work they do, measured in reductions.
pub const ReductionCounter = struct {
    /// Price of each operation kind, in reductions.
    pub const Costs = struct {
        pub const message_send = 1;
        pub const message_receive = 1;
        pub const function_call = 1;
        pub const allocation = 10;
        pub const gpu_launch = 1000;
        pub const io_operation = 100;
    };

    /// Deducts the cost of `operation` from `actor.reductions`.
    ///
    /// While the budget stays positive this returns immediately. Once it
    /// reaches zero or goes negative the actor yields, and after a successful
    /// yield the budget is reset to `INITIAL_REDUCTIONS`. An error from
    /// `yield` is propagated and the budget is not reset.
    pub fn consume(
        actor: *Actor,
        comptime operation: Operation,
    ) !void {
        // `operation` is comptime-known, so the price is selected at compile
        // time.
        actor.reductions -= switch (operation) {
            .message_send => Costs.message_send,
            .message_receive => Costs.message_receive,
            .function_call => Costs.function_call,
            .allocation => Costs.allocation,
            .gpu_launch => Costs.gpu_launch,
            .io_operation => Costs.io_operation,
        };

        // Budget still available: nothing more to do.
        if (actor.reductions > 0) return;

        // Budget exhausted: hand control to the scheduler, then refill.
        try actor.yield();
        actor.reductions = INITIAL_REDUCTIONS;
    }
};

Inter-Actor Communication

Message Passing

/// Point-to-point and broadcast message delivery between actors.
pub const MessagePassing = struct {
    /// Builds a `Message` from `from` to `to` carrying `payload`, stamped
    /// with a fresh message id and the current `std.time.nanoTimestamp()`,
    /// and delivers it.
    ///
    /// When `isLocal(to)` holds, the message is pushed onto the target
    /// actor's mailbox; otherwise it is sent over the network to `to.node`.
    /// Errors from the mailbox push or the network send are propagated.
    pub fn send(
        from: ActorId,
        to: ActorId,
        payload: MessagePayload,
    ) !void {
        const msg = Message{
            .id = generateMessageId(),
            .from = from,
            .to = to,
            .timestamp = std.time.nanoTimestamp(),
            .payload = payload,
        };

        // Local or remote?
        if (isLocal(to)) {
            // NOTE(review): assumes `registry.get` always yields an actor —
            // confirm how an unknown id is reported.
            const actor = registry.get(to);
            try actor.mailbox.push(msg);
        } else {
            try network.send(to.node, msg);
        }
    }

    /// Sends `payload` to every actor in `recipients`, grouping recipients by
    /// node so that each node receives a single `network.sendBatch` call.
    ///
    /// An empty `recipients` slice sends nothing. Allocation failures and
    /// errors from `network.sendBatch` are propagated; batches already sent
    /// are not undone.
    pub fn broadcast(
        from: ActorId,
        recipients: []const ActorId,
        payload: MessagePayload,
    ) !void {
        // NOTE(review): the sender is not forwarded because `sendBatch` is
        // called without it; the explicit discard keeps the parameter from
        // being a compile error. Confirm whether batches should carry `from`.
        _ = from;

        // The signature carries no allocator, so the temporary grouping lives
        // in a local arena that is freed in one step, including every
        // per-node list.
        // NOTE(review): prefer an injected allocator once the interface may
        // change.
        var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
        defer arena.deinit();
        const allocator = arena.allocator();

        // Group by node for efficiency
        var by_node = std.AutoHashMap(NodeId, ArrayList(ActorId)).init(allocator);

        for (recipients) |recipient| {
            const slot = try by_node.getOrPut(getNode(recipient));
            // `getOrPut` leaves the value of a new entry undefined, so the
            // list must be initialized before its first append.
            if (!slot.found_existing) {
                slot.value_ptr.* = ArrayList(ActorId).init(allocator);
            }
            try slot.value_ptr.append(recipient);
        }

        // Send batched messages per node
        var iter = by_node.iterator();
        while (iter.next()) |entry| {
            try network.sendBatch(entry.key_ptr.*, entry.value_ptr.*, payload);
        }
    }
};

Process Linking

/// Links, monitors and exit trapping between actors.
pub const ProcessLinking = struct {
    /// Links actors `a` and `b` by recording each id in the other's `links`.
    ///
    /// The link is all-or-nothing: if recording the second direction fails,
    /// the first is rolled back and the error is returned.
    pub fn link(a: ActorId, b: ActorId) !void {
        const actor_a = registry.get(a);
        const actor_b = registry.get(b);

        try actor_a.links.append(b);
        // Undo the first half if the second append fails, so a one-way link
        // is never left behind.
        // NOTE(review): relies on the bounded array offering `pop` — confirm.
        errdefer _ = actor_a.links.pop();

        try actor_b.links.append(a);
    }

    /// Registers `watcher` as a monitor of `monitored` and returns the
    /// reference that was stored in the monitored actor's `monitors`.
    ///
    /// The first parameter is named `watcher` because a parameter called
    /// `monitor` would shadow this function's own declaration, which Zig
    /// rejects. Callers are unaffected: arguments are positional.
    pub fn monitor(
        watcher: ActorId,
        monitored: ActorId,
    ) !MonitorRef {
        const ref = MonitorRef{
            .monitor = watcher,
            .monitored = monitored,
            .ref = generateRef(),
        };

        // NOTE(review): `Actor.monitors` is declared with element type
        // `Monitor`; confirm `MonitorRef` is that same type.
        const actor = registry.get(monitored);
        try actor.monitors.append(ref);

        return ref;
    }

    /// Sets the `trap_exit` flag of `actor` to `enable`.
    pub fn trapExit(actor: ActorId, enable: bool) !void {
        const a = registry.get(actor);
        a.trap_exit = enable;
    }
};

Actor Patterns

GenServer Pattern

/// Generic server: an actor whose behavior dispatches call/cast/info messages
/// to user-supplied callbacks operating on opaque state.
pub const GenServer = struct {
    actor: Actor,
    /// Opaque user state handed to every callback.
    state: *anyopaque,

    // Callbacks
    init: *const fn (*anyopaque) anyerror!void,
    handle_call: *const fn (*anyopaque, Message) anyerror!Response,
    handle_cast: *const fn (*anyopaque, Message) anyerror!void,
    handle_info: *const fn (*anyopaque, Message) anyerror!void,
    terminate: *const fn (*anyopaque, Reason) void,

    /// Builds a server from `callbacks` and `initial_state`, then spawns an
    /// actor running `genServerBehavior` with the server as its data.
    /// Returns the spawned actor's id.
    pub fn start(
        callbacks: Callbacks,
        initial_state: anytype,
    ) !ActorId {
        // NOTE(review): `actor` has no default value and is not set here,
        // and `server` is a stack local that is passed by value while the
        // behavior later reads `actor.data` as a pointer — confirm how
        // `spawn` stores it.
        const server = GenServer{
            .state = initial_state,
            .init = callbacks.init,
            .handle_call = callbacks.handle_call,
            .handle_cast = callbacks.handle_cast,
            .handle_info = callbacks.handle_info,
            .terminate = callbacks.terminate,
        };

        return ActorSystem.spawn(genServerBehavior, .{
            .data = server,
        });
    }

    /// Actor behavior: recovers the server from `actor.data` and dispatches
    /// `msg` by type.
    ///
    /// `.call` invokes `handle_call` and replies to the sender with its
    /// result; `.cast` and `.info` invoke their handlers without replying.
    /// Callback and reply errors are propagated.
    fn genServerBehavior(actor: *Actor, msg: Message) !void {
        // The destination type comes from the annotation; `@alignCast`
        // asserts the opaque pointer is suitably aligned for `GenServer`.
        const server: *GenServer = @ptrCast(@alignCast(actor.data));

        switch (msg.type) {
            .call => {
                const response = try server.handle_call(server.state, msg);
                try msg.from.reply(response);
            },
            .cast => {
                try server.handle_cast(server.state, msg);
            },
            .info => {
                try server.handle_info(server.state, msg);
            },
        }
    }
};

FSM Pattern

/// Finite state machine driven by a table of transitions.
pub const FSM = struct {
    actor: Actor,
    /// Current state of the machine.
    state: State,
    /// Opaque data passed to transition actions.
    data: *anyopaque,
    /// Transition table; `handleEvent` searches it in order. Kept with the
    /// other fields because Zig does not allow declarations between
    /// container fields.
    transitions: []const Transition,

    /// States the machine can be in.
    pub const State = enum {
        idle,
        processing,
        waiting,
        error_state,
    };

    /// One table row: in state `from`, on `event`, optionally run `action`
    /// and move to state `to`.
    pub const Transition = struct {
        from: State,
        event: Event,
        to: State,
        action: ?*const fn (*anyopaque) anyerror!void,
    };

    /// Applies the first transition whose `from` equals the current state and
    /// whose `event` equals `event`.
    ///
    /// The transition's action, if any, runs before the state changes; if it
    /// fails, its error is returned and the state is left as it was. Returns
    /// `error.InvalidTransition` when no row matches.
    pub fn handleEvent(self: *FSM, event: Event) !void {
        for (self.transitions) |transition| {
            if (transition.from == self.state and
                transition.event == event)
            {
                if (transition.action) |action| {
                    try action(self.data);
                }

                self.state = transition.to;
                return;
            }
        }

        return error.InvalidTransition;
    }
};

Testing Actors

test "actor message passing" {
    var system = try ActorSystem.init(.{});
    defer system.deinit();

    // Spawn an actor running `echoActor`.
    const echo = try system.spawn(echoActor, .{});

    // Send without waiting for an answer.
    try echo.send(.{ .data = "hello" });

    // Call with a timeout argument of 1000 and check the payload that comes
    // back.
    const reply = try echo.call(.{ .data = "world" }, 1000);
    try testing.expectEqualStrings("world", reply.data);
}

test "actor supervision" {
    var system = try ActorSystem.init(.{});
    defer system.deinit();

    // Supervisor configured with the one-for-one restart strategy.
    const sup = try system.spawn(supervisorActor, .{
        .restart_strategy = .one_for_one,
    });

    // Child started under that supervisor, running `crashingActor`.
    const worker = try sup.startChild(crashingActor, .{});

    // Send the crash message, then check the child is reported alive.
    // NOTE(review): nothing here waits between the send and the check —
    // confirm this cannot pass before the crash has been processed.
    try worker.send(.{ .crash = true });
    try testing.expect(worker.isAlive());
}

test "actor fault injection" {
    var system = try ActorSystem.init(.{
        .fault_injection = .{
            .enabled = true,
            .failure_rate = 0.1,
        },
    });
    defer system.deinit();

    // The loop below needs a target actor; spawn one the same way the
    // message-passing test does.
    // NOTE(review): confirm `spawn` itself is not subject to injected faults.
    const actor = try system.spawn(echoActor, .{});

    var success_count: u32 = 0;
    var failure_count: u32 = 0;

    // Run 1000 operations
    for (0..1000) |_| {
        // NOTE(review): `data` is not declared in this test — supply the
        // compute payload that is intended here.
        const result = actor.send(.{ .compute = data });
        if (result) {
            success_count += 1;
        } else |err| {
            failure_count += 1;
            // Every failure must be an injected one.
            try testing.expect(err == error.InjectedFault);
        }
    }

    // Verify ~10% failure rate. A runtime integer needs an explicit
    // conversion to float; `@as` alone does not convert it.
    // NOTE(review): the 8%-12% window is about two standard deviations wide
    // for 1000 independent trials at 10%, so this can fail by chance unless
    // the injector is seeded deterministically — confirm.
    const failure_rate = @as(f32, @floatFromInt(failure_count)) / 1000.0;
    try testing.expect(failure_rate > 0.08 and failure_rate < 0.12);
}