pub const Actor = struct {
// Identity
id: ActorId,
name: ?[]const u8,
// Execution state
state: ActorState,
reductions: i32,
// Memory (Tiger Style - fixed limits)
mailbox: BoundedQueue(Message, MAX_MAILBOX_SIZE),
heap: FixedAllocator(ACTOR_HEAP_SIZE),
stack: [ACTOR_STACK_SIZE]u8 align(16),
// Behavior
behavior: *const fn (*Actor, Message) Error!void,
// Supervision
supervisor: ?ActorId,
children: BoundedArray(ActorId, MAX_CHILDREN),
restart_count: u32,
// Monitoring/Links
monitors: BoundedArray(Monitor, MAX_MONITORS),
links: BoundedArray(ActorId, MAX_LINKS),
pub const ActorState = enum {
initializing,
running,
suspended,
migrating,
terminated,
};
};
pub const MessageHandler = struct {
// Pattern matching for messages
pub fn handle(self: *Actor, msg: Message) Error!void {
switch (msg.payload) {
.compute => |compute| try self.handleCompute(compute),
.data => |data| try self.handleData(data),
.control => |control| try self.handleControl(control),
.system => |system| try self.handleSystem(system),
}
// Update metrics
self.messages_processed += 1;
}
// Selective receive pattern
pub fn receive(
self: *Actor,
comptime pattern: []const MessageType,
timeout: ?u64,
) !Message {
const deadline = if (timeout) |t|
std.time.nanoTimestamp() + t
else
null;
while (true) {
// Check mailbox for matching message
var iter = self.mailbox.iterator();
while (iter.next()) |msg| {
if (matchesPattern(msg, pattern)) {
self.mailbox.remove(msg);
return msg;
}
}
// Check timeout
if (deadline) |d| {
if (std.time.nanoTimestamp() > d) {
return error.Timeout;
}
}
// Yield to scheduler
try self.yield();
}
}
};
pub const ActorLifecycle = struct {
// Actor creation
pub fn spawn(
system: *ActorSystem,
behavior: ActorBehavior,
options: SpawnOptions,
) !ActorId {
// Allocate actor from pool
const actor = try system.allocateActor();
// Initialize actor
actor.* = Actor{
.id = system.generateId(),
.behavior = behavior,
.state = .initializing,
.reductions = INITIAL_REDUCTIONS,
.supervisor = options.supervisor,
// ... other fields
};
// Register with system
try system.registry.register(actor);
// Add to run queue
try system.scheduler.enqueue(actor);
// Send init message
try actor.send(.{ .system = .init });
return actor.id;
}
// Actor termination
pub fn terminate(
self: *Actor,
reason: TerminationReason,
) !void {
// Notify linked actors
for (self.links.items) |linked| {
try linked.send(.{
.system = .{ .exit = .{
.from = self.id,
.reason = reason,
}},
});
}
// Notify monitors
for (self.monitors.items) |monitor| {
try monitor.send(.{
.system = .{ .down = .{
.actor = self.id,
.reason = reason,
}},
});
}
// Clean up resources
self.heap.deinit();
self.mailbox.deinit();
// Update state
self.state = .terminated;
// Return to pool
self.system.releaseActor(self);
}
};
pub const WorkStealingScheduler = struct {
// Per-worker queues
queues: [MAX_WORKERS]WorkQueue,
// Global queue for overflow
global_queue: WorkQueue,
// Stealing strategy
steal_strategy: StealStrategy = .random,
pub fn schedule(self: *Scheduler, worker_id: u32) !*Actor {
const queue = &self.queues[worker_id];
// Fast path: local queue
if (queue.pop()) |actor| {
return actor;
}
// Try global queue
if (self.global_queue.pop()) |actor| {
return actor;
}
// Steal from other workers
return self.steal(worker_id);
}
fn steal(self: *Scheduler, thief: u32) !*Actor {
const victim = switch (self.steal_strategy) {
.random => self.randomVictim(thief),
.round_robin => self.nextVictim(thief),
.least_loaded => self.leastLoadedVictim(),
};
return self.queues[victim].steal();
}
};
pub const ReductionCounter = struct {
// Cost of operations in reductions
pub const Costs = struct {
pub const message_send = 1;
pub const message_receive = 1;
pub const function_call = 1;
pub const allocation = 10;
pub const gpu_launch = 1000;
pub const io_operation = 100;
};
// Track reductions
pub fn consume(
actor: *Actor,
comptime operation: Operation,
) !void {
const cost = switch (operation) {
.message_send => Costs.message_send,
.message_receive => Costs.message_receive,
.function_call => Costs.function_call,
.allocation => Costs.allocation,
.gpu_launch => Costs.gpu_launch,
.io_operation => Costs.io_operation,
};
actor.reductions -= cost;
if (actor.reductions <= 0) {
// Yield to scheduler
try actor.yield();
actor.reductions = INITIAL_REDUCTIONS;
}
}
};
pub const MessagePassing = struct {
// Send message to actor
pub fn send(
from: ActorId,
to: ActorId,
payload: MessagePayload,
) !void {
const msg = Message{
.id = generateMessageId(),
.from = from,
.to = to,
.timestamp = std.time.nanoTimestamp(),
.payload = payload,
};
// Local or remote?
if (isLocal(to)) {
const actor = registry.get(to);
try actor.mailbox.push(msg);
} else {
try network.send(to.node, msg);
}
}
// Broadcast to multiple actors
pub fn broadcast(
from: ActorId,
recipients: []const ActorId,
payload: MessagePayload,
) !void {
// Group by node for efficiency
var by_node = std.AutoHashMap(NodeId, ArrayList(ActorId)).init();
defer by_node.deinit();
for (recipients) |recipient| {
const node = getNode(recipient);
const list = try by_node.getOrPut(node);
try list.append(recipient);
}
// Send batched messages per node
var iter = by_node.iterator();
while (iter.next()) |entry| {
try network.sendBatch(entry.key, entry.value, payload);
}
}
};
pub const ProcessLinking = struct {
// Link two actors
pub fn link(a: ActorId, b: ActorId) !void {
const actor_a = registry.get(a);
const actor_b = registry.get(b);
try actor_a.links.append(b);
try actor_b.links.append(a);
}
// Monitor an actor
pub fn monitor(
monitor: ActorId,
monitored: ActorId,
) !MonitorRef {
const ref = MonitorRef{
.monitor = monitor,
.monitored = monitored,
.ref = generateRef(),
};
const actor = registry.get(monitored);
try actor.monitors.append(ref);
return ref;
}
// Trap exit signals
pub fn trapExit(actor: ActorId, enable: bool) !void {
const a = registry.get(actor);
a.trap_exit = enable;
}
};
pub const GenServer = struct {
actor: Actor,
state: *anyopaque,
// Callbacks
init: *const fn (*anyopaque) anyerror!void,
handle_call: *const fn (*anyopaque, Message) anyerror!Response,
handle_cast: *const fn (*anyopaque, Message) anyerror!void,
handle_info: *const fn (*anyopaque, Message) anyerror!void,
terminate: *const fn (*anyopaque, Reason) void,
pub fn start(
callbacks: Callbacks,
initial_state: anytype,
) !ActorId {
const server = GenServer{
.state = initial_state,
.init = callbacks.init,
.handle_call = callbacks.handle_call,
.handle_cast = callbacks.handle_cast,
.handle_info = callbacks.handle_info,
.terminate = callbacks.terminate,
};
return ActorSystem.spawn(genServerBehavior, .{
.data = server,
});
}
fn genServerBehavior(actor: *Actor, msg: Message) !void {
const server = @ptrCast(*GenServer, actor.data);
switch (msg.type) {
.call => {
const response = try server.handle_call(server.state, msg);
try msg.from.reply(response);
},
.cast => {
try server.handle_cast(server.state, msg);
},
.info => {
try server.handle_info(server.state, msg);
},
}
}
};
pub const FSM = struct {
actor: Actor,
state: State,
data: *anyopaque,
pub const State = enum {
idle,
processing,
waiting,
error_state,
};
pub const Transition = struct {
from: State,
event: Event,
to: State,
action: ?*const fn (*anyopaque) anyerror!void,
};
transitions: []const Transition,
pub fn handleEvent(self: *FSM, event: Event) !void {
for (self.transitions) |transition| {
if (transition.from == self.state and
transition.event == event) {
if (transition.action) |action| {
try action(self.data);
}
self.state = transition.to;
return;
}
}
return error.InvalidTransition;
}
};
test "actor message passing" {
var system = try ActorSystem.init(.{});
defer system.deinit();
const actor = try system.spawn(echoActor, .{});
// Send message
try actor.send(.{ .data = "hello" });
// Verify response
const response = try actor.call(.{ .data = "world" }, 1000);
try testing.expectEqualStrings("world", response.data);
}
test "actor supervision" {
var system = try ActorSystem.init(.{});
defer system.deinit();
// Create supervisor
const supervisor = try system.spawn(supervisorActor, .{
.restart_strategy = .one_for_one,
});
// Create child that crashes
const child = try supervisor.startChild(crashingActor, .{});
// Verify child restarts
try child.send(.{ .crash = true });
try testing.expect(child.isAlive());
}
test "actor fault injection" {
var system = try ActorSystem.init(.{
.fault_injection = .{
.enabled = true,
.failure_rate = 0.1,
},
});
defer system.deinit();
var success_count: u32 = 0;
var failure_count: u32 = 0;
// Run 1000 operations
for (0..1000) |_| {
const result = actor.send(.{ .compute = data });
if (result) {
success_count += 1;
} else |err| {
failure_count += 1;
try testing.expect(err == error.InjectedFault);
}
}
// Verify ~10% failure rate
const failure_rate = @as(f32, failure_count) / 1000.0;
try testing.expect(failure_rate > 0.08 and failure_rate < 0.12);
}