Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 5: Supervision & Fault Tolerance

Supervision Trees

Supervisor Structure

pub const Supervisor = struct {
    actor: Actor,
    children: BoundedArray(Child, MAX_CHILDREN),
    strategy: RestartStrategy,
    intensity: u32,          // Max restarts
    period: i64,             // Time window (nanoseconds)
    restart_count: u32,
    restart_timestamps: BoundedArray(i64, MAX_RESTARTS),

    pub const Child = struct {
        id: ActorId,
        spec: ChildSpec,
        state: ChildState,
        restart_count: u32,
    };

    pub const ChildSpec = struct {
        behavior: ActorBehavior,
        restart: RestartType,
        shutdown: ShutdownType,
        type: ChildType,
    };

    pub const RestartType = enum {
        permanent,    // Always restart
        temporary,    // Never restart
        transient,    // Restart only on abnormal exit
    };

    pub const ChildState = enum {
        starting,
        running,
        stopping,
        stopped,
    };
};

Restart Strategies

pub const RestartStrategies = struct {
    // Restart only the failed child
    pub fn oneForOne(
        supervisor: *Supervisor,
        failed_child: ActorId,
        reason: TerminationReason,
    ) !void {
        const child = supervisor.findChild(failed_child) orelse
            return error.ChildNotFound;

        // Check if should restart
        if (shouldRestart(child, reason)) {
            try supervisor.restartChild(child);
        }
    }

    // Restart all children
    pub fn oneForAll(
        supervisor: *Supervisor,
        failed_child: ActorId,
        reason: TerminationReason,
    ) !void {
        _ = failed_child;
        _ = reason;

        // Stop all children
        for (supervisor.children.items) |*child| {
            try supervisor.stopChild(child);
        }

        // Restart all children
        for (supervisor.children.items) |*child| {
            try supervisor.restartChild(child);
        }
    }

    // Restart failed child and all children started after it
    pub fn restForOne(
        supervisor: *Supervisor,
        failed_child: ActorId,
        reason: TerminationReason,
    ) !void {
        const failed_idx = supervisor.findChildIndex(failed_child) orelse
            return error.ChildNotFound;

        // Stop failed and younger children
        for (failed_idx..supervisor.children.items.len) |i| {
            try supervisor.stopChild(&supervisor.children.items[i]);
        }

        // Restart them
        for (failed_idx..supervisor.children.items.len) |i| {
            try supervisor.restartChild(&supervisor.children.items[i]);
        }
    }

    fn shouldRestart(child: *Child, reason: TerminationReason) bool {
        return switch (child.spec.restart) {
            .permanent => true,
            .temporary => false,
            .transient => reason != .normal,
        };
    }
};

Dynamic Supervision

pub const DynamicSupervisor = struct {
    supervisor: Supervisor,
    child_spec: ChildSpec,  // Template for all children

    pub fn startChild(
        self: *DynamicSupervisor,
        args: anytype,
    ) !ActorId {
        const child = try self.supervisor.actor.system.spawn(
            self.child_spec.behavior,
            .{
                .supervisor = self.supervisor.actor.id,
                .args = args,
            },
        );

        try self.supervisor.children.append(.{
            .id = child,
            .spec = self.child_spec,
            .state = .running,
            .restart_count = 0,
        });

        return child;
    }

    pub fn terminateChild(
        self: *DynamicSupervisor,
        child_id: ActorId,
    ) !void {
        const idx = self.supervisor.findChildIndex(child_id) orelse
            return error.ChildNotFound;

        try self.supervisor.stopChild(&self.supervisor.children.items[idx]);
        _ = self.supervisor.children.swapRemove(idx);
    }
};

ML-Specific Supervisors

Training Supervisor

pub const TrainingSupervisor = struct {
    supervisor: Supervisor,

    // Children
    model_actor: ActorId,
    optimizer_actor: ActorId,
    dataloader_actor: ActorId,
    checkpoint_actor: ActorId,
    metric_actor: ActorId,

    // Training state
    current_epoch: u32,
    current_batch: u32,
    best_loss: f32,

    pub fn init(system: *ActorSystem, config: TrainingConfig) !ActorId {
        const supervisor = try system.spawn(trainingSupervisorBehavior, .{
            .strategy = .rest_for_one,  // Model failure restarts optimizer too
            .intensity = 10,
            .period = 60_000_000_000,  // 1 minute
        });

        // Start children in order
        const model = try supervisor.startChild(ModelActor, .{
            .architecture = config.model,
        });

        const optimizer = try supervisor.startChild(OptimizerActor, .{
            .type = config.optimizer,
            .lr = config.learning_rate,
        });

        const dataloader = try supervisor.startChild(DataloaderActor, .{
            .dataset = config.dataset,
            .batch_size = config.batch_size,
        });

        const checkpoint = try supervisor.startChild(CheckpointActor, .{
            .interval = config.checkpoint_interval,
            .path = config.checkpoint_path,
        });

        const metrics = try supervisor.startChild(MetricActor, .{});

        return supervisor;
    }

    fn trainingSupervisorBehavior(self: *Actor, msg: Message) !void {
        switch (msg.payload) {
            .child_failed => |failure| {
                try handleChildFailure(self, failure);
            },
            .epoch_complete => |epoch| {
                try handleEpochComplete(self, epoch);
            },
            .checkpoint_request => {
                try performCheckpoint(self);
            },
        }
    }

    fn handleChildFailure(
        self: *Actor,
        failure: ChildFailure,
    ) !void {
        const supervisor = @fieldParentPtr(TrainingSupervisor, "supervisor.actor", self);

        switch (failure.child_type) {
            .model => {
                // Critical failure - checkpoint and restart
                try supervisor.checkpoint_actor.send(.emergency_checkpoint);
                try supervisor.supervisor.restartStrategy(failure.child_id, failure.reason);
            },
            .dataloader => {
                // Can recover - just restart
                try supervisor.supervisor.oneForOne(failure.child_id, failure.reason);
            },
            .optimizer => {
                // Reset optimizer state
                try supervisor.optimizer_actor.send(.reset_state);
                try supervisor.supervisor.oneForOne(failure.child_id, failure.reason);
            },
        }
    }
};

GPU Supervisor

pub const GpuSupervisor = struct {
    supervisor: Supervisor,
    devices: []Device,
    kernel_actors: std.AutoHashMap(KernelId, ActorId),

    // Health monitoring
    health_checker: ActorId,
    unhealthy_devices: std.AutoHashMap(DeviceId, HealthStatus),

    pub fn superviseKernel(
        self: *GpuSupervisor,
        kernel: Kernel,
        device: Device,
    ) !ActorId {
        // Check device health
        if (self.unhealthy_devices.get(device.id)) |status| {
            if (status.severity == .critical) {
                // Find alternative device
                device = try self.findHealthyDevice(kernel.requirements);
            }
        }

        // Create supervised kernel actor
        const actor = try self.supervisor.startChild(GpuKernelActor, .{
            .kernel = kernel,
            .device = device,
            .restart = .transient,  // Restart on GPU errors
        });

        try self.kernel_actors.put(kernel.id, actor);
        return actor;
    }

    pub fn handleGpuError(
        self: *GpuSupervisor,
        device: DeviceId,
        error_type: GpuError,
    ) !void {
        // Update health status
        const status = self.unhealthy_devices.get(device) orelse .{
            .error_count = 0,
            .last_error = 0,
            .severity = .healthy,
        };

        status.error_count += 1;
        status.last_error = std.time.nanoTimestamp();

        // Determine severity
        status.severity = if (status.error_count > 10)
            .critical
        else if (status.error_count > 5)
            .degraded
        else
            .warning;

        try self.unhealthy_devices.put(device, status);

        // Migrate actors if critical
        if (status.severity == .critical) {
            try self.migrateActorsFromDevice(device);
        }
    }

    fn migrateActorsFromDevice(
        self: *GpuSupervisor,
        failed_device: DeviceId,
    ) !void {
        var iter = self.kernel_actors.iterator();
        while (iter.next()) |entry| {
            const actor = self.supervisor.actor.system.getActor(entry.value);
            const kernel_actor = @fieldParentPtr(GpuKernelActor, "actor", actor);

            if (kernel_actor.device.id == failed_device) {
                const new_device = try self.findHealthyDevice(
                    kernel_actor.kernel.requirements
                );

                try kernel_actor.migrateTo(new_device);
            }
        }
    }
};

Failure Detection

Phi Accrual Failure Detector

pub const PhiAccrualDetector = struct {
    // Sliding window of heartbeat intervals
    intervals: BoundedArray(i64, WINDOW_SIZE),
    last_heartbeat: i64,
    threshold: f64 = 8.0,  // Suspicion threshold

    pub fn heartbeat(self: *PhiAccrualDetector) void {
        const now = std.time.nanoTimestamp();

        if (self.last_heartbeat > 0) {
            const interval = now - self.last_heartbeat;
            try self.intervals.append(interval);

            // Keep window size bounded
            if (self.intervals.items.len > WINDOW_SIZE) {
                _ = self.intervals.orderedRemove(0);
            }
        }

        self.last_heartbeat = now;
    }

    pub fn phi(self: *PhiAccrualDetector) f64 {
        if (self.intervals.items.len < MIN_SAMPLES) {
            return 0.0;  // Not enough data
        }

        const now = std.time.nanoTimestamp();
        const time_since = now - self.last_heartbeat;

        // Calculate mean and variance
        const mean = calculateMean(self.intervals.items);
        const variance = calculateVariance(self.intervals.items, mean);
        const stddev = @sqrt(variance);

        // Calculate phi
        const phi_value = -@log10(1.0 - cdf(time_since, mean, stddev));
        return phi_value;
    }

    pub fn isAlive(self: *PhiAccrualDetector) bool {
        return self.phi() < self.threshold;
    }
};

Circuit Breaker

pub const CircuitBreaker = struct {
    state: State,
    failure_count: u32,
    success_count: u32,
    last_failure: i64,

    // Configuration
    failure_threshold: u32 = 5,
    success_threshold: u32 = 3,
    timeout: i64 = 30_000_000_000,  // 30 seconds

    pub const State = enum {
        closed,      // Normal operation
        open,        // Failing, reject calls
        half_open,   // Testing recovery
    };

    pub fn call(
        self: *CircuitBreaker,
        func: anytype,
        args: anytype,
    ) !ReturnType(@TypeOf(func)) {
        switch (self.state) {
            .open => {
                // Check if should transition to half-open
                if (std.time.nanoTimestamp() - self.last_failure > self.timeout) {
                    self.state = .half_open;
                    self.success_count = 0;
                } else {
                    return error.CircuitOpen;
                }
            },
            .half_open => {
                // Testing recovery
                const result = func(args) catch |err| {
                    self.state = .open;
                    self.failure_count += 1;
                    self.last_failure = std.time.nanoTimestamp();
                    return err;
                };

                self.success_count += 1;
                if (self.success_count >= self.success_threshold) {
                    self.state = .closed;
                    self.failure_count = 0;
                }

                return result;
            },
            .closed => {
                // Normal operation
                const result = func(args) catch |err| {
                    self.failure_count += 1;
                    self.last_failure = std.time.nanoTimestamp();

                    if (self.failure_count >= self.failure_threshold) {
                        self.state = .open;
                    }

                    return err;
                };

                // Reset on success
                self.failure_count = 0;
                return result;
            },
        }
    }
};

Recovery Mechanisms

Checkpoint and Recovery

pub const CheckpointManager = struct {
    // Checkpoint storage
    storage: CheckpointStorage,

    // Active checkpoints
    checkpoints: std.AutoHashMap(ActorId, Checkpoint),

    pub const Checkpoint = struct {
        actor_id: ActorId,
        timestamp: i64,
        state: []const u8,
        mailbox: []Message,
        metadata: CheckpointMetadata,
    };

    pub fn checkpoint(
        self: *CheckpointManager,
        actor: *Actor,
    ) !void {
        // Serialize actor state
        const state = try actor.serialize();
        defer state.deinit();

        // Save mailbox messages
        var messages = ArrayList(Message).init();
        var iter = actor.mailbox.iterator();
        while (iter.next()) |msg| {
            try messages.append(msg.*);
        }

        const cp = Checkpoint{
            .actor_id = actor.id,
            .timestamp = std.time.nanoTimestamp(),
            .state = state.items,
            .mailbox = messages.toOwnedSlice(),
            .metadata = .{
                .reductions = actor.reductions,
                .message_count = actor.mailbox.count(),
            },
        };

        // Store checkpoint
        try self.storage.store(cp);
        try self.checkpoints.put(actor.id, cp);
    }

    pub fn recover(
        self: *CheckpointManager,
        actor_id: ActorId,
    ) !*Actor {
        const cp = self.checkpoints.get(actor_id) orelse
            try self.storage.load(actor_id);

        // Create new actor with saved state
        const actor = try Actor.deserialize(cp.state);

        // Restore mailbox
        for (cp.mailbox) |msg| {
            try actor.mailbox.push(msg);
        }

        // Restore metadata
        actor.reductions = cp.metadata.reductions;

        return actor;
    }
};

Testing Fault Tolerance

test "supervisor restart strategies" {
    var system = try ActorSystem.init(.{});
    defer system.deinit();

    // Test one-for-one
    {
        const supervisor = try system.spawn(Supervisor, .{
            .strategy = .one_for_one,
        });

        const child1 = try supervisor.startChild(testActor, .{});
        const child2 = try supervisor.startChild(testActor, .{});

        // Kill child1
        try child1.send(.{ .exit = .crash });

        // Child1 should restart, child2 should be unaffected
        try testing.expect(child1.isAlive());
        try testing.expect(child2.isAlive());
    }

    // Test one-for-all
    {
        const supervisor = try system.spawn(Supervisor, .{
            .strategy = .one_for_all,
        });

        const child1 = try supervisor.startChild(testActor, .{});
        const child2 = try supervisor.startChild(testActor, .{});

        const child1_pid = child1.id;
        const child2_pid = child2.id;

        // Kill child1
        try child1.send(.{ .exit = .crash });

        // Both should have new PIDs (restarted)
        try testing.expect(child1.id != child1_pid);
        try testing.expect(child2.id != child2_pid);
    }
}

test "circuit breaker" {
    var breaker = CircuitBreaker{
        .state = .closed,
        .failure_count = 0,
        .success_count = 0,
        .last_failure = 0,
        .failure_threshold = 3,
    };

    var fail_count: u32 = 0;

    const failingFunc = struct {
        fn call(count: *u32) !void {
            count.* += 1;
            if (count.* <= 3) {
                return error.ServiceError;
            }
        }
    }.call;

    // First 3 calls fail, circuit opens
    for (0..3) |_| {
        _ = breaker.call(failingFunc, &fail_count) catch {};
    }

    try testing.expect(breaker.state == .open);

    // Calls while open should fail fast
    const result = breaker.call(failingFunc, &fail_count);
    try testing.expectError(error.CircuitOpen, result);
}

test "checkpoint and recovery" {
    var system = try ActorSystem.init(.{});
    defer system.deinit();

    var manager = try CheckpointManager.init();

    // Create actor with state
    const actor = try system.spawn(statefulActor, .{
        .initial_state = 42,
    });

    // Process some messages
    for (0..10) |i| {
        try actor.send(.{ .increment = i });
    }

    // Checkpoint
    try manager.checkpoint(actor);

    // Simulate crash
    try actor.terminate(.crash);

    // Recover
    const recovered = try manager.recover(actor.id);

    // State should be preserved
    try testing.expectEqual(42 + 45, recovered.state);
}