Pipeline 2: Attach a FIFO to H2D
Contents
Pipeline 2: Attach a FIFO to H2D¶
The previous example stalls if the parameter `size` exceeds the capacity of
the internal queues. The size of the queues is architecture-dependent, but from
a software development point of view a program should not depend on any
particular architecture. One solution is to add a FIFO between H2D and
`@add16`. The FIFO receives data from H2D and then forwards the data to
`@add16`. The WSE provides an efficient FIFO implementation: the user simply
binds two microthreads to the FIFO, one that pushes data into the FIFO and one
that pops the data out. As long as the parameter `size` does not exceed the
capacity of the FIFO, H2D can always push all of its data into the FIFO even if
`@add16` cannot process any data. Once H2D is done, D2H can continue to drain
the data out so that `@add16` can make progress.
To create a FIFO, we use the builtin `@allocate_fifo` to bind a normal tensor.
We create two fabric DSDs: one pushes data from `MEMCPYH2D_DATA_1` to the
FIFO, and the other pops data from the FIFO to the color `C1`. The two DSDs
must use different microthreads.
The routing configuration of color `C1` is RAMP to RAMP because
1) the FIFO pops data to the router via `C1`, and
2) `@add16` receives data from the router via `C1`.
The disadvantage of this approach is the resource consumption. The FIFO requires two microthreads and a scratch buffer.
The next example will fix this issue.
layout.csl¶
// Color / task ID map
//
//  ID var              ID var   ID var                  ID var
//   0 MEMCPYH2D_1       9 C1    18                      27 reserved (memcpy)
//   1 MEMCPYD2H_1      10       19                      28 reserved (memcpy)
//   2                  11       20                      29 reserved
//   3                  12       21 reserved (memcpy)    30 reserved (memcpy)
//   4                  13       22 reserved (memcpy)    31 reserved
//   5                  14       23 reserved (memcpy)    32
//   6                  15       24                      33
//   7                  16       25                      34
//   8 main_task_id     17       26                      35

// Number of elements sent through core program rectangle
param size: i16;

// IDs of the two memcpy streaming colors, turned into colors below
param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;

const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);

// Color carrying data from the fifo to @add16 inside the PE
const C1: color = @get_color(9);

// memcpy parameter generator for a 1 x 1 program rectangle
const memcpy_mod = @import_module("<memcpy/get_params>", .{
  .width = 1,
  .height = 1,
  .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
  .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
});

layout {
  // A single PE at (0, 0) runs pe_program.csl
  @set_rectangle(1, 1);

  @set_tile_code(0, 0, "pe_program.csl", .{
    .memcpy_params = memcpy_mod.get_params(0),
    .size = size,
    .C1 = C1
  });

  // C1 is routed RAMP to RAMP on this PE:
  //   fifo sends out the data via C1   --> tx = RAMP
  //   add16 receives the data via C1   --> rx = RAMP
  @set_color_config(0, 0, C1, .{ .routes = .{ .rx = .{ RAMP }, .tx = .{ RAMP }}});
}
pe_program.csl¶
// Introduce a fifo to buffer the data from H2D, so the H2D can
// finish as long as size does not exceed the capacity of the fifo
//
// H2D --> fifo --> C1 --> @add16 --> D2H

// Parameters forwarded to the <memcpy/memcpy> module
param memcpy_params: comptime_struct;

// Number of elements processed; used as the extent of the @add16 DSDs
param size: i16;

// Colors
param C1: color;

// Queue IDs
const h2d_1_iq: input_queue = @get_input_queue(2);
const d2h_1_oq: output_queue = @get_output_queue(3);
const C1_iq: input_queue = @get_input_queue(4);
const C1_oq: output_queue = @get_output_queue(5);

// Task IDs
const main_task_id: local_task_id = @get_local_task_id(8);

const sys_mod = @import_module("<memcpy/memcpy>", memcpy_params);

// Backing storage of the fifo: 1024 16-bit elements
var fifo_buffer = @zeros([1024]i16);
const fifo = @allocate_fifo(fifo_buffer);

// Extent of the two DSDs attached to the fifo.
// NOTE(review): the name suggests 0x7fff is treated as "unbounded" -- confirm
// against the DSD documentation.
const INFINITE_DSD_LEN: u16 = 0x7fff;

// H2D stream --> fifo (push side)
const h2d_in_dsd = @get_dsd(fabin_dsd, .{
  .extent = INFINITE_DSD_LEN,
  .fabric_color = sys_mod.MEMCPYH2D_1,
  .input_queue = h2d_1_iq
});

// fifo --> C1 (pop side); uses a different queue than the push side
const C1_out_dsd = @get_dsd(fabout_dsd, .{
  .extent = INFINITE_DSD_LEN,
  .fabric_color = C1,
  .output_queue = C1_oq
});

// C1 --> @add16 input, exactly `size` elements
const C1_in_dsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = C1,
  .input_queue = C1_iq
});

// @add16 result --> D2H stream, exactly `size` elements
const d2h_out_dsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYD2H_1,
  .output_queue = d2h_1_oq
});

// The constant 1, presented as a `size`-element operand: every index maps to buf[0]
const buf = [1]i16{ 1 };
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });

// Launches the three asynchronous operations that form the pipeline.
task main_task() void {

  // Move from the fabric to the FIFO
  @mov16(fifo, h2d_in_dsd, .{ .async = true });

  // Move from the FIFO to C1
  @mov16(C1_out_dsd, fifo, .{ .async = true });

  // Add 1 to every element arriving on C1 and stream the result to D2H
  @add16(d2h_out_dsd, C1_in_dsd, one_dsd, .{ .async = true });
}

comptime {
  // activate local task main_task_id at startup
  @bind_local_task(main_task, main_task_id);
  @activate(main_task_id);

  // On WSE-3, we must explicitly initialize input and output queues
  if (@is_arch("wse3")) {
    @initialize_queue(h2d_1_iq, .{ .color = sys_mod.MEMCPYH2D_1 });
    @initialize_queue(d2h_1_oq, .{ .color = sys_mod.MEMCPYD2H_1 });
    @initialize_queue(C1_iq, .{ .color = C1 });
    @initialize_queue(C1_oq, .{ .color = C1 });
  }
}
run.py¶
#!/usr/bin/env cs_python
import argparse
import json
import numpy as np
from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module
# --- Command line -------------------------------------------------------------
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--name', help='the test name')
arg_parser.add_argument("--cmaddr", help="IP:port for CS system")
args = arg_parser.parse_args()
out_dir = args.name

# --- Compile metadata ---------------------------------------------------------
# out.json lives in the compile output directory; its "params" entry carries
# the values that were passed to cslc.
with open(f"{out_dir}/out.json", encoding="utf-8") as metadata_file:
    compile_params = json.load(metadata_file)["params"]

MEMCPYH2D_DATA_1 = int(compile_params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(compile_params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:12 --fabric-dims=8,3 --fabric-offsets=4,1`
size = int(compile_params["size"])

print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")

# Options shared by both streaming transfers (16-bit data, column-major order)
stream_opts = {
    "streaming": True,
    "data_type": MemcpyDataType.MEMCPY_16BIT,
    "order": MemcpyOrder.COL_MAJOR,
}
# Positional region arguments shared by both transfers: 0, 0, 1, 1, size
region = (0, 0, 1, 1, size)

# --- Launch the program -------------------------------------------------------
runner = SdkRuntime(out_dir, cmaddr=args.cmaddr)
runner.load()
runner.run()

# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)

print("step 1: streaming H2D")
# The type of input_tensor is int16, so we need to extend to 32-bit for memcpy H2D
h2d_payload_u32 = input_array_to_u32(input_tensor, 1, size)
# Non-blocking send, so the host can go on to issue the D2H below
runner.memcpy_h2d(MEMCPYH2D_DATA_1, h2d_payload_u32, *region,
                  nonblock=True, **stream_opts)

print("step 2: streaming D2H")
# The memcpy D2H buffer must be 32-bit
d2h_payload_u32 = np.zeros(size, dtype=np.uint32)
# Blocking receive: returns once the output buffer has been filled
runner.memcpy_d2h(d2h_payload_u32, MEMCPYD2H_DATA_1, *region,
                  nonblock=False, **stream_opts)
# remove upper 16-bit of each u32
result_tensor = memcpy_view(d2h_payload_u32, np.dtype(np.int16))

runner.stop()

# The device adds 1 to every element
np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")
commands.sh¶
#!/usr/bin/env bash

# Stop at the first failing command, so run.py is not started after a failed compile.
set -o errexit

# Arguments for cslc, kept in the order they are passed on the command line.
# The output directory given with -o must match the --name passed to run.py.
cslc_args=(
  --arch=wse3 ./layout.csl
  --fabric-dims=8,3
  --fabric-offsets=4,1
  --params=size:32
  -o out
  --params=MEMCPYH2D_DATA_1_ID:0
  --params=MEMCPYD2H_DATA_1_ID:1
  --memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
)

# Compile the layout, then run the host script against the compile output.
cslc "${cslc_args[@]}"
cs_python run.py --name out