/// Actor-system bounded context: the actor / message / supervisor model,
/// the registry and routing services, and the events this context emits.
pub const ActorSystemDomain = struct {
    // Core concepts

    /// An addressable actor: identity, behavior, mailbox, and state.
    pub const Actor = struct {
        id: ActorId,
        behavior: Behavior,
        mailbox: Mailbox,
        state: State,
    };

    /// A payload addressed from one actor (`from`) to another (`to`).
    pub const Message = struct {
        from: ActorId,
        to: ActorId,
        payload: Payload,
    };

    /// Groups child actors with a restart strategy
    /// (presumably applied when a child fails — confirm with the runtime).
    pub const Supervisor = struct {
        strategy: RestartStrategy,
        children: []Actor,
    };

    // Domain services
    // NOTE(review): the functions below have neither a body nor a receiver.
    // A non-extern Zig function must have a body, so these read as an
    // interface sketch rather than compilable code.

    /// Registry of live actors, keyed by `ActorId`.
    pub const ActorRegistry = struct {
        /// Add `actor` to the registry; the possible errors are not shown here.
        fn register(actor: Actor) !void;
        /// Return the actor with `id`, or null if none is registered.
        /// NOTE(review): the lifetime of the returned pointer is not visible.
        fn lookup(id: ActorId) ?*Actor;
        /// Remove the actor with `id`; declared as infallible.
        fn unregister(id: ActorId) void;
    };

    /// Delivers messages to actors.
    pub const MessageRouter = struct {
        /// Deliver `msg` to `msg.to`.
        fn route(msg: Message) !void;
        /// Deliver `msg` to every actor in `targets`.
        fn broadcast(msg: Message, targets: []ActorId) !void;
    };

    // Domain events

    /// Events emitted by the actor-system context.
    pub const DomainEvent = union(enum) {
        actor_spawned: ActorSpawned,
        actor_terminated: ActorTerminated,
        message_sent: MessageSent,
        supervision_triggered: SupervisionTriggered,
    };
};
/// ML bounded context: tensor and learning-rate value objects, the model and
/// training-session entities, the `TrainingRun` aggregate, and the
/// `TrainingService` domain service.
pub const MLDomain = struct {
    // ---- Value objects ----

    /// Shape of a tensor as a list of dimension sizes.
    pub const TensorShape = struct {
        dims: []const usize,

        /// Total element count: the product of all dimensions.
        /// A rank-0 shape (empty `dims`) has 1 element.
        /// Overflow traps in safe build modes.
        pub fn numElements(self: TensorShape) usize {
            var prod: usize = 1;
            for (self.dims) |d| prod *= d;
            return prod;
        }

        /// Report whether `self` and `other` can be broadcast together.
        ///
        /// The rule is NumPy-style: align the trailing entries of `dims`, and
        /// require each aligned pair to be equal or to contain a 1. Leading
        /// dimensions present in only one shape always match, as if padded
        /// with 1.
        pub fn isCompatible(self: TensorShape, other: TensorShape) bool {
            const a = self.dims;
            const b = other.dims;
            const shared = @min(a.len, b.len);
            var i: usize = 0;
            while (i < shared) : (i += 1) {
                const da = a[a.len - 1 - i];
                const db = b[b.len - 1 - i];
                if (da != db and da != 1 and db != 1) return false;
            }
            return true;
        }
    };

    /// Current learning rate together with the schedule that produces it.
    pub const LearningRate = struct {
        value: f32,
        schedule: Schedule,

        /// Replace `value` with the schedule's rate for `step`.
        pub fn decay(self: *LearningRate, step: u32) void {
            self.value = self.schedule.calculate(step);
        }
    };

    // ---- Entities ----

    /// A versioned model: its architecture plus its parameters.
    pub const Model = struct {
        id: ModelId,
        architecture: Architecture,
        parameters: []Parameter,
        version: Version,

        /// Check the model's invariants.
        /// Placeholder: no checks are implemented yet, so this always succeeds.
        pub fn validate(self: Model) !void {
            // TODO: check architecture consistency.
            // TODO: validate parameter shapes.
            _ = self; // Unused until the checks above exist; Zig rejects unused params.
        }
    };

    /// One training attempt of a model on a dataset.
    pub const TrainingSession = struct {
        id: SessionId,
        model: Model,
        dataset: Dataset,
        hyperparameters: Hyperparameters,
        state: TrainingState,

        /// True unless the session has reached `.completed` or `.failed`.
        pub fn canResume(self: TrainingSession) bool {
            return self.state != .completed and self.state != .failed;
        }
    };

    // ---- Aggregates ----

    /// Aggregate root owning a session, its checkpoints, and its metrics.
    /// All checkpoint mutation goes through `addCheckpoint`, which enforces
    /// the invariants.
    pub const TrainingRun = struct {
        session: TrainingSession,
        /// Checkpoints in strictly increasing `epoch` order.
        /// This is a growable list so that `addCheckpoint` can append; the
        /// owner must call `deinit`.
        checkpoints: std.ArrayList(Checkpoint),
        metrics: MetricsHistory,

        /// Append `cp` after checking the aggregate's invariants.
        ///
        /// Errors:
        /// - `error.InvalidCheckpoint` when `cp` belongs to another session.
        /// - `error.NonMonotonicCheckpoint` when `cp.epoch` is not strictly
        ///   greater than the last checkpoint's epoch.
        /// - Allocation errors propagate from the list.
        pub fn addCheckpoint(self: *TrainingRun, cp: Checkpoint) !void {
            // NOTE(review): if SessionId is a slice type (the tests use string
            // literals), `!=` will not compile; compare with std.mem.eql instead.
            if (cp.session_id != self.session.id) {
                return error.InvalidCheckpoint;
            }
            // Require monotonic progress. An equal epoch is also rejected.
            if (self.checkpoints.getLastOrNull()) |last| {
                if (cp.epoch <= last.epoch) {
                    return error.NonMonotonicCheckpoint;
                }
            }
            try self.checkpoints.append(cp);
        }
    };

    // ---- Domain services ----

    /// Entry points that coordinate several ML entities.
    pub const TrainingService = struct {
        /// Validate `model` and `dataset`, then create a session in the
        /// `.initializing` state using `config.hyperparameters`.
        /// Publish a `training_started` event and return the session.
        /// Errors from validation or from `EventBus.publish` propagate.
        pub fn startTraining(
            model: Model,
            dataset: Dataset,
            config: TrainingConfig,
        ) !TrainingSession {
            try model.validate();
            try dataset.validate();

            const session = TrainingSession{
                .id = generateId(),
                .model = model,
                .dataset = dataset,
                .hyperparameters = config.hyperparameters,
                .state = .initializing,
            };

            try EventBus.publish(.{
                .training_started = .{
                    .session_id = session.id,
                    .model_id = model.id,
                },
            });
            return session;
        }
    };
};
/// GPU bounded context: device-capability and memory value objects, the
/// device and kernel-instance entities, a device repository, and the kernel
/// scheduler service.
pub const GpuDomain = struct {
    // ---- Value objects ----

    /// Device compute capability as `major.minor`.
    /// The thresholds below presumably follow NVIDIA CUDA compute capability
    /// numbering — confirm.
    pub const ComputeCapability = struct {
        major: u8,
        minor: u8,

        /// Whether this capability provides `feature`.
        /// Only `major` is consulted for the features listed here.
        pub fn supports(self: ComputeCapability, feature: Feature) bool {
            return switch (feature) {
                .tensor_cores => self.major >= 7,
                .bf16 => self.major >= 8,
                .fp64 => self.major >= 2,
                // ...
            };
        }
    };

    /// Size and alignment a kernel needs from an allocation.
    pub const MemoryRequirement = struct {
        size: usize,
        alignment: usize,
        access_pattern: AccessPattern,

        /// True when `allocation` is at least as large and at least as
        /// aligned as required. `access_pattern` is not checked.
        pub fn isSatisfiedBy(self: MemoryRequirement, allocation: Allocation) bool {
            return allocation.size >= self.size and
                allocation.alignment >= self.alignment;
        }
    };

    // ---- Entities ----

    /// A physical GPU: identity, capability, memory, and health.
    pub const GpuDevice = struct {
        id: DeviceId,
        name: []const u8,
        capability: ComputeCapability,
        memory: MemoryInfo,
        health: HealthStatus,

        /// True when every feature in `kernel.requirements` is supported and
        /// `memory.available` covers `kernel.memory_requirement`.
        /// `health` is not consulted.
        pub fn canExecute(self: GpuDevice, kernel: Kernel) bool {
            for (kernel.requirements) |req| {
                if (!self.capability.supports(req)) {
                    return false;
                }
            }
            return self.memory.available >= kernel.memory_requirement;
        }
    };

    /// One scheduled execution of a kernel on a specific device.
    pub const KernelInstance = struct {
        id: InstanceId,
        kernel: Kernel,
        device: GpuDevice,
        state: ExecutionState,
        launch_config: LaunchConfig,

        /// Move from `.ready` to `.launching`.
        /// Returns `error.InvalidState` from any other state.
        pub fn launch(self: *KernelInstance) !void {
            if (self.state != .ready) {
                return error.InvalidState;
            }
            self.state = .launching;
            // ...
        }
    };

    // ---- Repository pattern ----

    /// Storage/lookup of devices.
    /// The receivers match the method-call use in `KernelScheduler`.
    /// NOTE(review): these remain body-less interface declarations; a
    /// non-extern Zig function needs a body before this compiles.
    pub const DeviceRepository = struct {
        fn findAvailable(self: *const DeviceRepository, requirements: Requirements) ![]GpuDevice;
        fn getById(self: *const DeviceRepository, id: DeviceId) ?GpuDevice;
        fn save(self: *DeviceRepository, device: GpuDevice) !void;
    };

    // ---- Domain service ----

    /// Picks a device for a kernel and creates a ready-to-launch instance.
    pub const KernelScheduler = struct {
        devices: DeviceRepository,

        /// Create a `.ready` `KernelInstance` for `kernel` on the device that
        /// `selectBestDevice` picks from those meeting `kernel.requirements`.
        ///
        /// Returns `error.NoSuitableDevice` when no device qualifies; other
        /// repository errors propagate.
        /// The receiver parameter was missing in the original, so `self.devices`
        /// did not resolve.
        pub fn scheduleKernel(
            self: *const KernelScheduler,
            kernel: Kernel,
            preferences: SchedulingPreferences,
        ) !KernelInstance {
            const candidates = try self.devices.findAvailable(kernel.requirements);
            // NOTE(review): ownership of `candidates` is not visible; if
            // findAvailable allocates, it must be freed here.
            if (candidates.len == 0) {
                return error.NoSuitableDevice;
            }

            const device = selectBestDevice(candidates, preferences);
            return KernelInstance{
                .id = generateId(),
                .kernel = kernel,
                .device = device,
                .state = .ready,
                .launch_config = calculateLaunchConfig(kernel, device),
            };
        }
    };
};
/// Integration points between bounded contexts: an anti-corruption layer
/// (actor system <-> ML), a shared kernel (ML <-> GPU), and a published
/// language for external callers.
pub const ContextIntegration = struct {
    // Anti-corruption layer between Actor System and ML domains

    /// Translates between ML-domain objects and actor-system objects.
    pub const MLActorAdapter = struct {
        /// Wrap `model` as an actor.
        /// The actor id is derived from `model.id` via `generateActorId`, the
        /// mailbox is freshly initialized, and the model is stored in the
        /// actor's state.
        /// NOTE(review): this requires ActorSystemDomain.State to accept a
        /// `.model` field — confirm.
        pub fn adaptModelToActor(model: MLDomain.Model) ActorSystemDomain.Actor {
            return .{
                .id = generateActorId(model.id),
                .behavior = modelActorBehavior,
                .mailbox = Mailbox.init(),
                .state = .{
                    .model = model,
                },
            };
        }

        /// Map an actor message to an ML command.
        /// `.train` and `.infer` payloads are translated; any other payload
        /// yields null.
        /// NOTE(review): the arms build `MLCommand.StartTraining{...}` and
        /// `MLCommand.RunInference{...}` where `?MLCommand` is expected. That
        /// only compiles if those coerce to MLCommand; if MLCommand is a tagged
        /// union, construct `.{ .start_training = ... }` instead.
        pub fn adaptMessageToMLCommand(
            msg: ActorSystemDomain.Message,
        ) ?MLCommand {
            return switch (msg.payload) {
                .train => |params| MLCommand.StartTraining{
                    .hyperparameters = params.hyperparameters,
                    .dataset = params.dataset,
                },
                .infer => |params| MLCommand.RunInference{
                    .input = params.input,
                    .batch_size = params.batch_size,
                },
                else => null,
            };
        }
    };

    // Shared kernel between ML and GPU domains

    /// Tensor-operation vocabulary shared by the ML and GPU contexts.
    pub const TensorOperations = struct {
        // Shared understanding of tensor operations

        /// Operations that have device kernels.
        pub const Operation = enum {
            matmul,
            conv2d,
            attention,
            // ...
        };

        /// Choose a kernel for `op` on `dtype` suited to `device`.
        /// NOTE(review): placeholder. The body returns nothing and its
        /// parameters are unused, so this does not compile yet.
        pub fn getKernelForOperation(
            op: Operation,
            dtype: DataType,
            device: GpuDomain.ComputeCapability,
        ) Kernel {
            // Select optimized kernel based on device
            // ...
        }
    };

    // Published language for external systems

    /// Request/response shapes exposed to external systems.
    pub const ExternalAPI = struct {
        /// Training request in the external wire format.
        pub const TrainingRequest = struct {
            model_architecture: []const u8,
            dataset_path: []const u8,
            config: struct {
                batch_size: u32,
                learning_rate: f32,
                epochs: u32,
            },
        };

        /// Translate an external request into the internal training config.
        /// NOTE(review): placeholder. There is no return and `req` is unused,
        /// so this does not compile yet.
        pub fn fromExternalRequest(
            req: TrainingRequest,
        ) !MLDomain.TrainingConfig {
            // Translate external format to internal domain model
            // ...
        }
    };
};
/// Append-only, in-memory event log with optional per-aggregate snapshots.
///
/// Allocation: `events` and the slices returned by `replay` use the allocator
/// held by the managed `snapshots` map. The store therefore needs a single
/// allocator, and its field layout is unchanged.
pub const EventStore = struct {
    events: std.ArrayListUnmanaged(DomainEvent),
    snapshots: std.AutoHashMap(AggregateId, Snapshot),

    /// One recorded change to a single aggregate.
    pub const DomainEvent = struct {
        id: EventId,
        timestamp: i64,
        aggregate_id: AggregateId,
        /// Per-aggregate sequence number; `append` requires it to increase.
        version: u32,
        payload: EventPayload,
    };

    /// Allocator backing `events` and `replay` results.
    fn backingAllocator(self: *const EventStore) std.mem.Allocator {
        return self.snapshots.allocator;
    }

    /// Highest `version` stored for `aggregate_id`, or null if it has no
    /// events. This is a linear scan over all stored events.
    pub fn getLatestVersion(self: *const EventStore, aggregate_id: AggregateId) ?u32 {
        var latest: ?u32 = null;
        for (self.events.items) |event| {
            if (event.aggregate_id != aggregate_id) continue;
            if (latest == null or event.version > latest.?) latest = event.version;
        }
        return latest;
    }

    /// Store `event`, then publish it on `EventBus`.
    ///
    /// This is an optimistic-concurrency check: it returns
    /// `error.ConcurrencyConflict` when `event.version` is not greater than the
    /// latest stored version for the same aggregate. Allocation and publish
    /// errors propagate.
    /// NOTE(review): if `EventBus.publish` fails, the event stays stored but the
    /// error is still returned, so a caller that retries will then see
    /// ConcurrencyConflict.
    pub fn append(self: *EventStore, event: DomainEvent) !void {
        if (self.getLatestVersion(event.aggregate_id)) |latest| {
            if (event.version <= latest) {
                return error.ConcurrencyConflict;
            }
        }

        // An unmanaged list needs the allocator on every growing call.
        try self.events.append(self.backingAllocator(), event);

        try EventBus.publish(event);
    }

    /// Return the events of `aggregate_id` whose version is at least the
    /// effective start version.
    ///
    /// The start is `from_version`, or 0 when it is null. If a snapshot exists
    /// and its version is at or past that start, replay begins just after the
    /// snapshot, so the caller is expected to apply the snapshot state itself.
    /// The caller owns the returned slice and frees it with
    /// `snapshots.allocator`.
    pub fn replay(
        self: *EventStore,
        aggregate_id: AggregateId,
        from_version: ?u32,
    ) ![]DomainEvent {
        const gpa = self.backingAllocator();

        // Zig parameters are immutable, so track the effective start locally.
        var start: u32 = from_version orelse 0;
        if (self.snapshots.get(aggregate_id)) |snapshot| {
            if (from_version == null or snapshot.version >= from_version.?) {
                start = snapshot.version + 1;
            }
        }

        var result: std.ArrayListUnmanaged(DomainEvent) = .empty;
        errdefer result.deinit(gpa);

        for (self.events.items) |event| {
            if (event.aggregate_id == aggregate_id and event.version >= start) {
                try result.append(gpa, event);
            }
        }
        return result.toOwnedSlice(gpa);
    }
};
/// Saga coordinating a training run through resource allocation, data
/// loading, training, and checkpointing, with a compensating rollback path.
pub const TrainingSaga = struct {
    /// Saga progress. `compensating` is entered on a failure event; `handle`
    /// sets `failed` after running the rollback in the `.compensating` arm.
    pub const State = enum {
        allocating_resources,
        loading_data,
        initializing_model,
        training,
        checkpointing,
        completed,
        compensating,
        failed,
    };
    state: State,
    context: SagaContext,

    /// Resources the saga has acquired so far; the `.compensating` arm
    /// releases them.
    pub const SagaContext = struct {
        session_id: SessionId,
        allocated_devices: []DeviceId,
        loaded_datasets: []DatasetId,
        checkpoint_path: ?[]const u8,
    };

    /// Advance the saga by one event, interpreted according to `self.state`.
    ///
    /// NOTE(review) issues visible in this sketch:
    /// - `allocated_devices` is a plain slice and has no `append`; it needs a
    ///   growable list plus an allocator.
    /// - `DomainEvent`, `required_devices`, `checkpoint_interval`, `loadData`,
    ///   `compensate`, `createCheckpoint`, `releaseDevice`, and
    ///   `unloadDataset` are not defined in the visible code.
    /// - The outer switch lists only some states (`// ...`), so it is not
    ///   exhaustive as written.
    /// - Rollback uses `try`, so the first release failure stops the loop and
    ///   leaves the remaining devices/datasets held.
    /// - `compensate()` is called when entering `.compensating`, and the
    ///   `.compensating` arm rolls back again on the next event. Confirm the
    ///   rollback is not meant to run twice.
    pub fn handle(self: *TrainingSaga, event: DomainEvent) !void {
        switch (self.state) {
            .allocating_resources => {
                switch (event) {
                    .resource_allocated => |e| {
                        // Record the device; once enough are held, move on to loading data.
                        try self.context.allocated_devices.append(e.device_id);
                        if (self.context.allocated_devices.len >= required_devices) {
                            self.state = .loading_data;
                            try self.loadData();
                        }
                    },
                    .resource_allocation_failed => {
                        self.state = .compensating;
                        try self.compensate();
                    },
                    // Events irrelevant to this state are ignored.
                    else => {},
                }
            },
            .training => {
                switch (event) {
                    .epoch_completed => |e| {
                        // Checkpoint every `checkpoint_interval` epochs.
                        // NOTE(review): an interval of 0 would divide by zero.
                        if (e.epoch % checkpoint_interval == 0) {
                            self.state = .checkpointing;
                            try self.createCheckpoint();
                        }
                    },
                    .training_failed => {
                        self.state = .compensating;
                        try self.compensate();
                    },
                    else => {},
                }
            },
            .compensating => {
                // Rollback actions: release every acquired device and dataset, then mark failed.
                for (self.context.allocated_devices) |device| {
                    try releaseDevice(device);
                }
                for (self.context.loaded_datasets) |dataset| {
                    try unloadDataset(dataset);
                }
                self.state = .failed;
            },
            // ...
        }
    }
};
// Verifies that TrainingRun.addCheckpoint accepts a forward checkpoint and
// rejects earlier or equal epochs without storing them.
test "training aggregate maintains invariants" {
    var run = MLDomain.TrainingRun{
        .session = .{
            .id = "test_session",
            .model = test_model,
            .dataset = test_dataset,
            .hyperparameters = .{},
            .state = .running,
        },
        // The managed list needs an allocator; the testing allocator also
        // reports leaks.
        .checkpoints = ArrayList(Checkpoint).init(testing.allocator),
        .metrics = MetricsHistory.init(),
    };
    defer run.checkpoints.deinit();

    // Should accept valid checkpoint
    const cp1 = Checkpoint{
        .session_id = "test_session",
        .epoch = 1,
        .loss = 0.5,
    };
    try run.addCheckpoint(cp1);

    // Should reject non-monotonic checkpoint (earlier than cp1)
    const cp2 = Checkpoint{
        .session_id = "test_session",
        .epoch = 0,
        .loss = 0.6,
    };
    try testing.expectError(error.NonMonotonicCheckpoint, run.addCheckpoint(cp2));

    // Should also reject an equal epoch: progress must be strictly increasing
    const cp3 = Checkpoint{
        .session_id = "test_session",
        .epoch = 1,
        .loss = 0.4,
    };
    try testing.expectError(error.NonMonotonicCheckpoint, run.addCheckpoint(cp3));

    // Rejected checkpoints must not have been stored
    try testing.expectEqual(@as(usize, 1), run.checkpoints.items.len);
}
// Exercises GpuDevice.canExecute against a capability requirement and a
// memory requirement.
test "device capability checking" {
    const gib = 1024 * 1024 * 1024;

    const rtx_3090 = GpuDomain.GpuDevice{
        .id = "gpu0",
        .name = "RTX 3090",
        .capability = .{ .major = 8, .minor = 6 },
        .memory = .{ .total = 24 * gib, .available = 20 * gib },
        .health = .healthy,
    };

    // Capability 8.6 meets the 7.0 tensor-core threshold, and 1 GiB fits in
    // the 20 GiB available.
    const needs_tensor_cores = Kernel{
        .requirements = &[_]Feature{.tensor_cores},
        .memory_requirement = 1 * gib,
    };
    const runs_tensor_kernel = rtx_3090.canExecute(needs_tensor_cores);
    try testing.expect(runs_tensor_kernel);

    // 30 GiB exceeds the 20 GiB available, so the device must refuse.
    const needs_30_gib = Kernel{
        .requirements = &[_]Feature{},
        .memory_requirement = 30 * gib,
    };
    const runs_huge_kernel = rtx_3090.canExecute(needs_30_gib);
    try testing.expect(!runs_huge_kernel);
}
// A training failure while in `.training` should move the saga into
// `.compensating`.
test "saga compensation on failure" {
    // `&[_]T{...}` is a `*const [N]T` and cannot coerce to the mutable
    // `[]DeviceId` / `[]DatasetId` fields of SagaContext. Back the slices
    // with `var` arrays instead.
    var devices = [_]DeviceId{ "gpu0", "gpu1" };
    var datasets = [_]DatasetId{"dataset1"};

    var saga = TrainingSaga{
        .state = .training,
        .context = .{
            .session_id = "test",
            .allocated_devices = &devices,
            .loaded_datasets = &datasets,
            .checkpoint_path = null,
        },
    };

    // Simulate training failure
    try saga.handle(.{ .training_failed = .{
        .session_id = "test",
        .reason = "OOM",
    } });

    // Should transition to compensating. The expected value is fully typed so
    // that expectEqual does not depend on enum-literal coercion.
    try testing.expectEqual(TrainingSaga.State.compensating, saga.state);

    // Should clean up resources
    // (In a real implementation, verify that the devices were released.)
}