Pipeline 2: Attach a FIFO to H2D

The previous example stalls if the parameter size exceeds the capacity of the internal queues. That capacity is architecture-dependent, and from a software-development point of view a program should be independent of any particular architecture. One solution is to insert a FIFO between H2D and @add16: the FIFO receives data from H2D and then forwards it to @add16. The WSE provides an efficient FIFO mechanism; the user simply binds two microthreads to the FIFO, one pushing data in and the other popping data out. As long as the parameter size does not exceed the capacity of the FIFO, H2D can push all of its data into the FIFO even if @add16 cannot yet process any of it. Once H2D is done, D2H can drain the output of @add16 so that @add16 can make progress.

To create a FIFO, we use the builtin @allocate_fifo to bind an ordinary tensor as its storage. We then create two fabric DSDs: one pushes data from MEMCPYH2D_DATA_1 into the FIFO, and the other pops data out of the FIFO onto the color C1. The two DSDs must use different microthreads.
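
The essential pattern is sketched below; it is condensed from pe_program.csl later in this section, so sys_mod, C1, and infinite_length refer to the declarations in that file, and the 1024-element buffer and queue numbers are simply the choices made by this example.

// Scratch storage backing the FIFO, bound with @allocate_fifo.
var fifo_buffer = @zeros([1024]i16);
const fifo = @allocate_fifo(fifo_buffer);

// Push side: stream from the H2D color into the FIFO on its own microthread.
const in_dsd = @get_dsd(fabin_dsd, .{
  .extent = infinite_length,
  .fabric_color = sys_mod.MEMCPYH2D_1,
  .input_queue = @get_input_queue(2)});

// Pop side: stream from the FIFO onto color C1 on a different microthread.
const out_dsd = @get_dsd(fabout_dsd, .{
  .extent = infinite_length,
  .fabric_color = C1,
  .output_queue = @get_output_queue(3)});

// Inside mainTask: both moves are asynchronous, so the FIFO fills and
// drains concurrently.
@mov16(fifo, in_dsd, .{.async = true});   // H2D --> FIFO
@mov16(out_dsd, fifo, .{.async = true});  // FIFO --> C1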

The routing configuration of color C1 is RAMP to RAMP because (1) the FIFO pops data to the router via C1, and (2) @add16 receives data from the router via C1.
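
For reference, the configuration of C1 reduces to one comptime call, excerpted from pe_program.csl below; no receive task is bound to C1 because its wavelets are consumed by the microthreaded inDsd.

comptime {
  // The FIFO pops data to the router via C1, and @add16 receives data from
  // the router via C1, so both directions of the local route use the RAMP.
  @set_local_color_config(C1, .{.routes = .{.rx = .{RAMP}, .tx = .{RAMP}}});
}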

We also block the color C1 because @add16 receives its data through a microthread rather than a wavelet-triggered task.

The disadvantage of this approach is its resource consumption: the FIFO requires two microthreads and a scratch buffer.

The next example will fix this issue.

layout.csl

// The core kernel must start at P4.1 so the memcpy infrastructure has enough
// resources to route the data between the host and the device.
//

// color/ task ID map
//
//  ID var       ID var         ID var               ID var
//   0 H2D        9  C1         18                   27 reserved (memcpy)
//   1 D2H       10             19                   28 reserved (memcpy)
//   2           11             20                   29 reserved
//   3           12             21 reserved (memcpy) 30 reserved (memcpy)
//   4           13             22 reserved (memcpy) 31 reserved
//   5           14             23 reserved (memcpy) 32
//   6           15             24                   33
//   7           16             25                   34
//   8 main      17             26                   35
//

param size: i16;

param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;

const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);

const main: u16 = 8;
const C1: color = @get_color(9);

const memcpy = @import_module( "<memcpy/get_params>", .{
    .width = 1,
    .height = 1,
    .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
    .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
    });

layout {
  @set_rectangle(1, 1);

  @set_tile_code(0, 0, "pe_program.csl", .{
    .size = size,
    .main = main,
    .C1 = C1,
    .memcpy_params = memcpy.get_params(0)
  });
}

pe_program.csl

// Not a complete program; the top-level source file is layout.csl.

//
// Introduce a FIFO to buffer the data from H2D, so that H2D can finish
// as long as size does not exceed the capacity of the FIFO.
//
// H2D --> FIFO --> C1 --> @add16 --> D2H
//

param size: i16;
param main: u16;
param C1: color;

param memcpy_params: comptime_struct;

const main_task_id: local_task_id = @get_local_task_id(main);

const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);

var fifo_buffer = @zeros([1024]i16);
const fifo = @allocate_fifo(fifo_buffer);

const infinite_length: u16 = 0x7fff;

const in_dsd = @get_dsd(fabin_dsd, .{
  .extent = infinite_length,
  .fabric_color = sys_mod.MEMCPYH2D_1,
  .input_queue = @get_input_queue(2)});

const out_dsd = @get_dsd(fabout_dsd, .{
  .extent = infinite_length,
  .fabric_color = C1,
  .output_queue = @get_output_queue(3)});

const inDsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = C1,
  .input_queue = @get_input_queue(1),
});

const outDsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = sys_mod.MEMCPYD2H_1,
  .output_queue = @get_output_queue(1)
});

var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });

task mainTask() void {

  // Move from the fabric to the FIFO
  @mov16(fifo, in_dsd, .{.async = true});

  // Move from the FIFO to C1
  @mov16(out_dsd, fifo, .{.async = true});

  buf[0] = @as(i16, 1);
  @add16(outDsd, inDsd, one_dsd, .{.async=true});
}

comptime {
  // activate local task mainTask at startup
  @bind_local_task(mainTask, main_task_id);
  @activate(main_task_id);

  // fifo sends out the data via C1 --> tx = RAMP
  // add16 receives data via C1 --> rx = RAMP
  @set_local_color_config(C1, .{.routes = .{.rx = .{RAMP}, .tx = .{RAMP}}});
}

run.py

#!/usr/bin/env cs_python

import argparse
import json
import numpy as np

from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module

parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name

# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
  compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:12 --fabric-dims=8,3 --fabric-offsets=4,1`
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")

memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)

runner.load()
runner.run()

# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)

print("step 1: streaming H2D")
# "input_tensor" is a 1d array
# The type of input_tensor is int16, so each element must be extended to uint32.
# The utility function input_array_to_u32 supports two kinds of extension:
#    input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
#    sentinel = None
# 2) the upper 16 bits hold the index of the element:
#    sentinel is not None
#
# In this example the upper 16 bits are "don't care" because pe_program.csl only
# uses @add16, which reads the lower 16 bits.
tensors_u32 = input_array_to_u32(input_tensor, 1, size)
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)

print("step 2: streaming D2H")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# remove upper 16-bit of each u32
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.int16))

runner.stop()

np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")

commands.sh

#!/usr/bin/env bash

set -e

cslc ./layout.csl --fabric-dims=8,3 \
--fabric-offsets=4,1 --params=size:32 -o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out