Testing

kontinue provides two approaches for testing workflows: a lightweight test framework for unit-testing execution logic without Kubernetes, and integration tests using envtest for testing against a real API server.

Test Framework

The pkg/testframework package provides an in-memory TestEnvironment that implements the SDK’s runtime interface. Executions, suspensions, and jobs all run in-process without any Kubernetes dependency.

go get gitlab.com/kontinue/kontinue/pkg/testframework

Basic Usage

Register functions, spawn an execution, and wait for the result:

func TestGreeting(t *testing.T) {
    env := testframework.New()

    type GreetArgs struct {
        Name string `json:"name"`
    }

    testframework.RegisterFunction(env, "greet", func(ktx *kontinue.ExecutionContext, args GreetArgs) (string, error) {
        return fmt.Sprintf("Hello, %s!", args.Name), nil
    })

    ctx := context.Background()
    te, err := testframework.SpawnExecution[string](ctx, env, "greet", GreetArgs{Name: "World"})
    require.NoError(t, err)

    result, err := te.Wait()
    require.NoError(t, err)
    require.Equal(t, "Hello, World!", *result)
}

SpawnExecution starts the execution asynchronously and returns a TestExecution handle. Call .Wait() to block until it completes and get the typed result.

Using Store

Store calls work as expected — state is persisted on the in-memory execution resource:

testframework.RegisterFunction(env, "compute", func(ktx *kontinue.ExecutionContext, args MyArgs) (int, error) {
    // This value is cached on the execution, just like in production
    result, err := kontinue.Store(ktx, func() (int, error) {
        return expensiveComputation(args), nil
    })
    if err != nil {
        return 0, err
    }
    return result, nil
})

Parent-Child Executions

Child executions launched with Execute run in their own goroutines, using the registered functions:

func TestParentChild(t *testing.T) {
    env := testframework.New()

    type DoubleArgs struct {
        N int `json:"n"`
    }

    testframework.RegisterFunction(env, "double", func(ktx *kontinue.ExecutionContext, args DoubleArgs) (int, error) {
        return args.N * 2, nil
    })

    type SumArgs struct {
        Values []int `json:"values"`
    }

    testframework.RegisterFunction(env, "sum-doubles", func(ktx *kontinue.ExecutionContext, args SumArgs) (int, error) {
        total := 0
        for _, v := range args.Values {
            result, err := kontinue.Execute[int](ktx, "double", DoubleArgs{N: v}, nil)
            if err != nil {
                return 0, err
            }
            total += *result
        }
        return total, nil
    })

    ctx := context.Background()
    te, err := testframework.SpawnExecution[int](ctx, env, "sum-doubles", SumArgs{Values: []int{1, 2, 3}})
    require.NoError(t, err)

    result, err := te.Wait()
    require.NoError(t, err)
    require.Equal(t, 12, *result) // (1+2+3) * 2
}

Suspensions and Sleep

By default, timed suspensions (Sleep) auto-complete immediately so tests don’t block. Manual suspensions (Suspend) always require explicit resolution.

func TestSleepAutoResume(t *testing.T) {
    env := testframework.New() // autoResumeSleep is true by default

    testframework.RegisterFunction(env, "sleeper", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
        // This returns immediately in tests
    if err := kontinue.Sleep(ktx, 5*time.Second); err != nil {
            return "", err
        }
        return "done", nil
    })

    ctx := context.Background()
    te, err := testframework.SpawnExecution[string](ctx, env, "sleeper", struct{}{})
    require.NoError(t, err)

    result, err := te.Wait()
    require.NoError(t, err)
    require.Equal(t, "done", *result)
}

Disable auto-resume to control sleep timing explicitly:

func TestManualSleepResume(t *testing.T) {
    env := testframework.New()
    env.SetAutoResumeSleep(false)

    testframework.RegisterFunction(env, "sleeper", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
    if err := kontinue.Sleep(ktx, 10*time.Second); err != nil {
            return "", err
        }
        return "resumed", nil
    })

    ctx := context.Background()
    te, err := testframework.SpawnExecution[string](ctx, env, "sleeper", struct{}{})
    require.NoError(t, err)

    // Let the execution reach the sleep point
    time.Sleep(50 * time.Millisecond)

    // Resume all pending suspensions
    env.ResumeAll()

    result, err := te.Wait()
    require.NoError(t, err)
    require.Equal(t, "resumed", *result)
}

For manual suspensions, use ResumeSuspension or ResumeAll:

testframework.RegisterFunction(env, "approval", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
    if err := kontinue.Suspend(ktx, &kontinue.SuspendOptions{}); err != nil {
        return "", err
    }
    return "approved", nil
})

// ... spawn execution ...

// Resume after the execution blocks
time.Sleep(50 * time.Millisecond)
env.ResumeAll()

You can also cancel a suspension by name:

err := env.CancelSuspension("my-suspension-name")

Mock Job Handler

By default, RunJob returns a succeeded job. Set a custom handler to control job behavior:

func TestJobFailure(t *testing.T) {
    env := testframework.New()

    env.SetJobHandler(func(job *batchv1.Job) (*batchv1.Job, error) {
        job.Status.Failed = 1
        job.Status.Conditions = []batchv1.JobCondition{{
            Type:    batchv1.JobFailed,
            Status:  corev1.ConditionTrue,
            Message: "OOMKilled",
        }}
        return job, nil
    })

    testframework.RegisterFunction(env, "run-job", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
        result, err := kontinue.RunJob(ktx, &kontinue.JobOptions{
            Spec: batchv1.JobSpec{
                Template: corev1.PodTemplateSpec{
                    Spec: corev1.PodSpec{
                        Containers: []corev1.Container{{
                            Name: "worker", Image: "busybox",
                        }},
                        RestartPolicy: corev1.RestartPolicyNever,
                    },
                },
            },
        })
        if err != nil {
            return "", err
        }
        if !result.Succeeded() {
            return result.Message, nil
        }
        return "ok", nil
    })

    ctx := context.Background()
    te, err := testframework.SpawnExecution[string](ctx, env, "run-job", struct{}{})
    require.NoError(t, err)

    result, err := te.Wait()
    require.NoError(t, err)
    require.Equal(t, "OOMKilled", *result)
}

Return an error from the handler to simulate infrastructure failures:

env.SetJobHandler(func(job *batchv1.Job) (*batchv1.Job, error) {
    return nil, fmt.Errorf("cluster unreachable")
})

Limitations

The mock TestEnvironment does not run a real Kontinue or Kubernetes runtime, so it does not support the full feature set, including automatic retries, timeouts, or concurrency control.

Integration Testing with envtest

For tests that need a real Kubernetes API server, use envtest from controller-runtime. This starts a lightweight API server and etcd in-process, giving you a real Kubernetes environment without a full cluster.

Setup

Use TestMain to start the environment once for all tests in the package:

package myworkflow_test

import (
    "context"
    "fmt"
    "os"
    "testing"

    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"

    kontinuev1alpha1 "gitlab.com/kontinue/kontinue/api/v1alpha1"
    kontinueclient "gitlab.com/kontinue/kontinue/pkg/client"
    "gitlab.com/kontinue/kontinue/pkg/worker"
)

var (
    cfg       *rest.Config
    k8sClient client.WithWatch
    cli       *kontinueclient.Client
    testEnv   *envtest.Environment
    ctx       context.Context
    cancel    context.CancelFunc
)

func TestMain(m *testing.M) {
    ctx, cancel = context.WithCancel(context.Background())
    s := scheme.Scheme

    // Register kontinue types
    if err := kontinuev1alpha1.AddToScheme(s); err != nil {
        fmt.Fprintf(os.Stderr, "failed to add scheme: %v\n", err)
        os.Exit(1)
    }

    // Start envtest API server
    testEnv = &envtest.Environment{
        Scheme:            s,
        CRDDirectoryPaths: []string{"path/to/crds"},
    }

    var err error
    cfg, err = testEnv.Start()
    if err != nil {
        fmt.Fprintf(os.Stderr, "failed to start envtest: %v\n", err)
        os.Exit(1)
    }

    // Create Kubernetes client
    k8sClient, err = client.NewWithWatch(cfg, client.Options{Scheme: s})
    if err != nil {
        fmt.Fprintf(os.Stderr, "failed to create client: %v\n", err)
        os.Exit(1)
    }

    // Create test namespace
    _ = k8sClient.Create(ctx, &v1.Namespace{
        ObjectMeta: metav1.ObjectMeta{Name: "test"},
    })

    // Create kontinue client
    cli = kontinueclient.NewFromKubeClient(k8sClient, "test")
    cli = cli.WithContext(ctx)

    // Start a worker
    w := worker.NewWorker()
    registerFunctions(w) // your function registration
    go w.Run(ctx, &worker.Options{
        ClientConfig: cfg,
        Name:         "test-worker",
        Namespace:    "test",
        Group:        "default",
    })

    code := m.Run()
    cancel()
    _ = testEnv.Stop()
    os.Exit(code)
}

Writing Tests

With envtest running, use the kontinue client to spawn executions and wait for results:

func TestMyWorkflow(t *testing.T) {
    exec, err := cli.Execute(ctx, "my-function", MyArgs{Value: 42})
    require.NoError(t, err)

    result, err := kontinueclient.WaitForResult[MyResult](ctx, cli, exec.Name)
    require.NoError(t, err)
    require.Equal(t, expected, result)
}

Integration tests run against a real API server, so executions go through the full lifecycle including CRD validation, status updates, and owner references. This is useful for testing retry behavior, suspension resolution, and multi-worker scenarios.

See Also