Skip to content

stream_chunker example

A stream re-framer: accepts samples in arbitrary-size bursts and emits them as fixed-size chunks. Demonstrates variable-size input with variable-size output using --variable-output and --no-step.

The key concept: some calls produce zero chunks (not enough data yet); others produce one or several. The Python caller never knows in advance how many samples will come back — it just checks len(view) after each call.

TL;DR — see it work first

. <(curl -fsSL https://just-buildit.github.io/just-makeit/install.sh)
just-makeit example stream_chunker
# stream_chunker: PASSED

Prerequisites

. <(curl -fsSL https://just-buildit.github.io/just-makeit/install.sh)

1. Scaffold

just-makeit new my_chunker
cd my_chunker

just-makeit object chunker \
    --state "chunk_size:int32_t:64" \
    --state "buf:float _Complex[256]" \
    --state "n_buf:int32_t:0" \
    --no-step

just-makeit method chunker push \
    --arg-type "float _Complex" \
    --return-type "float _Complex" \
    --variable-output

--no-step suppresses step() and steps() — the only interface is push().

--variable-output generates two C stubs in chunker_core.c:

Stub Called by ext Your job
chunker_push_max_out(state) Once at __init__ Return max output samples possible
chunker_push(state, in, n_in, out) Every Python call Fill out[], return actual count

2. Implement

Replace both stubs in native/src/chunker/chunker_core.c.

Generated stubs:

/* <<IMPLEMENT>> return max samples chunker_push can ever produce */
size_t
chunker_push_max_out(chunker_state_t *state)
{
    (void)state;
    return 0; /* TODO */
}

/* <<IMPLEMENT>> process input and write results to out[]; return count written */
size_t
chunker_push(chunker_state_t *state, const float complex *in, size_t n_in,
             float complex *out)
{
    (void)state; (void)in; (void)out;
    return 0; /* TODO */
}

Implementation:

size_t
chunker_push_max_out(chunker_state_t *state)
{
    /* buf[] holds 256 samples — the absolute output ceiling.
     * Push at most (256 - n_buf) samples per call to stay safe. */
    (void)state;
    return 256;
}

size_t
chunker_push(chunker_state_t *state, const float complex *in, size_t n_in,
             float complex *out)
{
    size_t n_out = 0;
    for (size_t i = 0; i < n_in; i++) {
        state->buf[state->n_buf++] = in[i];
        if (state->n_buf >= state->chunk_size) {
            memcpy(out + n_out, state->buf,
                   (size_t)state->chunk_size * sizeof(float complex));
            n_out += (size_t)state->chunk_size;
            state->n_buf = 0;
        }
    }
    return n_out;
}

memcpy and complex.h are already included via clib_common.h.

The patch script automates this replacement:

python3 .steps/02_patch.py

3. Build and test

cmake -B build -S . -DCMAKE_BUILD_TYPE=Release
cmake --build build --parallel 4
ctest --test-dir build --output-on-failure

4. Use from Python

"""Integration test: feed irregular bursts into Chunker, verify chunk output.

Run from the project root after building:
    python3 .steps/04_demo.py

Constraint: with chunk_size=64 and an internal buf[256], the output buffer
pre-allocated by --variable-output holds 256 samples (4 complete chunks).
Callers must not push more samples in one call than the output buffer can hold:
  max safe push = floor(256 / chunk_size) * chunk_size - current_n_buf
For chunk_size=64 and worst-case n_buf=63: max push ≈ 192 samples.
"""

import sys
import pathlib

# Add the built extension to sys.path
build_dir = pathlib.Path("build")
for p in build_dir.rglob("my_chunker*.so"):
    sys.path.insert(0, str(p.parent))
    break
sys.path.insert(0, str(pathlib.Path("src")))

import numpy as np
from my_chunker import Chunker

CHUNK = 64

c = Chunker(chunk_size=CHUNK)

# Irregular bursts that collectively push 281 samples.
# Max single burst = 180; worst-case output = 3 chunks = 192 samples < 256.
bursts = [7, 50, 1, 40, 180, 3]      # sum = 281
# Expected: floor(281 / 64) = 4 complete chunks (256 samples), 25 buffered.

collected = []   # copies of complete-chunk views
total_in = 0

for size in bursts:
    block = np.ones(size, dtype=np.complex64) * complex(total_in, 0)
    view = c.push(block)
    # view is a zero-copy slice of the object's internal output buffer.
    # It becomes stale on the next push() call — copy immediately.
    if len(view):
        assert len(view) % CHUNK == 0, \
            f"output length {len(view)} is not a multiple of chunk_size {CHUNK}"
        collected.append(view.copy())
    total_in += size

total_out = sum(len(v) for v in collected)
assert total_out == 4 * CHUNK, \
    f"expected {4 * CHUNK} output samples, got {total_out}"

# Verify that the first burst (7 samples) produced no output
assert len(collected[0]) >= CHUNK, "first non-empty view should be ≥ one chunk"

# reset() clears the accumulator; next push starts fresh
c.reset()
view = c.push(np.zeros(CHUNK, dtype=np.complex64))
assert len(view) == CHUNK, "after reset: one full chunk in → one chunk out"

print("stream_chunker demo: PASSED")
print(f"  fed {total_in} samples in {len(bursts)} irregular bursts")
print(f"  received {total_out} output samples ({total_out // CHUNK} complete "
      f"{CHUNK}-sample chunks)")
print(f"  {total_in - total_out} samples remain buffered (flushed on reset)")
print(f"  {len(collected)} non-empty push() calls (some bursts produced 0 output)")

Memory ownership diagram

c = Chunker(chunk_size=64)
│
└─ ext calls chunker_push_max_out()  → 256
   ext mallocs float complex[256]    ← one malloc, at __init__
   stored as c._out_buf (opaque)

view = c.push(block)
│
├─ ext calls chunker_push(state, block.data, len(block), c._out_buf)
│    → returns n_out (multiple of 64; may be 0)
│
└─ returns numpy view wrapping c._out_buf[:n_out]
   ownership: object retains the buffer
   lifetime:  view is stale after the NEXT push() — copy immediately

Output size constraint

push_max_out returns 256 (the internal buffer capacity). The ext pre-allocates exactly 256 output samples. A single push() call must not produce more than 256 output samples.

With chunk_size=64: each call can emit at most floor(256 / 64) = 4 complete chunks. The safe maximum input per call is:

max_safe_push = 4 * chunk_size - current_n_buf
              = 256 - current_n_buf
              ≤ 256 samples

For larger inputs, split into ≤192-sample slices before calling push().


5. reset()

reset() sets n_buf = 0 and zeroes buf — any partially accumulated samples are discarded. Useful at stream boundaries or after error recovery.

c.reset()
# Next push() starts with an empty accumulation buffer