zbmd/
├── build.zig          # Build configuration
├── build.zig.zon      # Dependencies
├── src/
│   ├── zbmd.zig       # Public API
│   ├── actor/
│   │   ├── actor.zig
│   │   ├── mailbox.zig
│   │   ├── message.zig
│   │   ├── scheduler.zig
│   │   └── supervisor.zig
│   ├── gpu/
│   │   ├── backend.zig
│   │   ├── cuda/
│   │   ├── rocm/
│   │   ├── vulkan/
│   │   └── kernel.zig
│   ├── ml/
│   │   ├── tensor.zig
│   │   ├── model.zig
│   │   ├── optimizer.zig
│   │   └── training.zig
│   ├── distribution/
│   │   ├── cluster.zig
│   │   ├── gossip.zig
│   │   └── consensus.zig
│   └── utils/
│       ├── allocator.zig
│       ├── bounded.zig
│       └── metrics.zig
├── tests/
│   ├── unit/
│   ├── integration/
│   ├── property/
│   └── benchmarks/
├── examples/
│   ├── hello_actor.zig
│   ├── gpu_training.zig
│   └── distributed_training.zig
└── docs/
    └── (this mdBook)
// Implement core actor structure
pub const Actor = struct {
id: ActorId,
mailbox: BoundedQueue(Message, 256),
behavior: *const fn (*Actor, Message) Error!void,
state: ActorState,
reductions: i32,
};
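The Actor struct leans on a few small supporting types. A minimal sketch of what they might look like (MessagePayload and the reply plumbing used later are deliberately left open):

// Sketch of the supporting types referenced above.
pub const ActorId = u64;

pub const ActorState = enum {
    runnable,
    running,
    waiting,
    terminated,
};

pub const Message = struct {
    from: ActorId,
    to: ActorId,
    payload: MessagePayload,
    // reply() and correlation ids are elided in this sketch.
};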
// Basic message passing
pub fn send(from: ActorId, to: ActorId, payload: MessagePayload) !void {
const actor = registry.get(to) orelse return error.ActorNotFound;
try actor.mailbox.push(.{
.from = from,
.to = to,
.payload = payload,
});
}
// Tests
test "basic actor creation and messaging" {
var system = try ActorSystem.init(.{});
const actor = try system.spawn(echoActor, .{});
try actor.send(.{ .data = "hello" });
const reply = try actor.receive(100);
try testing.expectEqualStrings("hello", reply.data);
}
// Work-stealing scheduler
pub const Scheduler = struct {
workers: []Worker,
run_queues: []RunQueue,
    pub fn init(allocator: std.mem.Allocator, worker_count: u32) !Scheduler {
        // Allocate one run queue per worker, start the worker threads,
        // and pin each thread to a CPU core (affinity setup elided).
        // (RunQueue.init and Worker.start are sketch helpers.)
        const self = Scheduler{
            .workers = try allocator.alloc(Worker, worker_count),
            .run_queues = try allocator.alloc(RunQueue, worker_count),
        };
        for (self.run_queues, self.workers, 0..) |*queue, *worker, i| {
            queue.* = RunQueue.init();
            worker.* = try Worker.start(i, queue);
        }
        return self;
    }
    pub fn schedule(self: *Scheduler, actor: *Actor) !void {
        // Add to the least loaded queue, then wake an idle worker.
        // (leastLoadedQueue/wakeIdleWorker are sketch helpers.)
        const queue = self.leastLoadedQueue();
        try queue.push(actor);
        self.wakeIdleWorker();
    }
};
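The "stealing" half lives in the worker's fetch path: pop from the local queue first, otherwise try to take work from a sibling. A minimal sketch, assuming each Worker keeps a pointer to its own run queue plus a small PRNG for victim selection, and that RunQueue exposes pop() and steal():

// Sketch of a worker's fetch loop: local pop first, then steal.
fn nextActor(self: *Worker, sched: *Scheduler) ?*Actor {
    // Fast path: run something from the local queue.
    if (self.run_queue.pop()) |actor| return actor;
    // Local queue empty: try a bounded number of randomly chosen victims.
    var attempts: usize = 0;
    while (attempts < sched.run_queues.len) : (attempts += 1) {
        const victim = self.rng.random().uintLessThan(usize, sched.run_queues.len);
        if (sched.run_queues[victim].steal()) |actor| return actor;
    }
    return null; // nothing runnable anywhere; the caller parks this worker
}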
// Reduction-based preemption
pub fn executeActor(actor: *Actor) !void {
    // Refill the reduction budget; once it is spent the actor yields back to the scheduler.
    actor.reductions = 2000;
while (actor.reductions > 0) {
if (try actor.mailbox.pop()) |msg| {
try actor.behavior(actor, msg);
actor.reductions -= 1;
} else break;
}
}
// Supervisor implementation
pub const Supervisor = struct {
actor: Actor,
children: BoundedArray(Child, 100),
strategy: RestartStrategy,
pub fn startChild(self: *Supervisor, spec: ChildSpec) !ActorId {
const child = try self.actor.system.spawn(spec.behavior, .{
.supervisor = self.actor.id,
});
try self.children.append(.{
.id = child,
.spec = spec,
.state = .running,
});
return child;
}
    pub fn handleChildExit(self: *Supervisor, child: ActorId, reason: ExitReason) !void {
        // Normal exits do not trigger a restart; everything else follows the strategy.
        if (reason == .normal) return;
        switch (self.strategy) {
.one_for_one => try self.restartChild(child),
.one_for_all => try self.restartAllChildren(),
.rest_for_one => try self.restartRestForOne(child),
}
}
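    // Sketch of the one_for_one restart path. findChild, restart_count, and
    // max_restarts are assumptions about Child/ChildSpec, not a fixed API.
    fn restartChild(self: *Supervisor, id: ActorId) !void {
        const child = self.findChild(id) orelse return error.UnknownChild;
        if (child.restart_count >= child.spec.max_restarts)
            return error.RestartLimitExceeded;
        child.restart_count += 1;
        // Respawn with the original spec and swap in the new ActorId.
        child.id = try self.actor.system.spawn(child.spec.behavior, .{
            .supervisor = self.actor.id,
        });
        child.state = .running;
    }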
};
// Unified GPU interface
pub const GpuBackend = union(enum) {
cuda: CudaBackend,
rocm: RocmBackend,
vulkan: VulkanBackend,
cpu: CpuBackend,
pub fn detect() !GpuBackend {
if (cuda.isAvailable()) return .{ .cuda = try CudaBackend.init() };
if (rocm.isAvailable()) return .{ .rocm = try RocmBackend.init() };
if (vulkan.isAvailable()) return .{ .vulkan = try VulkanBackend.init() };
return .{ .cpu = CpuBackend.init() };
}
};
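Backend detection is expected to run once at startup; a short usage sketch:

// Probe for an accelerator at startup, falling back to the CPU backend.
const backend = try GpuBackend.detect();
switch (backend) {
    .cuda => std.log.info("using CUDA backend", .{}),
    .rocm => std.log.info("using ROCm backend", .{}),
    .vulkan => std.log.info("using Vulkan backend", .{}),
    .cpu => std.log.warn("no GPU found, running on CPU", .{}),
}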
// Link with CUDA libraries
const cuda = @cImport({
    @cInclude("cuda.h"); // driver API types (CUcontext)
    @cInclude("cuda_runtime.h");
    @cInclude("cublas_v2.h");
    @cInclude("cudnn.h");
    @cInclude("nvrtc.h"); // used by KernelCompiler below
});
pub const CudaBackend = struct {
device: c_int,
context: cuda.CUcontext,
    pub fn init() !CudaBackend {
        // cudaGetDeviceCount reports through an out-parameter.
        var count: c_int = 0;
        try checkCuda(cuda.cudaGetDeviceCount(&count));
        if (count == 0) {
            return error.NoCudaDevices;
        }
        const device: c_int = 0;
        try checkCuda(cuda.cudaSetDevice(device));
return .{
.device = device,
.context = null,
};
}
pub fn allocate(self: *CudaBackend, size: usize) !DevicePtr {
var ptr: ?*anyopaque = null;
try checkCuda(cuda.cudaMalloc(&ptr, size));
        return @intFromPtr(ptr.?);
}
fn checkCuda(err: cuda.cudaError_t) !void {
if (err != cuda.cudaSuccess) {
return error.CudaError;
}
}
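    // Sketch of the matching deallocation path; errors on teardown are ignored.
    pub fn free(self: *CudaBackend, ptr: DevicePtr) void {
        _ = self;
        const device_ptr: *anyopaque = @ptrFromInt(ptr);
        _ = cuda.cudaFree(device_ptr);
    }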
};
// JIT kernel compilation
pub const KernelCompiler = struct {
nvrtc: *cuda.nvrtcProgram,
    pub fn compileCuda(allocator: std.mem.Allocator, source: [:0]const u8, options: CompileOptions) ![]const u8 {
        // NVRTC expects a null-terminated source string; the compile flags
        // below are hardcoded in this sketch rather than derived from `options`.
        _ = options;
// Create program
var prog: cuda.nvrtcProgram = undefined;
try checkNvrtc(cuda.nvrtcCreateProgram(
&prog,
source.ptr,
"kernel.cu",
0,
null,
null,
));
        defer _ = cuda.nvrtcDestroyProgram(&prog);
// Compile
const opts = [_][*c]const u8{
"--gpu-architecture=compute_80",
"--use_fast_math",
};
try checkNvrtc(cuda.nvrtcCompileProgram(prog, opts.len, &opts));
// Get PTX
var ptx_size: usize = 0;
try checkNvrtc(cuda.nvrtcGetPTXSize(prog, &ptx_size));
const ptx = try allocator.alloc(u8, ptx_size);
try checkNvrtc(cuda.nvrtcGetPTX(prog, ptx.ptr));
return ptx;
}
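    // checkNvrtc is used above but not defined; a minimal version (the error
    // name is arbitrary, and nvrtcGetErrorString / nvrtcGetProgramLog could be
    // consulted here for diagnostics).
    fn checkNvrtc(result: cuda.nvrtcResult) !void {
        if (result != cuda.NVRTC_SUCCESS) {
            return error.NvrtcError;
        }
    }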
};
pub const TensorActor = struct {
actor: Actor,
tensor: Tensor,
device: GpuBackend,
pub fn behavior(self: *Actor, msg: Message) !void {
        const tensor_actor: *TensorActor = @fieldParentPtr("actor", self);
switch (msg.payload) {
.matmul => |args| {
const result = try tensor_actor.matmul(args.other);
try msg.reply(.{ .tensor = result });
},
.add => |args| {
try tensor_actor.add(args.other);
try msg.reply(.{ .success = true });
},
.migrate => |target| {
try tensor_actor.migrateTo(target);
try msg.reply(.{ .success = true });
},
}
}
    fn matmul(self: *TensorActor, other: TensorActor) !Tensor {
        // Dispatch the GEMM to the vendor BLAS (cuBLAS/rocBLAS); the Tensor
        // constructor and the cublas.gemm wrapper here are illustrative only.
        const result = try Tensor.init(self.tensor.rows, other.tensor.cols);
        switch (self.device) {
            .cuda => |*backend| {
                const handle = try backend.getCublasHandle();
                try cublas.gemm(
                    handle,
                    self.tensor.data,
                    other.tensor.data,
                    result.data,
                );
            },
            .rocm => |*backend| {
                // Same call shape against rocBLAS.
                _ = backend;
            },
            else => {
                // Naive CPU fallback.
            },
        }
        return result;
    }
};
pub const ModelActor = struct {
actor: Actor,
layers: []LayerActor,
optimizer: OptimizerActor,
pub fn forward(self: *ModelActor, input: Tensor) !Tensor {
var current = input;
for (self.layers) |layer| {
current = try layer.call(.{
.forward = .{ .input = current }
}, 1000);
}
return current;
}
pub fn backward(self: *ModelActor, loss: Tensor) !void {
var grad = loss;
// Reverse through layers
var i = self.layers.len;
while (i > 0) {
i -= 1;
grad = try self.layers[i].call(.{
.backward = .{ .grad = grad }
}, 1000);
}
        // Update with the optimizer; `collected_grads` stands for the
        // per-layer gradients gathered from the backward replies (collection elided).
        try self.optimizer.send(.{
            .step = .{ .gradients = collected_grads }
        });
}
};
pub const TrainingSupervisor = struct {
supervisor: Supervisor,
model: ModelActor,
dataloader: DataloaderActor,
pub fn train(self: *TrainingSupervisor, config: TrainingConfig) !void {
for (0..config.epochs) |epoch| {
var batch_iter = try self.dataloader.getBatches();
while (try batch_iter.next()) |batch| {
// Forward pass
const output = try self.model.forward(batch.inputs);
// Compute loss
const loss = try computeLoss(output, batch.targets);
// Backward pass
try self.model.backward(loss);
// Checkpoint periodically
if (batch.id % config.checkpoint_interval == 0) {
try self.checkpoint();
}
}
try self.onEpochComplete(epoch);
}
}
fn handleFailure(self: *TrainingSupervisor, failure: Failure) !void {
        switch (failure.kind) {
.gpu_oom => {
// Reduce batch size and retry
self.dataloader.batch_size /= 2;
try self.retry();
},
.gpu_error => {
// Migrate to different GPU
const new_device = try selectHealthyDevice();
try self.model.migrateTo(new_device);
try self.resumeFromCheckpoint();
},
            else => {
                // Not handled here: bubble up so the supervisor tree
                // applies its restart strategy.
                return error.UnhandledFailure;
            },
}
}
};
pub const ClusterNode = struct {
id: NodeId,
actors: ActorRegistry,
peers: []Peer,
pub fn join(self: *ClusterNode, seeds: []Address) !void {
// Gossip protocol for discovery
for (seeds) |seed| {
try self.connectToPeer(seed);
}
// Exchange actor registries
try self.broadcastActorList();
// Start failure detection
try self.startHeartbeat();
}
pub fn routeMessage(self: *ClusterNode, msg: Message) !void {
const target_node = self.findNodeForActor(msg.to);
if (target_node == self.id) {
// Local delivery
try self.actors.deliver(msg);
} else {
// Remote delivery
const peer = self.getPeer(target_node);
try peer.send(msg);
}
}
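    // The actor-to-node mapping used by routeMessage(); a minimal sketch that
    // hashes the ActorId over the current membership, assuming each Peer
    // carries its NodeId. (A production version would use a placement that is
    // stable under membership changes, e.g. rendezvous or consistent hashing.)
    fn findNodeForActor(self: *ClusterNode, actor: ActorId) NodeId {
        const h = std.hash.Wyhash.hash(0, std.mem.asBytes(&actor));
        const n: u64 = self.peers.len + 1; // peers plus this node
        const slot: usize = @intCast(h % n);
        if (slot == self.peers.len) return self.id;
        return self.peers[slot].id;
    }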
};
pub const DistributedTraining = struct {
nodes: []ClusterNode,
model_replicas: []ModelActor,
pub fn dataParallel(self: *DistributedTraining, batch: Batch) !void {
// Split batch across nodes
const mini_batches = try splitBatch(batch, self.nodes.len);
        // Parallel forward/backward across replicas
        // (pseudo-code: allocator plumbing for the futures list is elided)
        var futures = ArrayList(Future).init();
for (self.model_replicas, mini_batches) |replica, mb| {
const future = try replica.callAsync(.{
.train_step = .{ .batch = mb }
});
try futures.append(future);
}
        // Wait for completion ("await" is a Zig keyword, so the Future type exposes wait())
        for (futures.items) |f| {
            _ = try f.wait(5000);
}
// All-reduce gradients
try self.allReduceGradients();
}
fn allReduceGradients(self: *DistributedTraining) !void {
        // Ring all-reduce: the reduce-scatter phase is shown below; a second
        // all-gather phase of n-1 rounds (elided) then distributes the fully
        // reduced chunks back around the ring.
        const n = self.nodes.len;
        // Each node sends one gradient chunk to its successor per round
for (0..n-1) |round| {
for (self.nodes, 0..) |node, i| {
const next = (i + 1) % n;
const chunk = self.getGradientChunk(i, round);
try node.send(self.nodes[next], chunk);
}
// Aggregate received chunks
for (self.nodes) |node| {
const received = try node.receive();
try node.aggregateGradients(received);
}
}
}
};
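For sizing interconnect budgets: with n nodes and a gradient of G bytes, a ring all-reduce makes every node send (and receive) 2(n-1) chunks of G/n bytes, i.e. roughly 2G per node regardless of cluster size. A small helper expressing that (the function name is illustrative):

// Bytes each node transmits during one ring all-reduce:
// (n - 1) reduce-scatter sends plus (n - 1) all-gather sends of grad_bytes / n each.
fn ringAllReduceBytesPerNode(grad_bytes: u64, nodes: u64) u64 {
    return 2 * (nodes - 1) * (grad_bytes / nodes);
}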
pub const MetricsCollector = struct {
counters: std.StringHashMap(AtomicU64),
histograms: std.StringHashMap(Histogram),
pub fn recordLatency(self: *MetricsCollector, name: []const u8, ns: u64) !void {
const hist = try self.histograms.getOrPut(name);
if (!hist.found_existing) {
hist.value_ptr.* = Histogram.init();
}
hist.value_ptr.record(ns);
}
pub fn exportPrometheus(self: *MetricsCollector) ![]const u8 {
var buffer = ArrayList(u8).init();
// Export counters
// NOTE: This is pseudo-code for illustration.
// In Zig 0.15.x, you would need to provide an explicit buffer to writer()
var iter = self.counters.iterator();
while (iter.next()) |entry| {
            try buffer.writer().print("# TYPE {s} counter\n", .{entry.key_ptr.*});
            try buffer.writer().print("{s} {d}\n", .{
                entry.key_ptr.*,
                entry.value_ptr.load(.acquire),
            });
}
// Export histograms
// ...
return buffer.toOwnedSlice();
}
};
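MetricsCollector references a Histogram type that is not defined above; a minimal log2-bucketed sketch (the bucket scheme and field names are placeholders, not a committed design):

pub const Histogram = struct {
    buckets: [64]u64 = [_]u64{0} ** 64,
    count: u64 = 0,
    sum: u64 = 0,

    pub fn init() Histogram {
        return .{};
    }

    pub fn record(self: *Histogram, ns: u64) void {
        // Bucket by the position of the highest set bit (log2 of the latency).
        const idx: usize = if (ns == 0) 0 else 63 - @clz(ns);
        self.buckets[idx] += 1;
        self.count += 1;
        self.sum += ns;
    }
};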
pub const ModelServer = struct {
    system: *ActorSystem,
    model: ModelActor,
    batch_manager: BatchManager,
pub fn serve(self: *ModelServer, port: u16) !void {
        // std.net in current Zig: bind and listen via Address.listen().
        const addr = try std.net.Address.parseIp("0.0.0.0", port);
        var server = try addr.listen(.{ .reuse_address = true });
        defer server.deinit();
while (true) {
const conn = try server.accept();
// Spawn actor for connection
_ = try self.system.spawn(connectionHandler, .{
.conn = conn,
.model = self.model,
.batch_manager = self.batch_manager,
});
}
}
    fn connectionHandler(self: *Actor, msg: Message) !void {
        // The connection, model handle, and batch manager passed at spawn time
        // are assumed to be reachable through the actor's state.
        // Parse inference request
        const request = try parseRequest(msg.payload.data);
        // Add to batch
        const batch_future = try self.batch_manager.addRequest(request);
        // Wait for batch processing ("await" is a Zig keyword, so the future exposes wait())
        const result = try batch_future.wait(request.timeout);
// Send response
try sendResponse(msg.payload.conn, result);
}
};
- Week 1-2: Core actor system
- Week 3: Basic scheduler
- Week 4: Supervision trees
- Week 5-6: Backend abstraction
- Week 7: CUDA integration
- Week 8: Kernel compilation
- Week 9-10: Tensor operations
- Week 11: Model actors
- Week 12: Training supervisor
- Week 13-14: Cluster management
- Week 15: Distributed training
- Week 16: Consensus & fault tolerance
- Week 17: Monitoring/metrics
- Week 18: Model serving
- Week 19: Performance optimization
- Week 20: Documentation & examples
// Continuous testing throughout development
test "integration: end-to-end training" {
var system = try ActorSystem.init(.{});
defer system.deinit();
// Create training supervisor
const trainer = try system.spawn(TrainingSupervisor, .{
.model_config = simple_mlp,
.dataset = mnist,
.batch_size = 32,
});
// Start training
try trainer.send(.{
.train = .{
.epochs = 10,
.learning_rate = 0.001,
}
});
// Wait for completion
const result = try trainer.call(.{ .get_status = {} }, 60_000);
try testing.expect(result.status == .completed);
try testing.expect(result.final_loss < 0.1);
}
test "fault tolerance: GPU failure during training" {
var system = try ActorSystem.init(.{
.fault_injection = .{
.gpu_failure_rate = 0.1,
},
});
defer system.deinit();
const trainer = try createTrainer(system);
// Training should complete despite failures
const result = try trainer.train(test_config);
try testing.expect(result.completed);
try testing.expect(result.recovery_count > 0);
}