const std = @import("std");
const testing = std.testing;
const ArrayList = std.ArrayList;
// Actor, ActorId, ActorSystem, Message, BoundedArray, and the capacity limits
// (MAX_CHILDREN, MAX_RESTARTS, WINDOW_SIZE, MIN_SAMPLES) are assumed to be
// provided by the surrounding actor-system module.

pub const Supervisor = struct {
actor: Actor,
children: BoundedArray(Child, MAX_CHILDREN),
strategy: RestartStrategy,
    intensity: u32, // Max restarts allowed within `period`
    period: i64, // Time window for the restart limit (nanoseconds)
restart_count: u32,
restart_timestamps: BoundedArray(i64, MAX_RESTARTS),
pub const Child = struct {
id: ActorId,
spec: ChildSpec,
state: ChildState,
restart_count: u32,
};
pub const ChildSpec = struct {
behavior: ActorBehavior,
restart: RestartType,
shutdown: ShutdownType,
type: ChildType,
};
pub const RestartType = enum {
permanent, // Always restart
temporary, // Never restart
transient, // Restart only on abnormal exit
};
pub const ChildState = enum {
starting,
running,
stopping,
stopped,
};
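
    // A minimal sketch of the restart-intensity check implied by the
    // `intensity`/`period` fields above (Erlang-style: too many restarts
    // inside the time window means the supervisor should stop retrying and
    // escalate). The method names and the window-pruning policy are
    // assumptions, not part of the framework shown elsewhere.
    pub fn recordRestart(self: *Supervisor) !void {
        const now: i64 = @intCast(std.time.nanoTimestamp());
        try self.restart_timestamps.append(now);
        self.restart_count += 1;
    }

    pub fn withinRestartIntensity(self: *Supervisor) bool {
        const now: i64 = @intCast(std.time.nanoTimestamp());
        const cutoff = now - self.period;
        // Drop restart timestamps that have aged out of the window
        while (self.restart_timestamps.items.len > 0 and
            self.restart_timestamps.items[0] < cutoff)
        {
            _ = self.restart_timestamps.orderedRemove(0);
        }
        return self.restart_timestamps.items.len < self.intensity;
    }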
};
pub const RestartStrategies = struct {
// Restart only the failed child
pub fn oneForOne(
supervisor: *Supervisor,
failed_child: ActorId,
reason: TerminationReason,
) !void {
const child = supervisor.findChild(failed_child) orelse
return error.ChildNotFound;
// Check if should restart
if (shouldRestart(child, reason)) {
try supervisor.restartChild(child);
}
}
// Restart all children
pub fn oneForAll(
supervisor: *Supervisor,
failed_child: ActorId,
reason: TerminationReason,
) !void {
_ = failed_child;
_ = reason;
// Stop all children
for (supervisor.children.items) |*child| {
try supervisor.stopChild(child);
}
// Restart all children
for (supervisor.children.items) |*child| {
try supervisor.restartChild(child);
}
}
// Restart failed child and all children started after it
pub fn restForOne(
supervisor: *Supervisor,
failed_child: ActorId,
reason: TerminationReason,
) !void {
const failed_idx = supervisor.findChildIndex(failed_child) orelse
return error.ChildNotFound;
// Stop failed and younger children
for (failed_idx..supervisor.children.items.len) |i| {
try supervisor.stopChild(&supervisor.children.items[i]);
}
// Restart them
for (failed_idx..supervisor.children.items.len) |i| {
try supervisor.restartChild(&supervisor.children.items[i]);
}
}
    fn shouldRestart(child: *Supervisor.Child, reason: TerminationReason) bool {
return switch (child.spec.restart) {
.permanent => true,
.temporary => false,
.transient => reason != .normal,
};
}
};
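
// How the pieces above fit together: when a child terminates abnormally, the
// supervisor applies its configured strategy, but only while the restart
// intensity allows it. A minimal dispatch sketch; `handleChildTermination`,
// the error name, and the `withinRestartIntensity`/`recordRestart` helpers
// (sketched inside Supervisor above) are assumptions layered on top of the
// strategies shown here. The RestartStrategy tag names follow their usage in
// the tests and TrainingSupervisor below.
fn handleChildTermination(
    supervisor: *Supervisor,
    child: ActorId,
    reason: TerminationReason,
) !void {
    if (!supervisor.withinRestartIntensity()) {
        // Too many restarts inside `period`: give up and escalate to this
        // supervisor's own supervisor.
        return error.RestartIntensityExceeded;
    }
    // Count this restart attempt against the window
    try supervisor.recordRestart();
    switch (supervisor.strategy) {
        .one_for_one => try RestartStrategies.oneForOne(supervisor, child, reason),
        .one_for_all => try RestartStrategies.oneForAll(supervisor, child, reason),
        .rest_for_one => try RestartStrategies.restForOne(supervisor, child, reason),
    }
}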
pub const DynamicSupervisor = struct {
supervisor: Supervisor,
child_spec: ChildSpec, // Template for all children
pub fn startChild(
self: *DynamicSupervisor,
args: anytype,
) !ActorId {
const child = try self.supervisor.actor.system.spawn(
self.child_spec.behavior,
.{
.supervisor = self.supervisor.actor.id,
.args = args,
},
);
try self.supervisor.children.append(.{
.id = child,
.spec = self.child_spec,
.state = .running,
.restart_count = 0,
});
return child;
}
pub fn terminateChild(
self: *DynamicSupervisor,
child_id: ActorId,
) !void {
const idx = self.supervisor.findChildIndex(child_id) orelse
return error.ChildNotFound;
try self.supervisor.stopChild(&self.supervisor.children.items[idx]);
_ = self.supervisor.children.swapRemove(idx);
}
};
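
// Example use of a DynamicSupervisor: every child shares the template
// ChildSpec, so homogeneous workers can be spawned on demand - e.g. one
// data-loading worker per dataset shard. `spawnShardWorkers` and the
// `.shard` argument shape are illustrative assumptions.
fn spawnShardWorkers(pool: *DynamicSupervisor, shard_count: u32) !void {
    var shard: u32 = 0;
    while (shard < shard_count) : (shard += 1) {
        // Each worker is cloned from `pool.child_spec` and supervised like
        // any statically declared child.
        _ = try pool.startChild(.{ .shard = shard });
    }
}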
pub const TrainingSupervisor = struct {
supervisor: Supervisor,
// Children
model_actor: ActorId,
optimizer_actor: ActorId,
dataloader_actor: ActorId,
checkpoint_actor: ActorId,
metric_actor: ActorId,
// Training state
current_epoch: u32,
current_batch: u32,
best_loss: f32,
pub fn init(system: *ActorSystem, config: TrainingConfig) !ActorId {
const supervisor = try system.spawn(trainingSupervisorBehavior, .{
.strategy = .rest_for_one, // Model failure restarts optimizer too
.intensity = 10,
.period = 60_000_000_000, // 1 minute
});
        // Start children in order - with rest_for_one, children started
        // later are restarted whenever an earlier one fails
const model = try supervisor.startChild(ModelActor, .{
.architecture = config.model,
});
const optimizer = try supervisor.startChild(OptimizerActor, .{
.type = config.optimizer,
.lr = config.learning_rate,
});
const dataloader = try supervisor.startChild(DataloaderActor, .{
.dataset = config.dataset,
.batch_size = config.batch_size,
});
const checkpoint = try supervisor.startChild(CheckpointActor, .{
.interval = config.checkpoint_interval,
.path = config.checkpoint_path,
});
        const metrics = try supervisor.startChild(MetricActor, .{});
        // The child ids would be stored in the TrainingSupervisor's
        // *_actor fields; that wiring is elided here.
        _ = .{ model, optimizer, dataloader, checkpoint, metrics };
        return supervisor;
}
fn trainingSupervisorBehavior(self: *Actor, msg: Message) !void {
switch (msg.payload) {
.child_failed => |failure| {
try handleChildFailure(self, failure);
},
.epoch_complete => |epoch| {
try handleEpochComplete(self, epoch);
},
.checkpoint_request => {
try performCheckpoint(self);
},
}
}
fn handleChildFailure(
self: *Actor,
failure: ChildFailure,
) !void {
        // Recover the enclosing TrainingSupervisor in two hops:
        // *Actor -> *Supervisor -> *TrainingSupervisor
        // (@fieldParentPtr takes a single field name, not a nested path).
        const sup = @fieldParentPtr(Supervisor, "actor", self);
        const supervisor = @fieldParentPtr(TrainingSupervisor, "supervisor", sup);
        switch (failure.child_type) {
            .model => {
                // Critical failure - checkpoint, then restart the model and
                // every child started after it (rest-for-one)
                try supervisor.checkpoint_actor.send(.emergency_checkpoint);
                try RestartStrategies.restForOne(&supervisor.supervisor, failure.child_id, failure.reason);
            },
            .dataloader => {
                // Can recover - just restart the failed child
                try RestartStrategies.oneForOne(&supervisor.supervisor, failure.child_id, failure.reason);
            },
            .optimizer => {
                // Reset optimizer state before restarting
                try supervisor.optimizer_actor.send(.reset_state);
                try RestartStrategies.oneForOne(&supervisor.supervisor, failure.child_id, failure.reason);
            },
        }
}
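
    // `performCheckpoint` is referenced by the behavior above but not shown.
    // A minimal sketch, assuming the CheckpointActor accepts a `.checkpoint`
    // message carrying the current training position (the message shape is
    // an assumption):
    fn performCheckpoint(self: *Actor) !void {
        const sup = @fieldParentPtr(Supervisor, "actor", self);
        const training = @fieldParentPtr(TrainingSupervisor, "supervisor", sup);
        try training.checkpoint_actor.send(.{ .checkpoint = .{
            .epoch = training.current_epoch,
            .batch = training.current_batch,
        } });
    }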
};
pub const GpuSupervisor = struct {
supervisor: Supervisor,
devices: []Device,
kernel_actors: std.AutoHashMap(KernelId, ActorId),
// Health monitoring
health_checker: ActorId,
unhealthy_devices: std.AutoHashMap(DeviceId, HealthStatus),
pub fn superviseKernel(
self: *GpuSupervisor,
kernel: Kernel,
device: Device,
    ) !ActorId {
        // Parameters are immutable in Zig, so copy the requested device into
        // a local we can redirect if it turns out to be unhealthy.
        var target = device;
        // Check device health
        if (self.unhealthy_devices.get(target.id)) |status| {
            if (status.severity == .critical) {
                // Find alternative device
                target = try self.findHealthyDevice(kernel.requirements);
            }
        }
        // Create supervised kernel actor
        const actor = try self.supervisor.startChild(GpuKernelActor, .{
            .kernel = kernel,
            .device = target,
.restart = .transient, // Restart on GPU errors
});
try self.kernel_actors.put(kernel.id, actor);
return actor;
}
pub fn handleGpuError(
self: *GpuSupervisor,
device: DeviceId,
error_type: GpuError,
) !void {
// Update health status
        // `get` returns a copy, so mutate a local and write it back with `put`
        var status = self.unhealthy_devices.get(device) orelse HealthStatus{
            .error_count = 0,
            .last_error = 0,
            .severity = .healthy,
        };
status.error_count += 1;
status.last_error = std.time.nanoTimestamp();
// Determine severity
status.severity = if (status.error_count > 10)
.critical
else if (status.error_count > 5)
.degraded
else
.warning;
try self.unhealthy_devices.put(device, status);
// Migrate actors if critical
if (status.severity == .critical) {
try self.migrateActorsFromDevice(device);
}
}
fn migrateActorsFromDevice(
self: *GpuSupervisor,
failed_device: DeviceId,
) !void {
var iter = self.kernel_actors.iterator();
while (iter.next()) |entry| {
            const actor = self.supervisor.actor.system.getActor(entry.value_ptr.*);
const kernel_actor = @fieldParentPtr(GpuKernelActor, "actor", actor);
if (kernel_actor.device.id == failed_device) {
const new_device = try self.findHealthyDevice(
kernel_actor.kernel.requirements
);
try kernel_actor.migrateTo(new_device);
}
}
}
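
    // `findHealthyDevice` is used above but not shown. A minimal sketch that
    // picks the first device not flagged critical and able to satisfy the
    // kernel's requirements; `meetsRequirements` is an assumed helper on
    // Device, and KernelRequirements is the assumed type of `requirements`.
    fn findHealthyDevice(
        self: *GpuSupervisor,
        requirements: KernelRequirements,
    ) !Device {
        for (self.devices) |device| {
            if (self.unhealthy_devices.get(device.id)) |status| {
                if (status.severity == .critical) continue;
            }
            if (device.meetsRequirements(requirements)) {
                return device;
            }
        }
        return error.NoHealthyDevice;
    }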
};
pub const PhiAccrualDetector = struct {
// Sliding window of heartbeat intervals
intervals: BoundedArray(i64, WINDOW_SIZE),
last_heartbeat: i64,
threshold: f64 = 8.0, // Suspicion threshold
    pub fn heartbeat(self: *PhiAccrualDetector) !void {
        const now = std.time.nanoTimestamp();
        if (self.last_heartbeat > 0) {
            const interval = now - self.last_heartbeat;
            // Keep the window bounded: drop the oldest sample *before*
            // appending, since the array can never exceed its capacity
            if (self.intervals.items.len >= WINDOW_SIZE) {
                _ = self.intervals.orderedRemove(0);
            }
            try self.intervals.append(interval);
        }
        self.last_heartbeat = now;
    }
pub fn phi(self: *PhiAccrualDetector) f64 {
if (self.intervals.items.len < MIN_SAMPLES) {
return 0.0; // Not enough data
}
const now = std.time.nanoTimestamp();
const time_since = now - self.last_heartbeat;
// Calculate mean and variance
const mean = calculateMean(self.intervals.items);
const variance = calculateVariance(self.intervals.items, mean);
const stddev = @sqrt(variance);
// Calculate phi
const phi_value = -@log10(1.0 - cdf(time_since, mean, stddev));
return phi_value;
}
pub fn isAlive(self: *PhiAccrualDetector) bool {
return self.phi() < self.threshold;
}
};
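
// Helpers used by the phi calculation above. The normal CDF uses the
// logistic approximation common in Akka/Hayashibara-style phi accrual
// detectors rather than an exact erf; this is a sketch, and the zero-stddev
// guard is an added assumption.
fn calculateMean(samples: []const i64) f64 {
    var sum: f64 = 0;
    for (samples) |s| {
        sum += @as(f64, @floatFromInt(s));
    }
    return sum / @as(f64, @floatFromInt(samples.len));
}

fn calculateVariance(samples: []const i64, mean: f64) f64 {
    var sum_sq: f64 = 0;
    for (samples) |s| {
        const d = @as(f64, @floatFromInt(s)) - mean;
        sum_sq += d * d;
    }
    return sum_sq / @as(f64, @floatFromInt(samples.len));
}

fn cdf(x: i64, mean: f64, stddev: f64) f64 {
    // Avoid division by zero when all observed intervals are identical
    const sigma = @max(stddev, 1.0);
    const y = (@as(f64, @floatFromInt(x)) - mean) / sigma;
    // Logistic approximation of the standard normal CDF
    const e = @exp(-y * (1.5976 + 0.070566 * y * y));
    return 1.0 / (1.0 + e);
}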
pub const CircuitBreaker = struct {
state: State,
failure_count: u32,
success_count: u32,
last_failure: i64,
// Configuration
failure_threshold: u32 = 5,
success_threshold: u32 = 3,
timeout: i64 = 30_000_000_000, // 30 seconds
pub const State = enum {
closed, // Normal operation
open, // Failing, reject calls
half_open, // Testing recovery
};
pub fn call(
self: *CircuitBreaker,
func: anytype,
args: anytype,
) !ReturnType(@TypeOf(func)) {
switch (self.state) {
            .open => {
                // Check if we should transition to half-open
                if (std.time.nanoTimestamp() - self.last_failure > self.timeout) {
                    self.state = .half_open;
                    self.success_count = 0;
                    // Re-enter in the half-open state so this call actually
                    // exercises the protected function
                    return self.call(func, args);
                }
                // Still inside the cool-down window: fail fast
                return error.CircuitOpen;
            },
.half_open => {
// Testing recovery
const result = func(args) catch |err| {
self.state = .open;
self.failure_count += 1;
self.last_failure = std.time.nanoTimestamp();
return err;
};
self.success_count += 1;
if (self.success_count >= self.success_threshold) {
self.state = .closed;
self.failure_count = 0;
}
return result;
},
.closed => {
// Normal operation
const result = func(args) catch |err| {
self.failure_count += 1;
self.last_failure = std.time.nanoTimestamp();
if (self.failure_count >= self.failure_threshold) {
self.state = .open;
}
return err;
};
// Reset on success
self.failure_count = 0;
return result;
},
}
}
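
    // `ReturnType` is used in the signature of `call` above but not shown.
    // A minimal sketch: recover the wrapped function's success type,
    // stripping any error set since `call` contributes its own `!`.
    // (Type-info field names follow pre-0.14 Zig, matching the builtins used
    // elsewhere in this listing.)
    fn ReturnType(comptime F: type) type {
        const ret = @typeInfo(F).Fn.return_type orelse void;
        return switch (@typeInfo(ret)) {
            .ErrorUnion => |eu| eu.payload,
            else => ret,
        };
    }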
};
pub const CheckpointManager = struct {
    // Allocator used for checkpoint buffers and mailbox snapshots
    allocator: std.mem.Allocator,
    // Checkpoint storage
    storage: CheckpointStorage,
// Active checkpoints
checkpoints: std.AutoHashMap(ActorId, Checkpoint),
pub const Checkpoint = struct {
actor_id: ActorId,
timestamp: i64,
state: []const u8,
mailbox: []Message,
metadata: CheckpointMetadata,
};
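
    // `init` is used by the test below but not shown. A minimal sketch,
    // assuming CheckpointStorage exposes its own allocator-based init:
    pub fn init(allocator: std.mem.Allocator) !CheckpointManager {
        return .{
            .allocator = allocator,
            .storage = try CheckpointStorage.init(allocator),
            .checkpoints = std.AutoHashMap(ActorId, Checkpoint).init(allocator),
        };
    }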
pub fn checkpoint(
self: *CheckpointManager,
actor: *Actor,
) !void {
        // Serialize actor state; the checkpoint keeps its own copy so the
        // temporary buffer can be freed here
        const state = try actor.serialize();
        defer state.deinit();
        // Save mailbox messages
        var messages = ArrayList(Message).init(self.allocator);
        var iter = actor.mailbox.iterator();
        while (iter.next()) |msg| {
            try messages.append(msg.*);
        }
        const cp = Checkpoint{
            .actor_id = actor.id,
            .timestamp = std.time.nanoTimestamp(),
            .state = try self.allocator.dupe(u8, state.items),
            .mailbox = try messages.toOwnedSlice(),
            .metadata = .{
                .reductions = actor.reductions,
                .message_count = actor.mailbox.count(),
            },
        };
// Store checkpoint
try self.storage.store(cp);
try self.checkpoints.put(actor.id, cp);
}
pub fn recover(
self: *CheckpointManager,
actor_id: ActorId,
) !*Actor {
const cp = self.checkpoints.get(actor_id) orelse
try self.storage.load(actor_id);
// Create new actor with saved state
const actor = try Actor.deserialize(cp.state);
// Restore mailbox
for (cp.mailbox) |msg| {
try actor.mailbox.push(msg);
}
// Restore metadata
actor.reductions = cp.metadata.reductions;
return actor;
}
};
test "supervisor restart strategies" {
var system = try ActorSystem.init(.{});
defer system.deinit();
// Test one-for-one
{
const supervisor = try system.spawn(Supervisor, .{
.strategy = .one_for_one,
});
const child1 = try supervisor.startChild(testActor, .{});
        const child2 = try supervisor.startChild(testActor, .{});
        const child2_pid = child2.id;
        // Kill child1
        try child1.send(.{ .exit = .crash });
        // Child1 should restart; child2 should be untouched (same pid)
        try testing.expect(child1.isAlive());
        try testing.expect(child2.isAlive());
        try testing.expect(child2.id == child2_pid);
}
// Test one-for-all
{
const supervisor = try system.spawn(Supervisor, .{
.strategy = .one_for_all,
});
const child1 = try supervisor.startChild(testActor, .{});
const child2 = try supervisor.startChild(testActor, .{});
const child1_pid = child1.id;
const child2_pid = child2.id;
// Kill child1
try child1.send(.{ .exit = .crash });
// Both should have new PIDs (restarted)
try testing.expect(child1.id != child1_pid);
try testing.expect(child2.id != child2_pid);
}
}
test "circuit breaker" {
var breaker = CircuitBreaker{
.state = .closed,
.failure_count = 0,
.success_count = 0,
.last_failure = 0,
.failure_threshold = 3,
};
var fail_count: u32 = 0;
const failingFunc = struct {
fn call(count: *u32) !void {
count.* += 1;
if (count.* <= 3) {
return error.ServiceError;
}
}
}.call;
// First 3 calls fail, circuit opens
for (0..3) |_| {
_ = breaker.call(failingFunc, &fail_count) catch {};
}
try testing.expect(breaker.state == .open);
// Calls while open should fail fast
const result = breaker.call(failingFunc, &fail_count);
try testing.expectError(error.CircuitOpen, result);
}
test "checkpoint and recovery" {
var system = try ActorSystem.init(.{});
defer system.deinit();
    var manager = try CheckpointManager.init(testing.allocator);
// Create actor with state
const actor = try system.spawn(statefulActor, .{
.initial_state = 42,
});
// Process some messages
for (0..10) |i| {
try actor.send(.{ .increment = i });
}
// Checkpoint
try manager.checkpoint(actor);
// Simulate crash
try actor.terminate(.crash);
// Recover
const recovered = try manager.recover(actor.id);
    // State should be preserved: 42 initial + (0 + 1 + ... + 9) = 87
    try testing.expectEqual(42 + 45, recovered.state);
}