Pipeline 1: Redirect fabin to fabout
While wavelet-triggered tasks enable us to receive and operate on one wavelet at a time, the programmer may need a way to receive a tensor composed of multiple wavelets using a single instruction. This is enabled by fabric input DSDs. Similarly, using fabric output DSDs, the programmer can send multiple wavelets using a single instruction.
This example illustrates two fabric DSDs, one for input and another for output. Each fabric DSD requires a corresponding color.
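As a minimal sketch (in_color and out_color are illustrative names; the actual definitions used by this example appear in pe_program.csl below), a fabric DSD names a color, an extent in wavelets, and a hardware queue:

// Receive `size` wavelets arriving on in_color through input queue 2.
const in_dsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = in_color,
  .input_queue = @get_input_queue(2)
});

// Send `size` wavelets on out_color through output queue 3.
const out_dsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = out_color,
  .output_queue = @get_output_queue(3)
});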
Crucially, when using a fabric input DSD, the programmer must block the wavelet's color, as this example does for the color MEMCPYH2D_DATA_1. Otherwise, wavelets of that color will attempt to activate the (empty) task associated with the color, which in turn will consume the wavelets before they can be consumed by the fabric input DSD.
This example has only a single PE, which receives data via H2D and sends it out via D2H in one vector operation. Strictly speaking, this is NOT valid, because H2D and D2H are serialized: the host triggers the D2H only after the H2D is done. However, the hardware has internal queues that buffer I/O data, so the H2D finishes as soon as it has pushed all of its data into those queues. The example therefore works as long as the data does not exceed the capacity of those queues; otherwise the H2D stalls.
The parameter size controls the number of wavelets for both the H2D and the D2H. The program stalls when size exceeds 14.
This programming paradigm is called the pipelined approach: the kernel receives input data without storing it in memory, instead redirecting the result directly to the output. The microthread is necessary because the CE (compute engine) must keep some resources free to run the memcpy kernel. If the blocking instruction @add16(out_dsd, in_dsd, one_dsd) is used instead, the kernel stalls: the simulation hangs, and the instruction trace shows @add16 repeatedly querying the still-empty input queue for data. The router receives the H2D command only well after @add16 has started running, and the CE has no resources left to service the H2D command received by the router, so it stalls.
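The sketch below, reusing the operand names from pe_program.csl, contrasts the two forms:

// Blocking form: the CE spins inside @add16 waiting for wavelets that the
// not-yet-serviced H2D command would deliver, so the program deadlocks.
@add16(out_dsd, in_dsd, one_dsd);

// Asynchronous (microthreaded) form used in this example: the operation runs
// on a microthread, leaving the CE free to service the memcpy H2D command.
@add16(out_dsd, in_dsd, one_dsd, .{ .async = true });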
layout.csl
// Color/ task ID map
//
//  ID var           ID var  ID var                ID var
//   0 MEMCPYH2D_1    9      18                    27 reserved (memcpy)
//   1 MEMCPYD2H_1   10      19                    28 reserved (memcpy)
//   2               11      20                    29 reserved
//   3               12      21 reserved (memcpy)  30 reserved (memcpy)
//   4               13      22 reserved (memcpy)  31 reserved
//   5               14      23 reserved (memcpy)  32
//   6               15      24                    33
//   7               16      25                    34
//   8 main_task_id  17      26                    35
param size: i16;
param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;
const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);
const memcpy = @import_module("<memcpy/get_params>", .{
  .width = 1,
  .height = 1,
  .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
  .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
});

layout {
  @set_rectangle(1, 1);
  @set_tile_code(0, 0, "pe_program.csl", .{
    .size = size,
    .memcpy_params = memcpy.get_params(0)
  });
}
pe_program.csl
param memcpy_params: comptime_struct;
// number of elements received from host
param size: i16;
const sys_mod = @import_module("<memcpy/memcpy>", memcpy_params);
// Queues
const h2d_1_iq: input_queue = @get_input_queue(2);
const d2h_1_oq: output_queue = @get_output_queue(3);
// Task IDs
const main_task_id: local_task_id = @get_local_task_id(8);
const in_dsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYH2D_1,
  .input_queue = h2d_1_iq
});

const out_dsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYD2H_1,
  .output_queue = d2h_1_oq
});
var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });
task main_task() void {
  // WARNING: large size can stall.
  // H2D and D2H are serialized. It is NOT safe to run "send" and "recv"
  // involving memcpy at the same time on the same PE.
  //
  // It only works for a small vector because the HW has some internal
  // queues to hold those values from/to IO. If such queues are full,
  // I/O stalls.
  //
  // In this case, if the length exceeds a certain amount,
  // H2D cannot finish and D2H has no chance to run.
  buf[0] = @as(i16, 1);
  @add16(out_dsd, in_dsd, one_dsd, .{ .async = true });
}
comptime {
  // activate local task main_task at startup
  @activate(main_task_id);
  @bind_local_task(main_task, main_task_id);

  // On WSE-3, we must explicitly initialize input and output queues
  if (@is_arch("wse3")) {
    @initialize_queue(h2d_1_iq, .{ .color = sys_mod.MEMCPYH2D_1 });
    @initialize_queue(d2h_1_oq, .{ .color = sys_mod.MEMCPYD2H_1 });
  }
}
run.py
#!/usr/bin/env cs_python
import argparse
import json
import numpy as np
from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module
parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name
# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
    compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")
memcpy_dtype = MemcpyDataType.MEMCPY_32BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)
runner.load()
runner.run()
# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)
print("step 1: streaming H2D")
# The type of input_tensor is int16, so we need to extend to 32-bit for memcpy H2D
input_tensor_u32 = input_array_to_u32(input_tensor, 1, size)
runner.memcpy_h2d(MEMCPYH2D_DATA_1, input_tensor_u32, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)
print("step 2: streaming D2H")
# The memcpy D2H buffer must be 32-bit
output_tensor_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(output_tensor_u32, MEMCPYD2H_DATA_1, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# Strip the upper 16 bits of each u32 to recover the int16 results
result_tensor = memcpy_view(output_tensor_u32, np.dtype(np.int16))
runner.stop()
np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")
commands.sh
#!/usr/bin/env bash
set -e
cslc --arch=wse2 ./layout.csl --fabric-dims=8,3 \
--fabric-offsets=4,1 --params=size:12 -o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out