Pipeline 3: Add an artificial halo

The disadvantage of FIFO in the previous example is the resource consumption. The FIFO requires two microthreads and a scratch buffer.

The simple workaround is to move such FIFO outside the kernel. We add another halo, which we call an artificial halo, around the kernel (pe_program.csl). The west side is west.csl and east side is east.csl. The west.csl implements a FIFO to receive the data from H2D. The east.csl implements a FIFO to receive the data from pe_program.csl and redirect it to D2H.

There is no more FIFO in pe_program.csl. Instead, we replace the colors MEMCPYH2D_DATA_1 by Cin and MEMCPYD2H_DATA_1 by Cout. The color Cin receives data from the west to the ramp. The color Cout sends the data from ramp to the east.

This example has the same property as pipeline-02-fifo: as long as the parameter size does not exceed the capacity of the FIFO in west.csl, H2D can always finish so the @add16 can progress.

layout.csl

// resources to route the data between the host and the device.
//

// color/ task ID map
//
//  ID var       ID var         ID var               ID var
//   0 H2D        9  STARTUP    18                   27 reserved (memcpy)
//   1 D2H       10             19                   28 reserved (memcpy)
//   2           11             20                   29 reserved
//   3           12             21 reserved (memcpy) 30 reserved (memcpy)
//   4           13             22 reserved (memcpy) 31 reserved
//   5           14             23 reserved (memcpy) 32
//   6 Cin       15             24                   33
//   7 Cout      16             25                   34
//   8 main      17             26                   35
//

param size: i16;

param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;

const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);

const Cin: color = @get_color(6);
const Cout: color = @get_color(7);

const main: u16 = 8;
const STARTUP: local_task_id = @get_local_task_id(9);

const memcpy = @import_module( "<memcpy/get_params>", .{
    .width = 3,
    .height = 1,
    .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
    .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
    });

layout {
  @set_rectangle(3, 1);

  // west.csl has a H2D
  @set_tile_code(0, 0, "memcpyEdge/west.csl", .{
    .USER_IN_1 = Cin,
    .STARTUP = STARTUP,
    .memcpy_params = memcpy.get_params(0)
  });

  @set_tile_code(1, 0, "pe_program.csl", .{
    .size = size,
    .main = main,
    .Cin = Cin,
    .Cout = Cout,
    .memcpy_params = memcpy.get_params(1)
  });

  // east.csl only hase a D2H
  @set_tile_code(2, 0, "memcpyEdge/east.csl", .{
    .USER_OUT_1 = Cout,
    .STARTUP = STARTUP,
    .memcpy_params = memcpy.get_params(2)
  });
}

pe_program.csl

// Not a complete program; the top-level source file is code.csl.

param size: i16;
param main: u16;

param Cin: color;
param Cout: color;

param memcpy_params: comptime_struct;

const main_task_id: local_task_id = @get_local_task_id(main);

const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);

const inDsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = Cin,
  .input_queue = @get_input_queue(1),
});

const outDsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = Cout,
  .output_queue = @get_output_queue(1)
});

var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });

task mainTask() void {

  buf[0] = @as(i16, 1);
  @add16(outDsd, inDsd, one_dsd, .{.async=true});
}

comptime {
  // activate local task mainTask at startup
  @bind_local_task(mainTask, main_task_id);
  @activate(main_task_id);

  const input_route = .{ .rx = .{ WEST }, .tx = .{ RAMP } };
  @set_local_color_config(Cin, .{ .routes = input_route });

  const output_route = .{ .rx = .{ RAMP }, .tx = .{ EAST } };
  @set_local_color_config(Cout, .{ .routes = output_route });
}

run.py

#!/usr/bin/env cs_python

import argparse
import json
import numpy as np

from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module

parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name

# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
  compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:12 --fabric-dims=8,3 --fabric-offsets=4,1`
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")

memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)

runner.load()
runner.run()

# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)

print("step 1: streaming H2D to P0.0")
# "input_tensor" is a 1d array
# The type of input_tensor is int16, we need to extend it to uint32
# There are two kind of extension when using the utility function input_array_to_u32
#    input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
#    sentinel = None
# 2) upper 16-bit is the index of the array:
#    sentinel is Not None
#
# In this example, the upper 16-bit is don't care because pe_program.csl only uses
# @add16 to reads lower 16-bit
tensors_u32 = input_array_to_u32(input_tensor, 1, size)
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)

print("step 2: streaming D2H at P2.0")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 2, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# remove upper 16-bit of each u32
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.int16))

runner.stop()

np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")

memcpyEdge/memcpy_edge.csl

// This is a template of memcpy over the edges.
// memcpy_edge.csl can be "north", "south", "west" or "east"
// of the following layout.
//        +---------+
//        |  north  |
// +------+---------+------+
// | west |  core   | east |
// +------+---------+------+
//        |  south  |
//        +---------+
// north.csl, south.csl, west.csl and east.csl instantiate
// memcpy_edge.csl with a proper direction.
//
// memcpy_edge.csl supports 2 streaming H2Ds and one
// streaming D2H. Such constraint depends on the design.
// The current implementation binds a FIFO for a H2D or D2H,
// so we can only support 3 in total.
// We choose 2 H2Ds and 1 D2H.
// if we replace FIFO by WTT, we could support more.
//
// However the user can instantiate memcpy_edge.csl for each
// edge. The maximum number of H2Ds is 2*4 = 8 and maximum
// number of D2Hs is 1*4 = 4.
//
// If the user only has a H2D at north, for example, he only
// needs to configure color USER_IN_1, i.e. only a single
// streaming H2D is used.
//
// For example,
//   @set_tile_code(pe_x, 0, "north.csl", .{
//      .USER_IN_1 = mainColor,
//      .STARTUP = STARTUP,
//      .memcpy_params = memcpy_params,
//      .MEMCPYH2D_DATA_1 = MEMCPYH2D_DATA_1,
//      .MEMCPYD2H_DATA_1 = MEMCPYD2H_DATA_1
//    });

// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

// receive data from the "core"
param USER_OUT_1: color = @get_color(32);

// entrypoint
param STARTUP: local_task_id;

// ----------
// Every PE needs to import memcpy module otherwise the I/O cannot
// propagate the data to the destination.

param memcpy_params: comptime_struct;

// The direction of "core", for example
// north.csl has dir = SOUTH
// south.csl has dir = NORTH
// west.csl has dir = EAST
// east.csl has dir = WEST
param dir: direction;

// memcpy module reserves input queue 0 and output queue 0
const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);
// ----------

const h2d_mod = @import_module("h2d.csl", .{
     .USER_IN_1 = USER_IN_1,
     .USER_IN_2 = USER_IN_2,
     .MEMCPYH2D_1 = memcpy_params.MEMCPYH2D_1,
     .MEMCPYH2D_2 = memcpy_params.MEMCPYH2D_2,
     .txdir = dir
      });

const d2h_mod = @import_module("d2h.csl", .{
     .USER_OUT_1 = USER_OUT_1,
     .MEMCPYD2H_1 = memcpy_params.MEMCPYD2H_1,
     .rxdir = dir
      });

task f_startup() void {
    h2d_mod.f_startup();
    d2h_mod.f_startup();
}

comptime {
    @bind_local_task(f_startup, STARTUP);
    @activate(STARTUP);
}

memcpyEdge/h2d.csl

// Two streaming H2Ds:
// 1st H2D: UT 1 and UT 2
// 2nd H2D: UT 3 and UT 4

param MEMCPYH2D_1: color = @get_color(32);
param MEMCPYH2D_2: color = @get_color(32);

// Color along which we send a wavelet to pe_program
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

param txdir: direction;

const max_fifo_len = 256*20; // maximum length of the fifo

var fifo1_buffer = @zeros([max_fifo_len]u32);
const fifo1 = @allocate_fifo(fifo1_buffer);

var fifo2_buffer = @zeros([max_fifo_len]u32);
const fifo2 = @allocate_fifo(fifo2_buffer);

// length=inf
var fab_recv_wdsd_1 = @get_dsd(fabin_dsd, .{
   .extent = 0x7fff,
   .fabric_color = MEMCPYH2D_1,
   .input_queue = @get_input_queue(1)
});

// length=inf
var fab_trans_wdsd_1 = @get_dsd(fabout_dsd, .{
    .extent = 0x7fff,
    .fabric_color = USER_IN_1,
    .output_queue = @get_output_queue(2)
});

// length=inf
var fab_recv_wdsd_2 = @get_dsd(fabin_dsd, .{
   .extent = 0x7fff,
   .fabric_color = MEMCPYH2D_2,
   .input_queue = @get_input_queue(3)
});

// length=inf
var fab_trans_wdsd_2 = @get_dsd(fabout_dsd, .{
    .extent = 0x7fff,
    .fabric_color = USER_IN_2,
    .output_queue = @get_output_queue(4)
});

// if no user's color is defined, f_startup() is empty
fn f_startup() void {
    if ( (@get_int(MEMCPYH2D_1) < 24) and (@get_int(USER_IN_1) < 24) ){
        // receive data from streaming H2D
        @mov32(fifo1, fab_recv_wdsd_1, .{.async=true} );

        // forward data to USER_IN_1
        @mov32(fab_trans_wdsd_1, fifo1, .{.async=true} );
    }

    if ( (@get_int(MEMCPYH2D_2) < 24) and (@get_int(USER_IN_2) < 24) ){
        // receive data from streaming H2D
        @mov32(fifo2, fab_recv_wdsd_2, .{.async=true} );

        // forward data to USER_IN_1
        @mov32(fab_trans_wdsd_2, fifo2, .{.async=true} );
    }
}

comptime {
    if (@get_int(USER_IN_1) < 24){
        const h2d_route = .{ .rx = .{ RAMP }, .tx = .{ txdir } };
        @set_local_color_config(USER_IN_1, .{ .routes = h2d_route });
    }

    if (@get_int(USER_IN_2) < 24){
        const h2d_route = .{ .rx = .{ RAMP }, .tx = .{ txdir } };
        @set_local_color_config(USER_IN_2, .{ .routes = h2d_route });
    }
}

memcpyEdge/d2h.csl

// One streaming D2H:
// 1st D2H: UT 5 and UT 6

param MEMCPYD2H_1: color = @get_color(32);

// Color along which we expect a wavelet
param USER_OUT_1: color = @get_color(32);

param rxdir: direction;

const max_fifo_len = 256*40; // maximum length of the fifo

var fifo1_buffer = @zeros([max_fifo_len]u32);
const fifo1 = @allocate_fifo(fifo1_buffer);

// length=inf
var fab_recv_wdsd = @get_dsd(fabin_dsd, .{
   .extent = 0x7fff,
   .fabric_color = USER_OUT_1,
   .input_queue = @get_input_queue(6)
});

// length=inf
var fab_trans_wdsd = @get_dsd(fabout_dsd, .{
    .extent = 0x7fff,
    .fabric_color = MEMCPYD2H_1,
    .output_queue = @get_output_queue(5)
});

// if USER_OUT_1 is not valid, f_startup() is empty
fn f_startup() void {
    if ( (@get_int(MEMCPYD2H_1) < 24) and (@get_int(USER_OUT_1) < 24) ){
        // receive data from USER_OUT_1
        @mov32(fifo1, fab_recv_wdsd, .{.async=true} );

        // forward data to MEMCPYD2H_1
        @mov32(fab_trans_wdsd, fifo1, .{.async=true} );
    }
}

comptime {
    if (@get_int(USER_OUT_1) < 24){
        const d2h_route = .{ .rx = .{ rxdir }, .tx = .{ RAMP } };
        @set_local_color_config(USER_OUT_1, .{ .routes = d2h_route });
    }
}

memcpyEdge/east.csl

// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

// receive data from the "core"
param USER_OUT_1: color = @get_color(32);

// entrypoint
param STARTUP: local_task_id;

param memcpy_params: comptime_struct;

const edge_mod = @import_module( "memcpy_edge.csl", .{
     .memcpy_params = memcpy_params,
     .USER_IN_1 = USER_IN_1,
     .USER_IN_2 = USER_IN_2,
     .USER_OUT_1 = USER_OUT_1,
     .STARTUP = STARTUP,
     .dir = WEST
      });

memcpyEdge/west.csl

// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

// receive data from the "core"
param USER_OUT_1: color = @get_color(32);

// entrypoint
param STARTUP: local_task_id;

param memcpy_params: comptime_struct;

const edge_mod = @import_module( "memcpy_edge.csl", .{
     .memcpy_params = memcpy_params,
     .USER_IN_1 = USER_IN_1,
     .USER_IN_2 = USER_IN_2,
     .USER_OUT_1 = USER_OUT_1,
     .STARTUP = STARTUP,
     .dir = EAST
      });

memcpyEdge/north.csl

// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

// receive data from the "core"
param USER_OUT_1: color = @get_color(32);

// entrypoint
param STARTUP: local_task_id;

param memcpy_params: comptime_struct;

const edge_mod = @import_module( "memcpy_edge.csl", .{
     .memcpy_params = memcpy_params,
     .USER_IN_1 = USER_IN_1,
     .USER_IN_2 = USER_IN_2,
     .USER_OUT_1 = USER_OUT_1,
     .STARTUP = STARTUP,
     .dir = SOUTH
      });

memcpyEdge/south.csl

// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);

// receive data from the "core"
param USER_OUT_1: color = @get_color(32);

// entrypoint
param STARTUP: local_task_id;

param memcpy_params: comptime_struct;

const edge_mod = @import_module( "memcpy_edge.csl", .{
     .memcpy_params = memcpy_params,
     .USER_IN_1 = USER_IN_1,
     .USER_IN_2 = USER_IN_2,
     .USER_OUT_1 = USER_OUT_1,
     .STARTUP = STARTUP,
     .dir = NORTH
      });

commands.sh

#!/usr/bin/env bash

set -e

cslc ./layout.csl --fabric-dims=10,3 \
--fabric-offsets=4,1 --params=size:32 -o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out