Testing
kontinue provides two approaches for testing workflows: a lightweight test framework for
unit-testing execution logic without Kubernetes, and integration tests using envtest for
testing against a real API server.
Test Framework
The pkg/testframework package provides an in-memory TestEnvironment that implements the
SDK’s runtime interface. Executions, suspensions, and jobs all run in-process without any
Kubernetes dependency.
go get gitlab.com/kontinue/kontinue/pkg/testframework
Basic Usage
Register functions, spawn an execution, and wait for the result:
func TestGreeting(t *testing.T) {
env := testframework.New()
type GreetArgs struct {
Name string `json:"name"`
}
testframework.RegisterFunction(env, "greet", func(ktx *kontinue.ExecutionContext, args GreetArgs) (string, error) {
return fmt.Sprintf("Hello, %s!", args.Name), nil
})
ctx := context.Background()
te, err := testframework.SpawnExecution[string](ctx, env, "greet", GreetArgs{Name: "World"})
require.NoError(t, err)
result, err := te.Wait()
require.NoError(t, err)
require.Equal(t, "Hello, World!", *result)
}
SpawnExecution starts the execution asynchronously and returns a TestExecution handle.
Call .Wait() to block until it completes and get the typed result.
Using Store
Store calls work as expected — state is persisted on the in-memory execution
resource:
testframework.RegisterFunction(env, "compute", func(ktx *kontinue.ExecutionContext, args MyArgs) (int, error) {
	// The callback's value is cached on the execution, just like in production.
	compute := func() (int, error) {
		return expensiveComputation(args), nil
	}

	value, err := kontinue.Store(ktx, compute)
	if err != nil {
		return 0, err
	}
	return value, nil
})
Parent-Child Executions
Child executions launched with Execute run in their own goroutines, using the registered
functions:
// TestParentChild runs a parent function that invokes a registered child
// function once per input value and adds up the children's results.
func TestParentChild(t *testing.T) {
	type (
		DoubleArgs struct {
			N int `json:"n"`
		}
		SumArgs struct {
			Values []int `json:"values"`
		}
	)

	env := testframework.New()

	// Child: doubles a single number.
	testframework.RegisterFunction(env, "double", func(ktx *kontinue.ExecutionContext, in DoubleArgs) (int, error) {
		return 2 * in.N, nil
	})

	// Parent: executes "double" for each value and accumulates the results.
	testframework.RegisterFunction(env, "sum-doubles", func(ktx *kontinue.ExecutionContext, in SumArgs) (int, error) {
		var sum int
		for i := range in.Values {
			doubled, err := kontinue.Execute[int](ktx, "double", DoubleArgs{N: in.Values[i]}, nil)
			if err != nil {
				return 0, err
			}
			sum += *doubled
		}
		return sum, nil
	})

	handle, err := testframework.SpawnExecution[int](context.Background(), env, "sum-doubles", SumArgs{Values: []int{1, 2, 3}})
	require.NoError(t, err)

	got, err := handle.Wait()
	require.NoError(t, err)
	require.Equal(t, 12, *got) // 2*1 + 2*2 + 2*3
}
Suspensions and Sleep
By default, timed suspensions (Sleep) auto-complete immediately so tests don’t block.
Manual suspensions (Suspend) always require explicit resolution.
// TestSleepAutoResume checks that a function calling Sleep completes without
// any intervention when the environment uses its default settings.
func TestSleepAutoResume(t *testing.T) {
	// autoResumeSleep is true by default on a new environment.
	env := testframework.New()

	sleeper := func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
		// Under the test environment this call returns immediately.
		err := kontinue.Sleep(ktx, 5*time.Second)
		if err != nil {
			return "", err
		}
		return "done", nil
	}
	testframework.RegisterFunction(env, "sleeper", sleeper)

	handle, err := testframework.SpawnExecution[string](context.Background(), env, "sleeper", struct{}{})
	require.NoError(t, err)

	got, err := handle.Wait()
	require.NoError(t, err)
	require.Equal(t, "done", *got)
}
Disable auto-resume to control sleep timing explicitly:
// TestManualSleepResume turns off automatic sleep completion so the test
// itself decides when the sleeping execution continues.
func TestManualSleepResume(t *testing.T) {
	env := testframework.New()
	env.SetAutoResumeSleep(false)

	sleeper := func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
		err := kontinue.Sleep(ktx, 10*time.Second)
		if err != nil {
			return "", err
		}
		return "resumed", nil
	}
	testframework.RegisterFunction(env, "sleeper", sleeper)

	handle, err := testframework.SpawnExecution[string](context.Background(), env, "sleeper", struct{}{})
	require.NoError(t, err)

	// Give the execution a moment to reach the sleep point, then resume
	// every pending suspension.
	time.Sleep(50 * time.Millisecond)
	env.ResumeAll()

	got, err := handle.Wait()
	require.NoError(t, err)
	require.Equal(t, "resumed", *got)
}
For manual suspensions, use ResumeSuspension or ResumeAll:
testframework.RegisterFunction(env, "approval", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
	// A manual suspension: the test must resolve it explicitly.
	err := kontinue.Suspend(ktx, &kontinue.SuspendOptions{})
	if err != nil {
		return "", err
	}
	return "approved", nil
})

// ... spawn execution ...

// Wait for the execution to block on the suspension, then resume it.
time.Sleep(50 * time.Millisecond)
env.ResumeAll()
You can also cancel a suspension by name:
// Cancel the suspension identified by its name; check the returned error.
err := env.CancelSuspension("my-suspension-name")
Mock Job Handler
By default, RunJob returns a succeeded job. Set a custom handler to control job behavior:
// TestJobFailure installs a job handler that marks every job as failed and
// checks that the workflow function surfaces the failure message.
func TestJobFailure(t *testing.T) {
	env := testframework.New()

	// Mock handler: report one failed pod with an "OOMKilled" failure condition.
	failJob := func(job *batchv1.Job) (*batchv1.Job, error) {
		oom := batchv1.JobCondition{
			Type:    batchv1.JobFailed,
			Status:  corev1.ConditionTrue,
			Message: "OOMKilled",
		}
		job.Status.Failed = 1
		job.Status.Conditions = []batchv1.JobCondition{oom}
		return job, nil
	}
	env.SetJobHandler(failJob)

	testframework.RegisterFunction(env, "run-job", func(ktx *kontinue.ExecutionContext, _ struct{}) (string, error) {
		// Build the job options fresh on every invocation.
		container := corev1.Container{Name: "worker", Image: "busybox"}
		pod := corev1.PodSpec{
			Containers:    []corev1.Container{container},
			RestartPolicy: corev1.RestartPolicyNever,
		}
		opts := &kontinue.JobOptions{
			Spec: batchv1.JobSpec{
				Template: corev1.PodTemplateSpec{Spec: pod},
			},
		}

		outcome, err := kontinue.RunJob(ktx, opts)
		switch {
		case err != nil:
			return "", err
		case !outcome.Succeeded():
			// A failed job is a normal result here, not an error.
			return outcome.Message, nil
		default:
			return "ok", nil
		}
	})

	handle, err := testframework.SpawnExecution[string](context.Background(), env, "run-job", struct{}{})
	require.NoError(t, err)

	got, err := handle.Wait()
	require.NoError(t, err)
	require.Equal(t, "OOMKilled", *got)
}
Return an error from the handler to simulate infrastructure failures:
env.SetJobHandler(func(job *batchv1.Job) (*batchv1.Job, error) {
return nil, fmt.Errorf("cluster unreachable")
})
Limitations
The mock TestEnvironment does not run a real Kontinue or Kubernetes runtime, so it does not support the full feature set; in particular, automatic retries, timeouts, and concurrency control are not available.
Integration Testing with envtest
For tests that need a real Kubernetes API server, use envtest from controller-runtime. This starts a lightweight API server and etcd in-process, giving you a real Kubernetes environment without a full cluster.
Setup
Use TestMain to start the environment once for all tests in the package:
package myworkflow_test
import (
"context"
"fmt"
"os"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
kontinuev1alpha1 "gitlab.com/kontinue/kontinue/api/v1alpha1"
kontinueclient "gitlab.com/kontinue/kontinue/pkg/client"
"gitlab.com/kontinue/kontinue/pkg/worker"
)
// Shared fixtures, populated once by TestMain and used by every test in the package.
var (
// cfg is the REST config returned by testEnv.Start.
cfg *rest.Config
// k8sClient is the Kubernetes client built from cfg.
k8sClient client.WithWatch
// cli is the kontinue client created for the "test" namespace and bound to ctx.
cli *kontinueclient.Client
// testEnv is the envtest environment that TestMain starts and stops.
testEnv *envtest.Environment
// ctx is the cancellable context shared by the client, worker, and tests.
ctx context.Context
// cancel cancels ctx; TestMain calls it after the tests have run.
cancel context.CancelFunc
)
// TestMain starts one envtest API server, one kontinue client, and one worker
// for the whole package, runs the tests, and exits with their result code.
func TestMain(m *testing.M) {
	// os.Exit does not run deferred functions, so all setup and teardown live
	// in runSuite, where defers guarantee cleanup on every return path.
	os.Exit(runSuite(m))
}

// runSuite prepares the shared fixtures (ctx, cfg, k8sClient, cli, testEnv),
// runs the package's tests, and returns the process exit code. It returns 1
// when any setup step fails.
func runSuite(m *testing.M) int {
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()

	s := scheme.Scheme

	// Register kontinue types
	if err := kontinuev1alpha1.AddToScheme(s); err != nil {
		fmt.Fprintf(os.Stderr, "failed to add scheme: %v\n", err)
		return 1
	}

	// Start envtest API server
	testEnv = &envtest.Environment{
		Scheme:            s,
		CRDDirectoryPaths: []string{"path/to/crds"},
	}
	var err error
	cfg, err = testEnv.Start()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to start envtest: %v\n", err)
		return 1
	}
	// From here on the environment is running, so stop it on every exit path,
	// including the setup failures below.
	defer func() {
		// Cancel the shared context before stopping the API server, matching
		// the shutdown order of a normal run. Calling cancel twice is harmless.
		cancel()
		if err := testEnv.Stop(); err != nil {
			fmt.Fprintf(os.Stderr, "failed to stop envtest: %v\n", err)
		}
	}()

	// Create Kubernetes client
	k8sClient, err = client.NewWithWatch(cfg, client.Options{Scheme: s})
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to create client: %v\n", err)
		return 1
	}

	// Create test namespace. An already-existing namespace is fine; any other
	// error would make every test fail later for a non-obvious reason.
	ns := &v1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: "test"},
	}
	if err := client.IgnoreAlreadyExists(k8sClient.Create(ctx, ns)); err != nil {
		fmt.Fprintf(os.Stderr, "failed to create namespace: %v\n", err)
		return 1
	}

	// Create kontinue client
	cli = kontinueclient.NewFromKubeClient(k8sClient, "test")
	cli = cli.WithContext(ctx)

	// Start a worker; it runs until ctx is cancelled.
	w := worker.NewWorker()
	registerFunctions(w) // your function registration
	go w.Run(ctx, &worker.Options{
		ClientConfig: cfg,
		Name:         "test-worker",
		Namespace:    "test",
		Group:        "default",
	})

	return m.Run()
}
Writing Tests
With envtest running, use the kontinue client to spawn executions and wait for results:
// TestMyWorkflow submits a function through the shared kontinue client and
// compares the awaited result with the expected value.
func TestMyWorkflow(t *testing.T) {
	// Spawn the execution against the envtest API server.
	started, err := cli.Execute(ctx, "my-function", MyArgs{Value: 42})
	require.NoError(t, err)

	// Wait for the result of the execution, looked up by name.
	got, err := kontinueclient.WaitForResult[MyResult](ctx, cli, started.Name)
	require.NoError(t, err)

	require.Equal(t, expected, got)
}
Integration tests run against a real API server, so executions go through the full lifecycle including CRD validation, status updates, and owner references. This is useful for testing retry behavior, suspension resolution, and multi-worker scenarios.
See Also
- Execution SDK - Complete SDK reference
- Functions - Defining and registering functions
- Running Jobs - Job execution in workflows
- Suspending - Sleep and suspend operations