Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 7: Domain-Driven Design

Bounded Contexts

Core Domain: Actor System

/// Namespace for the actor-system bounded context: its core concepts,
/// domain services and domain events.
pub const ActorSystemDomain = struct {
    // Core concepts

    /// An actor: an identity together with its behavior, mailbox and state.
    pub const Actor = struct {
        id: ActorId,
        behavior: Behavior,
        mailbox: Mailbox,
        state: State,
    };

    /// A message addressed from one actor id to another, carrying a payload.
    pub const Message = struct {
        from: ActorId,
        to: ActorId,
        payload: Payload,
    };

    /// A restart strategy paired with the slice of actors it applies to.
    pub const Supervisor = struct {
        strategy: RestartStrategy,
        children: []Actor,
    };

    // Domain services
    // NOTE(review): the service functions below are signature-only sketches.
    // Zig rejects a non-extern `fn ...;` without a body, so real
    // implementations are expected elsewhere — confirm before compiling.

    /// Registration and lookup of actors by `ActorId`.
    pub const ActorRegistry = struct {
        /// Registers `actor`; may fail (inferred error set).
        fn register(actor: Actor) !void;
        /// Returns a pointer to the actor with `id`, or `null`.
        fn lookup(id: ActorId) ?*Actor;
        /// Removes the actor with `id`; cannot fail.
        fn unregister(id: ActorId) void;
    };

    /// Delivery of messages to one or many actors.
    pub const MessageRouter = struct {
        /// Routes a single message; may fail.
        fn route(msg: Message) !void;
        /// Sends `msg` to every id in `targets`; may fail.
        fn broadcast(msg: Message, targets: []ActorId) !void;
    };

    // Domain events

    /// Tagged union of the events this context emits; each variant carries
    /// its own payload type.
    pub const DomainEvent = union(enum) {
        actor_spawned: ActorSpawned,
        actor_terminated: ActorTerminated,
        message_sent: MessageSent,
        supervision_triggered: SupervisionTriggered,
    };
};

ML Domain: Training & Inference

/// Namespace for the ML bounded context: value objects, entities, the
/// `TrainingRun` aggregate and the `TrainingService` domain service.
pub const MLDomain = struct {
    // Value objects

    /// The dimensions of a tensor.
    pub const TensorShape = struct {
        dims: []const usize,

        /// Returns the product of all dimensions (1 when `dims` is empty).
        /// Uses plain `*=`, so multiplication overflow is not handled here.
        pub fn numElements(self: TensorShape) usize {
            var prod: usize = 1;
            for (self.dims) |d| prod *= d;
            return prod;
        }

        /// Reports whether `self` and `other` can be broadcast together.
        ///
        /// Applies NumPy-style broadcasting rules: the shapes are aligned at
        /// their trailing dimension and every aligned pair must either be
        /// equal or contain a 1. Leading dimensions present in only one of
        /// the shapes are always compatible.
        pub fn isCompatible(self: TensorShape, other: TensorShape) bool {
            const shared = @min(self.dims.len, other.dims.len);

            // Walk both shapes from the last dimension towards the first.
            var offset: usize = 1;
            while (offset <= shared) : (offset += 1) {
                const a = self.dims[self.dims.len - offset];
                const b = other.dims[other.dims.len - offset];
                if (a != b and a != 1 and b != 1) return false;
            }
            return true;
        }
    };

    /// A learning rate whose current value is derived from a schedule.
    pub const LearningRate = struct {
        value: f32,
        schedule: Schedule,

        /// Overwrites `value` with `schedule.calculate(step)`.
        pub fn decay(self: *LearningRate, step: u32) void {
            self.value = self.schedule.calculate(step);
        }
    };

    // Entities

    /// A model identified by `id`, with its architecture, parameters and
    /// version.
    pub const Model = struct {
        id: ModelId,
        architecture: Architecture,
        parameters: []Parameter,
        version: Version,

        // Invariants

        /// Checks the model's invariants. The checks themselves are elided
        /// in this sketch, so the body currently always succeeds.
        pub fn validate(self: Model) !void {
            // Check architecture consistency
            // Validate parameter shapes
            // ...
        }
    };

    /// One training attempt of a model on a dataset.
    pub const TrainingSession = struct {
        id: SessionId,
        model: Model,
        dataset: Dataset,
        hyperparameters: Hyperparameters,
        state: TrainingState,

        /// Returns true unless the session is `.completed` or `.failed`.
        pub fn canResume(self: TrainingSession) bool {
            return self.state != .completed and self.state != .failed;
        }
    };

    // Aggregates

    /// Aggregate root tying a session to its checkpoints and metrics.
    pub const TrainingRun = struct {
        session: TrainingSession,
        /// Growable, allocator-owning list: `addCheckpoint` appends to it, so
        /// a bare slice (which has no `append`) cannot be used here.
        checkpoints: std.ArrayList(Checkpoint),
        metrics: MetricsHistory,

        /// Appends `cp` after enforcing the aggregate's invariants.
        ///
        /// Errors:
        /// - `error.InvalidCheckpoint` when `cp` belongs to another session.
        /// - `error.NonMonotonicCheckpoint` when `cp.epoch` is not strictly
        ///   greater than the epoch of the most recent checkpoint.
        /// - any error raised by the list while growing.
        pub fn addCheckpoint(self: *TrainingRun, cp: Checkpoint) !void {
            // NOTE(review): `!=` requires `SessionId` to be a type that
            // supports `==` (not a slice such as `[]const u8`) — confirm.
            if (cp.session_id != self.session.id) {
                return error.InvalidCheckpoint;
            }

            // Ensure monotonic progress against the latest stored checkpoint.
            const existing = self.checkpoints.items;
            if (existing.len > 0) {
                const last = existing[existing.len - 1];
                if (cp.epoch <= last.epoch) {
                    return error.NonMonotonicCheckpoint;
                }
            }

            try self.checkpoints.append(cp);
        }
    };

    // Domain services

    /// Stateless service that starts training sessions.
    pub const TrainingService = struct {
        /// Validates `model` and `dataset`, builds a session in the
        /// `.initializing` state, publishes a `training_started` event and
        /// returns the session.
        ///
        /// Propagates any error from validation or from `EventBus.publish`.
        pub fn startTraining(
            model: Model,
            dataset: Dataset,
            config: TrainingConfig,
        ) !TrainingSession {
            // Validate preconditions
            try model.validate();
            try dataset.validate();

            // Create session
            const session = TrainingSession{
                .id = generateId(),
                .model = model,
                .dataset = dataset,
                .hyperparameters = config.hyperparameters,
                .state = .initializing,
            };

            // Emit domain event
            try EventBus.publish(.{
                .training_started = .{
                    .session_id = session.id,
                    .model_id = model.id,
                },
            });

            return session;
        }
    };
};

GPU Domain: Device & Kernel Management

/// Namespace for the GPU bounded context: device capabilities, devices,
/// kernel instances, the device repository and the kernel scheduler.
pub const GpuDomain = struct {
    // Value objects

    /// A device's compute capability as a major/minor version pair.
    pub const ComputeCapability = struct {
        major: u8,
        minor: u8,

        /// Reports whether this capability offers `feature`. The features
        /// listed here are decided by the major version alone.
        pub fn supports(self: ComputeCapability, feature: Feature) bool {
            return switch (feature) {
                .tensor_cores => self.major >= 7,
                .bf16 => self.major >= 8,
                .fp64 => self.major >= 2,
                // ...
            };
        }
    };

    /// A memory request: size, alignment and access pattern.
    pub const MemoryRequirement = struct {
        size: usize,
        alignment: usize,
        access_pattern: AccessPattern,

        /// Returns true when `allocation` is at least as large and at least
        /// as strictly aligned as required. `access_pattern` is not checked.
        pub fn isSatisfiedBy(self: MemoryRequirement, allocation: Allocation) bool {
            return allocation.size >= self.size and
                allocation.alignment >= self.alignment;
        }
    };

    // Entities

    /// A GPU device with its capability, memory information and health.
    pub const GpuDevice = struct {
        id: DeviceId,
        name: []const u8,
        capability: ComputeCapability,
        memory: MemoryInfo,
        health: HealthStatus,

        /// Returns true when the device supports every requirement of
        /// `kernel` and has at least `kernel.memory_requirement` available.
        /// `health` is not consulted.
        pub fn canExecute(self: GpuDevice, kernel: Kernel) bool {
            // Check capability requirements
            for (kernel.requirements) |req| {
                if (!self.capability.supports(req)) {
                    return false;
                }
            }

            // Check memory availability
            return self.memory.available >= kernel.memory_requirement;
        }
    };

    /// A kernel bound to a device, with its execution state and launch
    /// configuration.
    pub const KernelInstance = struct {
        id: InstanceId,
        kernel: Kernel,
        device: GpuDevice,
        state: ExecutionState,
        launch_config: LaunchConfig,

        // State transitions

        /// Moves the instance from `.ready` to `.launching`.
        /// Returns `error.InvalidState` from any other state.
        pub fn launch(self: *KernelInstance) !void {
            if (self.state != .ready) {
                return error.InvalidState;
            }

            self.state = .launching;
            // ...
        }
    };

    // Repository pattern
    // NOTE(review): signature-only sketches; Zig rejects a non-extern
    // `fn ...;` without a body, so implementations must live elsewhere.

    /// Persistence boundary for `GpuDevice` entities.
    pub const DeviceRepository = struct {
        fn findAvailable(requirements: Requirements) ![]GpuDevice;
        fn getById(id: DeviceId) ?GpuDevice;
        fn save(device: GpuDevice) !void;
    };

    // Domain service

    /// Picks a device for a kernel and creates the instance to run on it.
    pub const KernelScheduler = struct {
        devices: DeviceRepository,

        /// Finds the devices that satisfy `kernel.requirements`, selects one
        /// according to `preferences` and returns a `.ready` instance.
        ///
        /// `self` is taken explicitly because the body reads `self.devices`;
        /// call it as `scheduler.scheduleKernel(kernel, preferences)`.
        ///
        /// Errors: `error.NoSuitableDevice` when the repository returns no
        /// device, plus any error from `findAvailable`.
        pub fn scheduleKernel(
            self: KernelScheduler,
            kernel: Kernel,
            preferences: SchedulingPreferences,
        ) !KernelInstance {
            // Find suitable device
            const devices = try self.devices.findAvailable(kernel.requirements);

            if (devices.len == 0) {
                return error.NoSuitableDevice;
            }

            // Select best device based on preferences
            const device = selectBestDevice(devices, preferences);

            // Create kernel instance
            const instance = KernelInstance{
                .id = generateId(),
                .kernel = kernel,
                .device = device,
                .state = .ready,
                .launch_config = calculateLaunchConfig(kernel, device),
            };

            return instance;
        }
    };
};

Context Mapping

Integration Between Contexts

/// Translation points between the bounded contexts: an anti-corruption
/// layer, a shared kernel and a published language for external callers.
pub const ContextIntegration = struct {
    /// Anti-corruption layer between the Actor System and ML domains.
    pub const MLActorAdapter = struct {
        /// Builds an actor whose state holds `model` and whose id is derived
        /// from the model's id.
        pub fn adaptModelToActor(model: MLDomain.Model) ActorSystemDomain.Actor {
            const actor: ActorSystemDomain.Actor = .{
                .id = generateActorId(model.id),
                .behavior = modelActorBehavior,
                .mailbox = Mailbox.init(),
                .state = .{ .model = model },
            };
            return actor;
        }

        /// Maps a `.train` or `.infer` payload to the matching ML command;
        /// every other payload yields `null`.
        pub fn adaptMessageToMLCommand(
            msg: ActorSystemDomain.Message,
        ) ?MLCommand {
            switch (msg.payload) {
                .train => |train_args| return MLCommand.StartTraining{
                    .hyperparameters = train_args.hyperparameters,
                    .dataset = train_args.dataset,
                },
                .infer => |infer_args| return MLCommand.RunInference{
                    .input = infer_args.input,
                    .batch_size = infer_args.batch_size,
                },
                else => return null,
            }
        }
    };

    /// Shared kernel between the ML and GPU domains.
    pub const TensorOperations = struct {
        /// Tensor operations both contexts agree on.
        pub const Operation = enum {
            matmul,
            conv2d,
            attention,
            // ...
        };

        /// Returns the kernel for `op` and `dtype` on a device with the
        /// given compute capability. The selection logic is elided here.
        pub fn getKernelForOperation(
            op: Operation,
            dtype: DataType,
            device: GpuDomain.ComputeCapability,
        ) Kernel {
            // Select optimized kernel based on device
            // ...
        }
    };

    /// Published language for external systems.
    pub const ExternalAPI = struct {
        /// Training request in the external format.
        pub const TrainingRequest = struct {
            model_architecture: []const u8,
            dataset_path: []const u8,
            config: struct {
                batch_size: u32,
                learning_rate: f32,
                epochs: u32,
            },
        };

        /// Converts an external request into the ML domain's training
        /// configuration. The translation itself is elided here.
        pub fn fromExternalRequest(
            req: TrainingRequest,
        ) !MLDomain.TrainingConfig {
            // Translate external format to internal domain model
            // ...
        }
    };
};

Domain Events

Event Sourcing

/// Append-only store of domain events, with optional per-aggregate snapshots.
pub const EventStore = struct {
    events: std.ArrayListUnmanaged(DomainEvent),
    snapshots: std.AutoHashMap(AggregateId, Snapshot),

    /// A stored event: identity, timestamp, owning aggregate, the
    /// aggregate version it produced, and its payload.
    pub const DomainEvent = struct {
        id: EventId,
        timestamp: i64,
        aggregate_id: AggregateId,
        version: u32,
        payload: EventPayload,
    };

    /// Stores `event` and then publishes it on the event bus.
    ///
    /// Errors:
    /// - `error.ConcurrencyConflict` when `event.version` is not strictly
    ///   greater than the aggregate's latest stored version.
    /// - allocation failure while growing the list.
    /// - any error from `EventBus.publish`; the event is already stored at
    ///   that point and stays stored.
    pub fn append(self: *EventStore, event: DomainEvent) !void {
        // Validate event
        if (event.version <= self.getLatestVersion(event.aggregate_id)) {
            return error.ConcurrencyConflict;
        }

        // `events` is an unmanaged list and needs an allocator on every
        // growth; reuse the one owned by the managed `snapshots` map so the
        // struct's fields stay unchanged.
        try self.events.append(self.snapshots.allocator, event);

        // Publish to subscribers
        try EventBus.publish(event);
    }

    /// Returns the stored events of `aggregate_id`, in storage order, whose
    /// version is at least the effective start version.
    ///
    /// The start version is `from_version` (0 when null). If a snapshot
    /// exists whose version is at or beyond that start, replay begins right
    /// after the snapshot instead.
    ///
    /// Caller owns the returned slice; it is allocated with
    /// `self.snapshots.allocator`.
    pub fn replay(
        self: *EventStore,
        aggregate_id: AggregateId,
        from_version: ?u32,
    ) ![]DomainEvent {
        const allocator = self.snapshots.allocator;

        // Parameters are immutable in Zig, so track the start in a local.
        var first_version: u32 = from_version orelse 0;

        // Start from snapshot if available
        if (self.snapshots.get(aggregate_id)) |snapshot| {
            if (snapshot.version >= first_version) {
                first_version = snapshot.version + 1;
            }
        }

        var result: std.ArrayListUnmanaged(DomainEvent) = .empty;
        // Release the partial result if a later append fails.
        errdefer result.deinit(allocator);

        // Replay events
        for (self.events.items) |event| {
            if (event.aggregate_id == aggregate_id and
                event.version >= first_version)
            {
                try result.append(allocator, event);
            }
        }

        return result.toOwnedSlice(allocator);
    }
};

Saga Pattern for Distributed Transactions

/// Saga for a training session, modelled as a state machine: `handle` looks
/// at the current `state` and the incoming event and advances the saga.
pub const TrainingSaga = struct {
    /// States of the saga. Only `.allocating_resources`, `.training` and
    /// `.compensating` are handled in the visible part of `handle`.
    pub const State = enum {
        allocating_resources,
        loading_data,
        initializing_model,
        training,
        checkpointing,
        completed,
        compensating,
        failed,
    };

    /// Current state of the saga.
    state: State,
    /// Resources acquired so far; read during compensation.
    context: SagaContext,

    /// Bookkeeping of what the saga has acquired for its session.
    pub const SagaContext = struct {
        session_id: SessionId,
        allocated_devices: []DeviceId,
        loaded_datasets: []DatasetId,
        checkpoint_path: ?[]const u8,
    };

    /// Applies `event` to the saga according to the current state.
    /// Events that are not relevant to the current state are ignored.
    /// Errors from the invoked steps are propagated; note that `state` is
    /// updated before each step runs, so it keeps the new value on failure.
    pub fn handle(self: *TrainingSaga, event: DomainEvent) !void {
        switch (self.state) {
            .allocating_resources => {
                switch (event) {
                    .resource_allocated => |e| {
                        // NOTE(review): `allocated_devices` is declared as a
                        // plain slice (`[]DeviceId`), which has no `append`;
                        // this line needs a growable list or an allocator —
                        // confirm the intended field type.
                        try self.context.allocated_devices.append(e.device_id);

                        // NOTE(review): `required_devices` is not declared in
                        // this struct; presumably defined elsewhere.
                        if (self.context.allocated_devices.len >= required_devices) {
                            self.state = .loading_data;
                            try self.loadData();
                        }
                    },
                    .resource_allocation_failed => {
                        self.state = .compensating;
                        try self.compensate();
                    },
                    else => {},
                }
            },
            .training => {
                switch (event) {
                    .epoch_completed => |e| {
                        // Checkpoint on every epoch that is a multiple of
                        // `checkpoint_interval` (declared outside this view).
                        if (e.epoch % checkpoint_interval == 0) {
                            self.state = .checkpointing;
                            try self.createCheckpoint();
                        }
                    },
                    .training_failed => {
                        self.state = .compensating;
                        try self.compensate();
                    },
                    else => {},
                }
            },
            .compensating => {
                // Rollback actions
                // Runs for whatever event arrives while compensating. A
                // failing release returns early via `try`, leaving the state
                // at `.compensating`.
                for (self.context.allocated_devices) |device| {
                    try releaseDevice(device);
                }

                for (self.context.loaded_datasets) |dataset| {
                    try unloadDataset(dataset);
                }

                self.state = .failed;
            },
            // ...
        }
    }
};

Domain Testing

// Verifies that the TrainingRun aggregate accepts a checkpoint for its own
// session and rejects one whose epoch does not advance.
test "training aggregate maintains invariants" {
    var run = MLDomain.TrainingRun{
        .session = .{
            .id = "test_session",
            .model = test_model,
            .dataset = test_dataset,
            .hyperparameters = .{},
            .state = .running,
        },
        // The list needs an allocator; the testing allocator also reports
        // leaks when the test ends.
        .checkpoints = ArrayList(Checkpoint).init(testing.allocator),
        .metrics = MetricsHistory.init(),
    };
    defer run.checkpoints.deinit();

    // Should accept valid checkpoint
    const cp1 = Checkpoint{
        .session_id = "test_session",
        .epoch = 1,
        .loss = 0.5,
    };
    try run.addCheckpoint(cp1);

    // Should reject non-monotonic checkpoint
    const cp2 = Checkpoint{
        .session_id = "test_session",
        .epoch = 0, // Earlier than cp1
        .loss = 0.6,
    };

    const result = run.addCheckpoint(cp2);
    try testing.expectError(error.NonMonotonicCheckpoint, result);
}

// Exercises GpuDevice.canExecute for a satisfied feature requirement and for
// a memory requirement larger than what the device has available.
test "device capability checking" {
    const gib = 1024 * 1024 * 1024;

    const gpu: GpuDomain.GpuDevice = .{
        .id = "gpu0",
        .name = "RTX 3090",
        .capability = .{ .major = 8, .minor = 6 },
        .memory = .{ .total = 24 * gib, .available = 20 * gib },
        .health = .healthy,
    };

    // Tensor cores need major >= 7; this device reports 8.6, and 1 GiB fits.
    const tensor_core_kernel: Kernel = .{
        .requirements = &[_]Feature{.tensor_cores},
        .memory_requirement = 1 * gib,
    };
    try testing.expect(gpu.canExecute(tensor_core_kernel));

    // No feature requirements, but 30 GiB exceeds the 20 GiB available.
    const oversized_kernel: Kernel = .{
        .requirements = &[_]Feature{},
        .memory_requirement = 30 * gib,
    };
    try testing.expect(!gpu.canExecute(oversized_kernel));
}

// Verifies that a training failure moves the saga into `.compensating`.
test "saga compensation on failure" {
    // The context fields are mutable slices (`[]DeviceId`, `[]DatasetId`).
    // A `&[_]T{...}` literal is a pointer to a constant array and cannot
    // coerce to them, so back the slices with mutable arrays instead.
    var devices = [_]DeviceId{ "gpu0", "gpu1" };
    var datasets = [_]DatasetId{"dataset1"};

    var saga = TrainingSaga{
        .state = .training,
        .context = .{
            .session_id = "test",
            .allocated_devices = &devices,
            .loaded_datasets = &datasets,
            .checkpoint_path = null,
        },
    };

    // Simulate training failure
    try saga.handle(.{ .training_failed = .{
        .session_id = "test",
        .reason = "OOM",
    } });

    // Should transition to compensating. The expected value is spelled with
    // its enum type so the comparison does not depend on how `expectEqual`
    // resolves a bare enum literal.
    try testing.expectEqual(TrainingSaga.State.compensating, saga.state);

    // Should clean up resources
    // (In real implementation, would verify devices are released)
}