Pipeline 1: Redirect fabin to fabout

While wavelet-triggered tasks enable us to receive and operate on one wavelet at a time, the programmer may need a way to receive a tensor composed of multiple wavelets using a single instruction. This is enabled by fabric input DSDs. Similarly, using fabric output DSDs, the programmer can send multiple wavelets using a single instruction.

This example illustrates two fabric DSDs, one for input and another for output. Each fabric DSD requires a corresponding color.

Crucially, when using a fabric input DSD, the programmer must block the wavelet’s color, as this example does for the color MEMCPYH2D_DATA_1. Otherwise, wavelets of that color will attempt to activate the (empty) task associated with the color, which in turn will consume the wavelets before they can be consumed by the fabric input DSD.
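
A minimal sketch of the general pattern follows, using a hypothetical user-managed color recv_color rather than MEMCPYH2D_DATA_1, which this example routes through the memcpy module shown below; it also assumes the color form of @block:

const recv_color: color = @get_color(3);  // hypothetical user-managed color

// Fabric input DSD that consumes 16 wavelets of recv_color.
const recv_dsd = @get_dsd(fabin_dsd, .{
  .extent = 16,
  .fabric_color = recv_color,
  .input_queue = @get_input_queue(2),
});

comptime {
  // Block recv_color so its wavelets do not activate any task and remain
  // available to recv_dsd.
  @block(recv_color);
}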

This example has only a single PE, which receives data via H2D and sends it out via D2H in one vector operation. Strictly speaking, this is NOT valid, because H2D and D2H are serialized: the host triggers D2H only once H2D is done. The hardware has internal queues that hold the I/O data, so H2D finishes once it has pushed all of its data into those dedicated queues. The example therefore works only as long as the size does not exceed the capacity of those queues; otherwise H2D stalls.

The parameter size controls the number of wavelets transferred by H2D and D2H. The program stalls when size exceeds 14.

This programming paradigm is called the pipelined approach: the kernel receives input data without storing it in memory, instead redirecting the result to the output. The microthread (.{.async = true}) is necessary because the CE (compute engine) must keep some resources free to run the memcpy kernel. The kernel stalls if a blocking @add16, such as @add16(outDsd, inDsd, 1), is used: the simulation hangs, and the instruction trace shows @add16 repeatedly querying data from input queue 1, which remains empty. The router receives the H2D command well after @add16 has started running, and the CE has no resources left to run the H2D command received by the router, so it stalls.
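
A minimal sketch of the contrast, using the DSDs defined in pe_program.csl below (the blocking variant is shown commented out, purely to illustrate the stall):

// Microthreaded form, as used in pe_program.csl: the operation runs on a
// microthread, so the CE stays free to service the incoming H2D command.
@add16(outDsd, inDsd, one_dsd, .{ .async = true });

// Blocking form (stalls here): @add16 occupies the CE while polling input
// queue 1, which stays empty because the H2D command is never serviced.
// @add16(outDsd, inDsd, one_dsd);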

layout.csl

// The memcpy module reserves colors, task IDs, queues, and other
// resources to route the data between the host and the device.
//

// color/ task ID map
//
//  ID var       ID var         ID var               ID var
//   0 H2D        9             18                   27 reserved (memcpy)
//   1 D2H       10             19                   28 reserved (memcpy)
//   2           11             20                   29 reserved
//   3           12             21 reserved (memcpy) 30 reserved (memcpy)
//   4           13             22 reserved (memcpy) 31 reserved
//   5           14             23 reserved (memcpy) 32
//   6           15             24                   33
//   7           16             25                   34
//   8 main      17             26                   35
//

param size: i16;

param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;

const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);

const main: u16 = 8;

const memcpy = @import_module( "<memcpy/get_params>", .{
    .width = 1,
    .height = 1,
    .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
    .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
    });

layout {
  @set_rectangle(1, 1);

  @set_tile_code(0, 0, "pe_program.csl", .{
    .size = size,
    .main = main,
    .memcpy_params = memcpy.get_params(0)
  });
}

pe_program.csl

// Not a complete program; the top-level source file is layout.csl.

param size: i16;
param main: u16;

param memcpy_params: comptime_struct;

const main_task_id: local_task_id = @get_local_task_id(main);

const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);

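// Fabric input DSD: receives 'size' wavelets of color MEMCPYH2D_1 via input queue 1.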
const inDsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYH2D_1,
  .input_queue = @get_input_queue(1),
});

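// Fabric output DSD: sends 'size' wavelets of color MEMCPYD2H_1 via output queue 1.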
const outDsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYD2H_1,
  .output_queue = @get_output_queue(1)
});

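// buf supplies the constant operand (set to 1 in mainTask); one_dsd reads
// buf[0] 'size' times so @add16 can add it to every incoming wavelet.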
var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });

task mainTask() void {
  // WARNING: large size can stall.
  // H2D and D2H are serialized. It is NOT safe to run "send" and "recv"
  // involving memcpy at the same time on the same PE.
  //
  // It only works for a small vector because the HW has some internal
  // queues to hold those values from/to IO. If such queues are full,
  // I/O stalls.
  //
  // In this case, if the length exceeds a certain amount,
  // H2D cannot finish and D2H has no chance to run.

  buf[0] = @as(i16, 1);
  @add16(outDsd, inDsd, one_dsd, .{.async=true});
}

comptime {

  // activate local task mainTask at startup
  @bind_local_task(mainTask, main_task_id);
  @activate(main_task_id);
}

run.py

#!/usr/bin/env cs_python

import argparse
import json
import numpy as np

from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module

parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name

# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
  compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:12 --fabric-dims=8,3 --fabric-offsets=4,1`
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")

memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)

runner.load()
runner.run()

# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)

print("step 1: streaming H2D")
# "input_tensor" is a 1d array
# The type of input_tensor is int16, we need to extend it to uint32
# There are two kinds of extension when using the utility function input_array_to_u32
#    input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
#    sentinel = None
# 2) upper 16-bit is the index of the array:
#    sentinel is Not None
#
# In this example, the upper 16-bit is don't care because pe_program.csl only uses
# @add16 to reads lower 16-bit
tensors_u32 = input_array_to_u32(input_tensor, 1, size)
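# A hypothetical alternative (not used here): zero-extend instead, leaving the
# upper 16 bits as 0, per the note above.
#   tensors_u32 = input_array_to_u32(input_tensor, None, size)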
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)

print("step 2: streaming D2H")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# remove the upper 16 bits of each u32
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.int16))

runner.stop()

np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")

commands.sh

#!/usr/bin/env bash

set -e

cslc ./layout.csl --fabric-dims=8,3 \
--fabric-offsets=4,1 --params=size:12 -o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out