Chapter 9: Implementation Roadmap

Project Structure


zbmd/
├── build.zig                 # Build configuration
├── build.zig.zon             # Dependencies
├── src/
│   ├── zbmd.zig              # Public API
│   ├── actor/
│   │   ├── actor.zig
│   │   ├── mailbox.zig
│   │   ├── message.zig
│   │   ├── scheduler.zig
│   │   └── supervisor.zig
│   ├── gpu/
│   │   ├── backend.zig
│   │   ├── cuda/
│   │   ├── rocm/
│   │   ├── vulkan/
│   │   └── kernel.zig
│   ├── ml/
│   │   ├── tensor.zig
│   │   ├── model.zig
│   │   ├── optimizer.zig
│   │   └── training.zig
│   ├── distribution/
│   │   ├── cluster.zig
│   │   ├── gossip.zig
│   │   └── consensus.zig
│   └── utils/
│       ├── allocator.zig
│       ├── bounded.zig
│       └── metrics.zig
├── tests/
│   ├── unit/
│   ├── integration/
│   ├── property/
│   └── benchmarks/
├── examples/
│   ├── hello_actor.zig
│   ├── gpu_training.zig
│   └── distributed_training.zig
└── docs/
    └── (this mdBook)
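
The build.zig at the root wires these directories together. A minimal sketch, assuming a Zig 0.13-era std.Build API (module name and paths follow the tree above):

const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    // Expose src/zbmd.zig as the library's public module
    _ = b.addModule("zbmd", .{
        .root_source_file = b.path("src/zbmd.zig"),
    });

    // `zig build test` runs the unit tests
    const unit_tests = b.addTest(.{
        .root_source_file = b.path("src/zbmd.zig"),
        .target = target,
        .optimize = optimize,
    });
    const run_tests = b.addRunArtifact(unit_tests);
    const test_step = b.step("test", "Run unit tests");
    test_step.dependOn(&run_tests.step);
}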

Phase 1: Core Actor System (Weeks 1-4)

Milestone 1.1: Basic Actor Infrastructure

// Implement core actor structure
pub const Actor = struct {
    id: ActorId,
    mailbox: BoundedQueue(Message, 256), // fixed capacity: backpressure instead of unbounded growth
    behavior: *const fn (*Actor, Message) Error!void,
    state: ActorState,
    reductions: i32, // remaining work budget before the scheduler preempts
};

// Basic message passing
pub fn send(from: ActorId, to: ActorId, payload: MessagePayload) !void {
    const actor = registry.get(to) orelse return error.ActorNotFound;
    try actor.mailbox.push(.{
        .from = from,
        .to = to,
        .payload = payload,
    });
}

// Tests
test "basic actor creation and messaging" {
    var system = try ActorSystem.init(.{});
    const actor = try system.spawn(echoActor, .{});

    try actor.send(.{ .data = "hello" });
    const reply = try actor.receive(100);
    try testing.expectEqualStrings("hello", reply.data);
}

Milestone 1.2: Scheduler Implementation

// Work-stealing scheduler
pub const Scheduler = struct {
    workers: []Worker,
    run_queues: []RunQueue,

    pub fn init(worker_count: u32) !Scheduler {
        // Initialize per-worker queues
        // Start worker threads
        // Set CPU affinity
    }

    pub fn schedule(self: *Scheduler, actor: *Actor) !void {
        // Add to least loaded queue
        // Wake idle workers
    }
};

// Reduction-based preemption
pub fn executeActor(actor: *Actor) !void {
    actor.reductions = 2000;
    while (actor.reductions > 0) {
        if (try actor.mailbox.pop()) |msg| {
            try actor.behavior(actor, msg);
            actor.reductions -= 1;
        } else break;
    }
}
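
The init and schedule stubs above leave the worker loop itself open. A minimal sketch of how each worker could interleave local work with stealing; popLocal, stealOne, and reportActorError are assumptions of this sketch, not a fixed API:

fn workerLoop(self: *Scheduler, worker_id: usize) void {
    const my_queue = &self.run_queues[worker_id];
    while (true) {
        // Prefer local work; fall back to stealing from a sibling queue
        const actor = my_queue.popLocal() orelse self.steal(worker_id) orelse {
            std.Thread.yield() catch {};
            continue;
        };
        executeActor(actor) catch |err| reportActorError(actor, err);
        // Re-enqueue actors that still have pending messages
        if (!actor.mailbox.isEmpty()) my_queue.push(actor);
    }
}

fn steal(self: *Scheduler, thief_id: usize) ?*Actor {
    for (self.run_queues, 0..) |*queue, victim_id| {
        if (victim_id == thief_id) continue;
        if (queue.stealOne()) |actor| return actor;
    }
    return null;
}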

Milestone 1.3: Supervision Trees

// Supervisor implementation
pub const Supervisor = struct {
    actor: Actor,
    children: BoundedArray(Child, 100),
    strategy: RestartStrategy,

    pub fn startChild(self: *Supervisor, spec: ChildSpec) !ActorId {
        const child = try self.actor.system.spawn(spec.behavior, .{
            .supervisor = self.actor.id,
        });
        try self.children.append(.{
            .id = child,
            .spec = spec,
            .state = .running,
        });
        return child;
    }

    pub fn handleChildExit(self: *Supervisor, child: ActorId, reason: ExitReason) !void {
        switch (self.strategy) {
            .one_for_one => try self.restartChild(child),
            .one_for_all => try self.restartAllChildren(),
            .rest_for_one => try self.restartRestForOne(child),
        }
    }
};
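
The strategies matched above need supporting types. A sketch of what RestartStrategy and ChildSpec might look like, borrowing OTP's restart-intensity rule (give up and escalate after max_restarts within max_seconds); the exact fields are assumptions:

pub const RestartStrategy = enum {
    one_for_one,  // restart only the failed child
    one_for_all,  // restart every child
    rest_for_one, // restart the failed child and all children started after it
};

pub const ChildSpec = struct {
    behavior: *const fn (*Actor, Message) Error!void,
    restart: enum { permanent, transient, temporary } = .permanent,
    max_restarts: u32 = 3, // escalate after this many restarts...
    max_seconds: u32 = 5,  // ...within this time window
};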

Phase 2: GPU Backend (Weeks 5-8)

Milestone 2.1: Backend Abstraction

// Unified GPU interface
pub const GpuBackend = union(enum) {
    cuda: CudaBackend,
    rocm: RocmBackend,
    vulkan: VulkanBackend,
    cpu: CpuBackend,

    pub fn detect() !GpuBackend {
        if (cuda.isAvailable()) return .{ .cuda = try CudaBackend.init() };
        if (rocm.isAvailable()) return .{ .rocm = try RocmBackend.init() };
        if (vulkan.isAvailable()) return .{ .vulkan = try VulkanBackend.init() };
        return .{ .cpu = CpuBackend.init() };
    }
};
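
Because every backend exposes the same method surface, callers can stay backend-agnostic. A usage sketch, assuming each variant provides an allocate method as in Milestone 2.2:

pub fn allocate(self: *GpuBackend, size: usize) !DevicePtr {
    // `inline else` generates one concrete branch per backend at compile time
    return switch (self.*) {
        inline else => |*backend| backend.allocate(size),
    };
}

test "detect and allocate" {
    var backend = try GpuBackend.detect();
    _ = try backend.allocate(1024 * 1024 * @sizeOf(f32));
}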

Milestone 2.2: CUDA Integration

// Link with CUDA libraries
const cuda = @cImport({
    @cInclude("cuda_runtime.h");
    @cInclude("cublas_v2.h");
    @cInclude("cudnn.h");
});

pub const CudaBackend = struct {
    device: c_int,
    context: cuda.CUcontext,

    pub fn init() !CudaBackend {
        // cudaGetDeviceCount returns its result through an out-parameter
        var device_count: c_int = 0;
        try checkCuda(cuda.cudaGetDeviceCount(&device_count));
        if (device_count == 0) {
            return error.NoCudaDevices;
        }

        const device: c_int = 0;
        try checkCuda(cuda.cudaSetDevice(device));

        return .{
            .device = device,
            .context = null,
        };
    }

    pub fn allocate(self: *CudaBackend, size: usize) !DevicePtr {
        var ptr: ?*anyopaque = null;
        try checkCuda(cuda.cudaMalloc(&ptr, size));
        return @intFromPtr(ptr.?); // @ptrToInt was renamed to @intFromPtr in Zig 0.11
    }

    fn checkCuda(err: cuda.cudaError_t) !void {
        if (err != cuda.cudaSuccess) {
            return error.CudaError;
        }
    }
};

Milestone 2.3: Kernel Compilation

// JIT kernel compilation
pub const KernelCompiler = struct {
    pub fn compileCuda(
        allocator: std.mem.Allocator,
        source: [:0]const u8,
        options: CompileOptions,
    ) ![]const u8 {
        _ = options; // architecture and fast-math flags are hardcoded below for brevity

        // Create program (NVRTC expects NUL-terminated source)
        var prog: cuda.nvrtcProgram = undefined;
        try checkNvrtc(cuda.nvrtcCreateProgram(
            &prog,
            source.ptr,
            "kernel.cu",
            0,
            null,
            null,
        ));
        defer _ = cuda.nvrtcDestroyProgram(&prog);

        // Compile
        const opts = [_][*c]const u8{
            "--gpu-architecture=compute_80",
            "--use_fast_math",
        };
        try checkNvrtc(cuda.nvrtcCompileProgram(prog, opts.len, &opts));

        // Get PTX
        var ptx_size: usize = 0;
        try checkNvrtc(cuda.nvrtcGetPTXSize(prog, &ptx_size));

        const ptx = try allocator.alloc(u8, ptx_size);
        try checkNvrtc(cuda.nvrtcGetPTX(prog, ptx.ptr));

        return ptx;
    }

    fn checkNvrtc(err: cuda.nvrtcResult) !void {
        if (err != cuda.NVRTC_SUCCESS) {
            return error.NvrtcError;
        }
    }
};
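
The PTX still has to be loaded and launched. A sketch using the CUDA driver API, assuming cuda.h is added to the @cImport above; checkCu is this sketch's helper, and NVRTC's PTX output is already NUL-terminated as cuModuleLoadData requires:

pub fn launchFromPtx(
    ptx: []const u8,
    kernel_name: [*:0]const u8,
    grid: u32,
    block: u32,
    args: [*c]?*anyopaque,
) !void {
    var module: cuda.CUmodule = undefined;
    try checkCu(cuda.cuModuleLoadData(&module, ptx.ptr));
    defer _ = cuda.cuModuleUnload(module);

    var function: cuda.CUfunction = undefined;
    try checkCu(cuda.cuModuleGetFunction(&function, module, kernel_name));

    try checkCu(cuda.cuLaunchKernel(
        function,
        grid, 1, 1,  // grid dimensions
        block, 1, 1, // block dimensions
        0,           // dynamic shared memory bytes
        null,        // default stream
        args,        // kernel arguments
        null,        // extra options
    ));
}

fn checkCu(err: cuda.CUresult) !void {
    if (err != cuda.CUDA_SUCCESS) return error.CuDriverError;
}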

Phase 3: ML Primitives (Weeks 9-12)

Milestone 3.1: Tensor Actors

pub const TensorActor = struct {
    actor: Actor,
    tensor: Tensor,
    device: GpuBackend,

    pub fn behavior(self: *Actor, msg: Message) !void {
        // Recover the enclosing TensorActor (Zig 0.12+ @fieldParentPtr signature)
        const tensor_actor: *TensorActor = @fieldParentPtr("actor", self);

        switch (msg.payload) {
            .matmul => |args| {
                const result = try tensor_actor.matmul(args.other);
                try msg.reply(.{ .tensor = result });
            },
            .add => |args| {
                try tensor_actor.add(args.other);
                try msg.reply(.{ .success = true });
            },
            .migrate => |target| {
                try tensor_actor.migrateTo(target);
                try msg.reply(.{ .success = true });
            },
        }
    }

    fn matmul(self: *TensorActor, other: TensorActor) !Tensor {
        // Output tensor; shape inference and allocation are elided in this sketch
        var result: Tensor = undefined;

        // Use cuBLAS/rocBLAS
        switch (self.device) {
            .cuda => |*backend| {
                const handle = try backend.getCublasHandle();
                try cublas.gemm(
                    handle,
                    self.tensor.data,
                    other.tensor.data,
                    result.data,
                );
            },
            .rocm => |*backend| {
                _ = backend; // Similar path through rocBLAS
            },
            else => {
                // CPU fallback
            },
        }
        return result;
    }
};
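
The cublas.gemm call above is a thin wrapper; for f32 tensors it would bottom out in cublasSgemm. A sketch of that wrapper, reusing the cuda @cImport from Milestone 2.2 (dimensions m/n/k would come from the tensor shapes; cuBLAS is column-major, so row-major A·B is computed with the standard operand-swap trick):

fn gemmF32(
    handle: cuda.cublasHandle_t,
    a: DevicePtr,
    b: DevicePtr,
    c: DevicePtr,
    m: c_int,
    n: c_int,
    k: c_int,
) !void {
    const alpha: f32 = 1.0;
    const beta: f32 = 0.0;
    // C(m x n) = A(m x k) * B(k x n) in row-major, expressed column-major
    // by passing B first and swapping m/n
    const status = cuda.cublasSgemm(
        handle,
        cuda.CUBLAS_OP_N, cuda.CUBLAS_OP_N,
        n, m, k,
        &alpha,
        @ptrFromInt(b), n,
        @ptrFromInt(a), k,
        &beta,
        @ptrFromInt(c), n,
    );
    if (status != cuda.CUBLAS_STATUS_SUCCESS) return error.CublasError;
}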

Milestone 3.2: Model Actor

pub const ModelActor = struct {
    actor: Actor,
    layers: []LayerActor,
    optimizer: OptimizerActor,

    pub fn forward(self: *ModelActor, input: Tensor) !Tensor {
        var current = input;

        for (self.layers) |layer| {
            current = try layer.call(.{
                .forward = .{ .input = current }
            }, 1000);
        }

        return current;
    }

    pub fn backward(self: *ModelActor, loss: Tensor) !void {
        var grad = loss;

        // Reverse through layers, collecting each layer's gradients into
        // collected_grads (accumulation elided in this sketch)
        var i = self.layers.len;
        while (i > 0) {
            i -= 1;
            grad = try self.layers[i].call(.{
                .backward = .{ .grad = grad }
            }, 1000);
        }

        // Update with optimizer
        try self.optimizer.send(.{
            .step = .{ .gradients = collected_grads }
        });
    }
};

Milestone 3.3: Training Supervisor

pub const TrainingSupervisor = struct {
    supervisor: Supervisor,
    model: ModelActor,
    dataloader: DataloaderActor,

    pub fn train(self: *TrainingSupervisor, config: TrainingConfig) !void {
        for (0..config.epochs) |epoch| {
            var batch_iter = try self.dataloader.getBatches();

            while (try batch_iter.next()) |batch| {
                // Forward pass
                const output = try self.model.forward(batch.inputs);

                // Compute loss
                const loss = try computeLoss(output, batch.targets);

                // Backward pass
                try self.model.backward(loss);

                // Checkpoint periodically
                if (batch.id % config.checkpoint_interval == 0) {
                    try self.checkpoint();
                }
            }

            try self.onEpochComplete(epoch);
        }
    }

    fn handleFailure(self: *TrainingSupervisor, failure: Failure) !void {
        switch (failure.type) {
            .gpu_oom => {
                // Reduce batch size and retry
                self.dataloader.batch_size /= 2;
                try self.retry();
            },
            .gpu_error => {
                // Migrate to different GPU
                const new_device = try selectHealthyDevice();
                try self.model.migrateTo(new_device);
                try self.resumeFromCheckpoint();
            },
            else => {
                // Escalate to the supervision tree
                return error.UnhandledFailure;
            },
        }
    }
};

Phase 4: Distribution (Weeks 13-16)

Milestone 4.1: Cluster Management

pub const ClusterNode = struct {
    id: NodeId,
    actors: ActorRegistry,
    peers: []Peer,

    pub fn join(self: *ClusterNode, seeds: []Address) !void {
        // Gossip protocol for discovery
        for (seeds) |seed| {
            try self.connectToPeer(seed);
        }

        // Exchange actor registries
        try self.broadcastActorList();

        // Start failure detection
        try self.startHeartbeat();
    }

    pub fn routeMessage(self: *ClusterNode, msg: Message) !void {
        const target_node = self.findNodeForActor(msg.to);

        if (target_node == self.id) {
            // Local delivery
            try self.actors.deliver(msg);
        } else {
            // Remote delivery
            const peer = self.getPeer(target_node);
            try peer.send(msg);
        }
    }
};
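
startHeartbeat above implies a failure detector. A minimal timeout-based sketch; a production version would use something like phi-accrual, and last_seen, markPeerDown, and heartbeat_interval_ms are assumptions of this sketch:

const heartbeat_interval_ms: i64 = 500;

fn startHeartbeat(self: *ClusterNode) !void {
    while (true) {
        const now = std.time.milliTimestamp();

        for (self.peers) |*peer| {
            // Ping every peer each interval
            try peer.send(.{ .heartbeat = .{ .from = self.id, .at = now } });

            // Declare a peer down after three missed intervals
            if (now - peer.last_seen > 3 * heartbeat_interval_ms) {
                try self.markPeerDown(peer);
            }
        }

        std.time.sleep(@as(u64, @intCast(heartbeat_interval_ms)) * std.time.ns_per_ms);
    }
}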

Milestone 4.2: Distributed Training

pub const DistributedTraining = struct {
    nodes: []ClusterNode,
    model_replicas: []ModelActor,

    pub fn dataParallel(self: *DistributedTraining, batch: Batch) !void {
        // Split batch across nodes
        const mini_batches = try splitBatch(batch, self.nodes.len);

        // Parallel forward/backward (allocator comes from the surrounding actor system)
        var futures = ArrayList(Future).init(allocator);
        defer futures.deinit();
        for (self.model_replicas, mini_batches) |replica, mb| {
            const future = try replica.callAsync(.{
                .train_step = .{ .batch = mb }
            });
            try futures.append(future);
        }

        // Wait for completion (`await` is a reserved word in Zig, so the Future API exposes wait())
        for (futures.items) |f| {
            _ = try f.wait(5000);
        }

        // All-reduce gradients
        try self.allReduceGradients();
    }

    fn allReduceGradients(self: *DistributedTraining) !void {
        // Ring all-reduce implementation
        const n = self.nodes.len;

        // Each node sends to next in ring
        for (0..n-1) |round| {
            for (self.nodes, 0..) |node, i| {
                const next = (i + 1) % n;
                const chunk = self.getGradientChunk(i, round);
                try node.send(self.nodes[next], chunk);
            }

            // Aggregate received chunks
            for (self.nodes) |node| {
                const received = try node.receive();
                try node.aggregateGradients(received);
            }
        }
    }
};
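
Ring all-reduce is used because its per-node traffic is nearly independent of cluster size: with n nodes and G bytes of gradients, reduce-scatter plus all-gather moves 2(n-1)/n · G ≈ 2G bytes per node, versus (n-1) · G for a naive all-to-all exchange. For example, with 8 nodes and 1 GB of gradients, each node transfers 2 × 7/8 × 1 GB ≈ 1.75 GB no matter how many peers join. Note that the sketch above shows only the reduce-scatter half; a complete ring all-reduce follows it with n-1 all-gather rounds that redistribute the fully reduced chunks.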

Phase 5: Production Features (Weeks 17-20)

Milestone 5.1: Monitoring & Metrics

pub const MetricsCollector = struct {
    counters: std.StringHashMap(AtomicU64),
    histograms: std.StringHashMap(Histogram),

    pub fn recordLatency(self: *MetricsCollector, name: []const u8, ns: u64) !void {
        const hist = try self.histograms.getOrPut(name);
        if (!hist.found_existing) {
            hist.value_ptr.* = Histogram.init();
        }
        hist.value_ptr.record(ns);
    }

    pub fn exportPrometheus(self: *MetricsCollector, allocator: std.mem.Allocator) ![]const u8 {
        var buffer = ArrayList(u8).init(allocator);

        // Export counters
        // NOTE: This is pseudo-code for illustration.
        // In Zig 0.15.x, you would need to provide an explicit buffer to writer()
        var iter = self.counters.iterator();
        while (iter.next()) |entry| {
            try buffer.writer().print("# TYPE {s} counter\n", .{entry.key_ptr.*});
            try buffer.writer().print("{s} {d}\n", .{
                entry.key_ptr.*,
                entry.value_ptr.load(.acquire),
            });
        }

        // Export histograms
        // ...

        return buffer.toOwnedSlice();
    }
};

Milestone 5.2: Model Serving

pub const ModelServer = struct {
    model: ModelActor,
    batch_manager: BatchManager,

    pub fn serve(self: *ModelServer, port: u16) !void {
        // std.net.StreamServer is the pre-0.12 API; newer Zig listens on an Address
        const addr = try std.net.Address.parseIp("0.0.0.0", port);
        var server = try addr.listen(.{ .reuse_address = true });
        defer server.deinit();

        while (true) {
            const conn = try server.accept();

            // Spawn actor for connection
            _ = try self.system.spawn(connectionHandler, .{
                .conn = conn,
                .model = self.model,
                .batch_manager = self.batch_manager,
            });
        }
    }

    fn connectionHandler(self: *Actor, msg: Message) !void {
        // Parse inference request
        const request = try parseRequest(msg.payload.data);

        // Add to batch (the batch manager reaches the handler via the spawn args above)
        const batch_future = try self.batch_manager.addRequest(request);

        // Wait for batch processing (wait() rather than the reserved word await)
        const result = try batch_future.wait(request.timeout);

        // Send response
        try sendResponse(msg.payload.conn, result);
    }
};
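
The BatchManager above is doing dynamic batching: hold requests until either a size bound or a latency bound is hit, then run them as one forward pass. A sketch of that policy (max_batch_size, max_wait_ms, flushToModel, and the per-request future are assumptions of this sketch):

pub const BatchManager = struct {
    pending: BoundedArray(Request, 64),
    max_batch_size: usize = 32,
    max_wait_ms: i64 = 10,
    oldest_arrival_ms: i64 = 0,

    pub fn addRequest(self: *BatchManager, request: Request) !Future {
        if (self.pending.len == 0) {
            self.oldest_arrival_ms = std.time.milliTimestamp();
        }
        try self.pending.append(request);

        // Flush when the batch is full, or the oldest request has waited long enough
        const waited = std.time.milliTimestamp() - self.oldest_arrival_ms;
        if (self.pending.len >= self.max_batch_size or waited >= self.max_wait_ms) {
            try self.flushToModel();
        }
        return request.future;
    }
};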

Development Timeline

Month 1: Foundation

  • Weeks 1-2: Core actor system
  • Week 3: Basic scheduler
  • Week 4: Supervision trees

Month 2: GPU Support

  • Weeks 5-6: Backend abstraction
  • Week 7: CUDA integration
  • Week 8: Kernel compilation

Month 3: ML Framework

  • Weeks 9-10: Tensor operations
  • Week 11: Model actors
  • Week 12: Training supervisor

Month 4: Distribution

  • Weeks 13-14: Cluster management
  • Week 15: Distributed training
  • Week 16: Consensus & fault tolerance

Month 5: Production

  • Week 17: Monitoring/metrics
  • Week 18: Model serving
  • Week 19: Performance optimization
  • Week 20: Documentation & examples

Testing Strategy

// Continuous testing throughout development
test "integration: end-to-end training" {
    var system = try ActorSystem.init(.{});
    defer system.deinit();

    // Create training supervisor
    const trainer = try system.spawn(TrainingSupervisor, .{
        .model_config = simple_mlp,
        .dataset = mnist,
        .batch_size = 32,
    });

    // Start training
    try trainer.send(.{
        .train = .{
            .epochs = 10,
            .learning_rate = 0.001,
        }
    });

    // Wait for completion
    const result = try trainer.call(.{ .get_status = {} }, 60_000);

    try testing.expect(result.status == .completed);
    try testing.expect(result.final_loss < 0.1);
}

test "fault tolerance: GPU failure during training" {
    var system = try ActorSystem.init(.{
        .fault_injection = .{
            .gpu_failure_rate = 0.1,
        },
    });
    defer system.deinit();

    const trainer = try createTrainer(system);

    // Training should complete despite failures
    const result = try trainer.train(test_config);

    try testing.expect(result.completed);
    try testing.expect(result.recovery_count > 0);
}
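
The tests/property/ directory in the project tree suggests property-style tests as well. A sketch of one such invariant check, driving the bounded mailbox with a seeded PRNG (testMessage is a hypothetical helper):

test "property: mailbox never exceeds its capacity" {
    var prng = std.Random.DefaultPrng.init(0xdecafbad);
    const random = prng.random();

    var mailbox = BoundedQueue(Message, 256).init();
    for (0..10_000) |_| {
        if (random.boolean()) {
            // push may fail when full; it must never overflow silently
            mailbox.push(testMessage()) catch {};
        } else {
            _ = try mailbox.pop();
        }
        try testing.expect(mailbox.len() <= 256);
    }
}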

Success Criteria

  • 1M+ messages/second throughput
  • < 1μs message latency
  • < 1% overhead vs native CUDA
  • 99.999% training uptime
  • < 1 second recovery from GPU failure
  • Linear scaling to 100+ GPUs
  • Zero data loss on failures
  • Compatible with PyTorch models
  • Single binary deployment
  • < 100MB binary size