Execution SDK
The kontinue package (gitlab.com/kontinue/kontinue/pkg/kontinue) provides the Go SDK
for writing durable functions. This reference documents all public types and functions.
ExecutionContext
The ExecutionContext is passed to every function and provides access to the execution
runtime and state management APIs.
func MyWorkflow(ktx *kontinue.ExecutionContext, args *MyArgs) (*MyResult, error) {
	// Use ktx to access SDK functions
	return &MyResult{}, nil
}
Methods
Context
func (ec *ExecutionContext) Context() context.Context
Returns the underlying Go context for use with external libraries.
Execution
func (ec *ExecutionContext) Execution() *kontinuev1alpha1.Execution
Returns the current Execution resource. Useful for accessing metadata like name, namespace, or labels.
Attempts
func (ec *ExecutionContext) Attempts() int
Returns the number of previous execution attempts, not including the currently running attempt. Use this to implement attempt-specific logic.
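As one illustration of attempt-specific logic, a function might target a different endpoint on each retry. The sketch below is standalone and does not call the SDK: `pickEndpoint` and the mirror names are hypothetical, and the `attempts` argument stands in for the value returned by `ktx.Attempts()`.

```go
package main

import "fmt"

// pickEndpoint is a hypothetical helper that rotates through candidate
// endpoints so that each retry targets a different one.
// attempts stands in for the value returned by ktx.Attempts().
func pickEndpoint(endpoints []string, attempts int) string {
	if len(endpoints) == 0 || attempts < 0 {
		return ""
	}
	return endpoints[attempts%len(endpoints)]
}

func main() {
	mirrors := []string{"mirror-a", "mirror-b", "mirror-c"}
	for attempts := 0; attempts < 4; attempts++ {
		fmt.Printf("previous attempts: %d -> %s\n", attempts, pickEndpoint(mirrors, attempts))
	}
}
```

Because the count excludes the running attempt, the first run sees 0 and selects the first entry.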
Resumptions
func (ec *ExecutionContext) Resumptions() int
Returns the number of times this execution was resumed after the initial start, inclusive of retries. A resumption occurs when a different worker takes over an already-started execution.
Child Executions
FunctionName
type FunctionName string
A defined string type for function names that improves type safety. Untyped string constants, including string literals, convert to it implicitly; values of type string need an explicit conversion.
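The conversion rules follow from Go's treatment of a type declared as `type FunctionName string`. The standalone sketch below redeclares the type locally; `describe` is a hypothetical stand-in for any call that accepts a `FunctionName`.

```go
package main

import "fmt"

// FunctionName mirrors the declaration shown above.
type FunctionName string

// describe is a hypothetical stand-in for any call that accepts a FunctionName.
func describe(fn FunctionName) string {
	return "function:" + string(fn)
}

func main() {
	// An untyped string constant converts implicitly.
	fmt.Println(describe("deploy-service"))

	// A variable of type string needs an explicit conversion;
	// describe(name) would not compile.
	name := "deploy-" + "service"
	fmt.Println(describe(FunctionName(name)))
}
```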
Execute
func Execute[Result any](ktx *ExecutionContext, function FunctionName, args any, opts *ExecuteOptions) (*Result, error)
Starts a child execution and waits for it to complete. Provides compile-time type checking for the result type.
result, err := kontinue.Execute[DeployResult](ktx, "deploy-service", &DeployArgs{
	Service: "api",
}, nil)
With options:
result, err := kontinue.Execute[DeployResult](ktx, "deploy-service", &DeployArgs{
	Service: "api",
}, &kontinue.ExecuteOptions{
	StepOptions: kontinue.StepOptions{
		DisplayName: "Deploy API Service",
		Description: "Deploys the API service to the cluster",
	},
	Retry:   &kontinue.RetryOptions{Retries: 3},
	Timeout: &kontinue.TimeoutOptions{Attempt: &metav1.Duration{Duration: 5 * time.Minute}},
})
SpawnExecution
func SpawnExecution(ktx *ExecutionContext, function FunctionName, args any, opts *ExecuteOptions) (*ChildExecution, error)
Starts a child Execution without waiting for it to complete. The call is idempotent: if it runs again during replay, it returns a reference to the existing child Execution.
Use with WaitExecution for advanced patterns like spawning multiple children before waiting.
WaitExecution
func WaitExecution[Result any](ktx *ExecutionContext, child *ChildExecution) (*Result, error)
Waits for a spawned child Execution to complete and returns its result.
child, err := kontinue.SpawnExecution(ktx, "long-task", &Args{}, nil)
if err != nil {
	return nil, err
}
// Do other work...
result, err := kontinue.WaitExecution[TaskResult](ktx, child)
ExecuteParallel
func ExecuteParallel[Result any](ktx *ExecutionContext, specs []*ChildExecutionOptions) ([]*Result, error)
Starts multiple child executions and waits for all of them to complete. Children are spawned sequentially but waited on in parallel, and results are returned in the same order as the input specs. Progress is updated automatically if it has not already been set.
results, err := kontinue.ExecuteParallel[TestResult](ktx, []*kontinue.ChildExecutionOptions{
	{Function: "test-unit", Arguments: &TestArgs{}},
	{Function: "test-integration", Arguments: &TestArgs{}},
	{Function: "test-e2e", Arguments: &TestArgs{}},
})
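The ordering guarantee can be pictured with plain goroutines. This is a minimal standalone sketch, not the SDK's implementation: `runAll` is a hypothetical helper, the tasks are ordinary closures rather than child executions, and returning the first error in input order is an assumption, since the text above does not describe failure handling.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical stand-in for the spawn-then-wait pattern:
// tasks are started in input order, waited on concurrently, and each
// result is written to the slot matching its input index.
func runAll(tasks []func() (string, error)) ([]string, error) {
	results := make([]string, len(tasks))
	errs := make([]error, len(tasks))

	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() (string, error)) {
			defer wg.Done()
			results[i], errs[i] = task()
		}(i, task)
	}
	wg.Wait()

	// Assumption for this sketch: report the first error in input order.
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	results, err := runAll([]func() (string, error){
		func() (string, error) { return "unit ok", nil },
		func() (string, error) { return "integration ok", nil },
		func() (string, error) { return "e2e ok", nil },
	})
	fmt.Println(results, err)
}
```

Writing each result to its own index keeps the output order independent of which task finishes first.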
ExecuteSequential
func ExecuteSequential[Result any](ktx *ExecutionContext, specs []*ChildExecutionOptions) ([]*Result, error)
Starts multiple child executions and waits for each to complete before starting the next. Returns results in the same order as input specs. Updates Progress automatically if not already set.
ExecuteOptions
type ExecuteOptions struct {
StepOptions // Naming and metadata options
Retry *RetryOptions // Retry configuration (overrides function defaults)
Timeout *TimeoutOptions // Timeout configuration (overrides function defaults)
Mutex *ExecutionMutex // Mutex configuration for concurrency control
}
Optional configuration for child executions. Pass nil when no special options are needed.
ChildExecutionOptions
type ChildExecutionOptions struct {
Function FunctionName // Name of the function to execute
Arguments any // Arguments to pass to the function
Options *ExecuteOptions // Optional configuration
}
Used with ExecuteParallel and ExecuteSequential to specify multiple child executions.
ChildExecution
type ChildExecution struct {
Name string // Name of the child execution
Namespace string // Namespace of the child execution
}
Reference to a spawned child execution, returned by SpawnExecution.
State Management
Store
func Store[T any](ktx *ExecutionContext, fn func() (T, error)) (T, error)
Runs the provided function and caches its result on the Execution so that resumed attempts do not recompute the value. Use this to cache expensive results or to keep non-deterministic values and side effects, such as a generated ID, stable across replays.
orderID, err := kontinue.Store(ktx, func() (string, error) {
	return uuid.New().String(), nil
})
The function is only executed on the first run. On replay, the cached value is returned.
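The run-once behavior can be modeled with a small in-memory cache. This is a sketch under stated assumptions: `stepCache` and `storeOnce` are hypothetical, the map stands in for whatever the SDK persists on the Execution, and not caching errors is a choice made for the sketch rather than documented behavior.

```go
package main

import "fmt"

// stepCache is a hypothetical in-memory stand-in for the results that the
// SDK persists on the Execution. Keys are step names.
type stepCache map[string]string

// storeOnce runs fn only if no value is cached under key; otherwise it
// returns the cached value without calling fn.
func storeOnce(cache stepCache, key string, fn func() (string, error)) (string, error) {
	if v, ok := cache[key]; ok {
		return v, nil
	}
	v, err := fn()
	if err != nil {
		return "", err // errors are not cached in this sketch
	}
	cache[key] = v
	return v, nil
}

func main() {
	cache := stepCache{}
	calls := 0
	newID := func() (string, error) {
		calls++
		return fmt.Sprintf("order-%d", calls), nil
	}

	first, _ := storeOnce(cache, "order-id", newID)  // first run: fn executes
	replay, _ := storeOnce(cache, "order-id", newID) // replay: cached value
	fmt.Println(first, replay, calls)
}
```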
StoreNamed
func StoreNamed[T any](ktx *ExecutionContext, opts *StoreOptions, fn func() (T, error)) (T, error)
Functions as Store but allows specifying a custom step name via StoreOptions.
Useful for avoiding collisions or making stored keys deterministic.
StoreOptions
type StoreOptions struct {
StepOptions
}
Suspending
Sleep
func Sleep(ktx *ExecutionContext, duration time.Duration) error
Pauses the Execution for the specified duration. This is replay-safe: if the Execution is resumed after a failure, it does not sleep for the full duration again.
Sleeps can be resumed early via the CLI (kontinue resume) or UI.
if err := kontinue.Sleep(ktx, 1*time.Hour); err != nil {
	return nil, err
}
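One way to picture replay-safe sleeping is to compute the time left from when the sleep first began. The sketch below is standalone and assumes that a resumed sleep waits only for the remainder; `remainingSleep` is a hypothetical helper, and how the SDK actually records the start time is not described here.

```go
package main

import (
	"fmt"
	"time"
)

// remainingSleep is a hypothetical helper: given the requested sleep and the
// time already elapsed since it first started, it returns how long is left.
// It never returns a negative duration.
func remainingSleep(total, elapsed time.Duration) time.Duration {
	if elapsed >= total {
		return 0
	}
	return total - elapsed
}

func main() {
	startedAt := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

	// A worker resumes 45 minutes into a one-hour sleep.
	resumedAt := startedAt.Add(45 * time.Minute)
	fmt.Println(remainingSleep(time.Hour, resumedAt.Sub(startedAt)))

	// A worker resumes after the sleep would already have ended.
	lateResume := startedAt.Add(2 * time.Hour)
	fmt.Println(remainingSleep(time.Hour, lateResume.Sub(startedAt)))
}
```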
SleepNamed
func SleepNamed(ktx *ExecutionContext, opts *SleepOptions) error
Functions as Sleep but allows specifying a custom step name and metadata.
err := kontinue.SleepNamed(ktx, &kontinue.SleepOptions{
	Duration: 5 * time.Minute,
	StepOptions: kontinue.StepOptions{
		DisplayName: "Wait for DNS Propagation",
	},
})
Suspend
func Suspend(ktx *ExecutionContext, opts *SuspendOptions) error
Creates a Suspension that must be manually resolved via the API, CLI, or UI. Use this for human approval gates or external trigger points.
// Wait for manual approval
if err := kontinue.Suspend(ktx, &kontinue.SuspendOptions{
	StepOptions: kontinue.StepOptions{
		DisplayName: "Approve Production Deploy",
	},
}); err != nil {
	return nil, err
}
SleepOptions
type SleepOptions struct {
StepOptions
Duration time.Duration
}
SuspendOptions
type SuspendOptions struct {
StepOptions
}
Polling
PollUntil
func PollUntil(ktx *ExecutionContext, condition func() bool, interval time.Duration) error
Polls the condition function at regular intervals until it returns true. The completion
is stored via Store() to avoid re-polling on replay.
err := kontinue.PollUntil(ktx, func() bool {
	return checkDeploymentReady()
}, 10*time.Second)
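The check-then-wait loop behind polling can be sketched without the SDK. In this standalone example `pollUntil` is a hypothetical local function, not the SDK call: it adds a `maxChecks` bound so that the sketch always terminates, and it does not persist completion the way the text above describes.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotReady = errors.New("condition not met")

// pollUntil is a hypothetical stand-in: it checks condition immediately, then
// sleeps for interval between checks. maxChecks bounds the loop; the SDK
// signature shown above has no such parameter.
func pollUntil(condition func() bool, interval time.Duration, maxChecks int) (int, error) {
	for checks := 1; checks <= maxChecks; checks++ {
		if condition() {
			return checks, nil
		}
		if checks < maxChecks {
			time.Sleep(interval)
		}
	}
	return maxChecks, errNotReady
}

func main() {
	seen := 0
	checks, err := pollUntil(func() bool {
		seen++
		return seen >= 3 // ready on the third check
	}, 10*time.Millisecond, 5)
	fmt.Println(checks, err)
}
```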
PollUntilNamed
func PollUntilNamed(ktx *ExecutionContext, condition func() bool, opts *PollOptions) error
Functions as PollUntil but allows specifying a custom step name.
PollOptions
type PollOptions struct {
StepOptions
Interval time.Duration // Time to wait between poll attempts
}
Jobs
RunJob
func RunJob(ktx *ExecutionContext, opts *JobOptions) (*JobResult, error)
Runs a Kubernetes Job and waits for it to complete. The job is tracked with labels and
OwnerReferences like Suspensions and child Executions. Idempotent on replay.
Respects the parent’s ChildPolicy for retry behavior. Returns a JobResult with
information about all pods in the job.
RunPod
func RunPod(ktx *ExecutionContext, opts *PodOptions) (*PodResult, error)
Runs a single Pod via a Kubernetes Job and returns detailed pod information, including
container exit codes and access to logs. Creates a Job with BackoffLimit: 0 to ensure
exactly one pod is created.
result, err := kontinue.RunPod(ktx, &kontinue.PodOptions{
	Spec: corev1.PodSpec{
		Containers: []corev1.Container{{
			Name:    "task",
			Image:   "myapp:latest",
			Command: []string{"./run-task"},
		}},
	},
})
if err != nil {
	return nil, err
}
if !result.Succeeded() {
	// Best-effort summary; an error while fetching logs is ignored here.
	summary, _ := result.LogErrorSummary(ktx)
	return nil, fmt.Errorf("task failed: %s", summary)
}
RunScript
func RunScript(ktx *ExecutionContext, opts *ScriptOptions) (*PodResult, error)
Convenience wrapper around RunPod for simple command execution.
result, err := kontinue.RunScript(ktx, &kontinue.ScriptOptions{
	Image:   "alpine:latest",
	Command: "echo 'Hello, World!'",
	StepOptions: kontinue.StepOptions{
		DisplayName: "Run Setup Script",
	},
})
if err != nil {
	return nil, err
}
if !result.Succeeded() {
	return nil, fmt.Errorf("script failed: %s", result.Message)
}
JobOptions
type JobOptions struct {
StepOptions
Spec batchv1.JobSpec // The job specification to run
ExtraAnnotations map[string]string // Additional annotations
ExtraLabels map[string]string // Additional labels
}
PodOptions
type PodOptions struct {
StepOptions
Spec corev1.PodSpec // The pod specification to run
ExtraAnnotations map[string]string // Additional annotations
ExtraLabels map[string]string // Additional labels
}
ScriptOptions
type ScriptOptions struct {
StepOptions
Image string // Container image to use
Command string // Script/command to run
Shell []string // Shell command (defaults to ["/bin/sh", "-c"])
ExtraAnnotations map[string]string
ExtraLabels map[string]string
}
LogOptions
type LogOptions struct {
TailLines int64 // Number of lines from end to fetch (default: 100)
Container string // Container name (optional, defaults to first container)
}
JobResult
type JobResult struct {
SucceededPods int32 // Number of pods that completed successfully
FailedPods int32 // Number of pods that failed
Message string // Error or status message
Pods []*PodResult // Results for all pods created by the job
}
func (r *JobResult) Succeeded() bool // Returns SucceededPods > 0 && FailedPods == 0
PodResult
type PodResult struct {
Pod *corev1.Pod // The completed Pod resource
Message string // Status message from pod conditions
Phase corev1.PodPhase // Pod phase (Succeeded, Failed, etc.)
}
func (r *PodResult) Succeeded() bool // Returns true if all containers exited with code 0
func (r *PodResult) Logs(ktx *ExecutionContext, opts *LogOptions) (string, error)
func (r *PodResult) LogErrorSummary(ktx *ExecutionContext) (string, error)
Status Updates
SetMessage
func SetMessage(ktx *ExecutionContext, message string) error
Sets a message on the Execution’s status. Displayed in the UI and CLI. Messages can be overwritten by subsequent calls.
kontinue.SetMessage(ktx, "Deploying to production cluster")
SetProgress
func SetProgress(ktx *ExecutionContext, current, max int) error
Updates the progress indicator (e.g., step 2 of 5). Displayed as a progress bar in the UI and as a fraction in the CLI.
kontinue.SetProgress(ktx, 2, 5)
ClearProgress
func ClearProgress(ktx *ExecutionContext) error
Removes the progress indicator from the execution status.
Step Options
StepOptions
type StepOptions struct {
Name string // Explicit step name (must be unique, k8s-compatible)
GenerateName string // Prefix for generated name (combined with counter)
DisplayName string // Human-readable name for UI/CLI display
Description string // Description shown in UI details panel
}
Used by all SDK operations to control naming and metadata of child resources.
| Field | Description |
|---|---|
| Name | Explicit step name. Must be unique within the execution and conform to Kubernetes naming rules. |
| GenerateName | Prefix for auto-generated names. Combined with a deterministic counter. |
| DisplayName | Human-readable name shown in UI and CLI instead of the resource name. |
| Description | Detailed description shown in the UI details panel. |
If neither Name nor GenerateName is provided, a default name is generated.
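The reason a deterministic counter matters is that a replay issuing the same calls in the same order regenerates the same names. The standalone sketch below models that idea only: the `namer` type, the `prefix-N` format, and the `step` default prefix are all assumptions made for illustration, not the SDK's actual naming scheme.

```go
package main

import (
	"errors"
	"fmt"
)

var errDuplicate = errors.New("step name used more than once")

// namer is a hypothetical model of step naming within one execution.
type namer struct {
	used     map[string]bool
	counters map[string]int
}

func newNamer() *namer {
	return &namer{used: map[string]bool{}, counters: map[string]int{}}
}

// resolve returns name if it is set, rejecting duplicates. Otherwise it
// appends a per-prefix counter to generateName. The "prefix-N" format and
// the "step" default are assumptions made for this sketch.
func (n *namer) resolve(name, generateName string) (string, error) {
	if name == "" {
		if generateName == "" {
			generateName = "step"
		}
		n.counters[generateName]++
		name = fmt.Sprintf("%s-%d", generateName, n.counters[generateName])
	}
	if n.used[name] {
		return "", errDuplicate
	}
	n.used[name] = true
	return name, nil
}

func main() {
	n := newNamer()
	fmt.Println(n.resolve("", "deploy"))  // generated from the prefix
	fmt.Println(n.resolve("", "deploy"))  // counter advances
	fmt.Println(n.resolve("approve", "")) // explicit name
	fmt.Println(n.resolve("approve", "")) // duplicate explicit name is rejected
}
```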
Type Aliases
Convenience aliases for API types:
type RetryOptions = kontinuev1alpha1.ExecutionRetry
type TimeoutOptions = kontinuev1alpha1.ExecutionTimeout
See Automatic Retries and Timeouts for field documentation.
Errors
Error Type
type Error struct {
Kind ErrorKind // Error category
Child string // Child resource name (if applicable)
Message string // Human-readable message
Retryable bool // Whether operation can be retried
Cause error // Underlying error
}
Error Kinds
| Kind | Description | Retryable |
|---|---|---|
| ErrKindInfrastructure | Kubernetes API error | Yes |
| ErrKindValidation | Invalid options or state | No |
| ErrKindChildFailed | Child execution failed | Yes |
| ErrKindChildCanceled | Child execution was canceled | Yes |
| ErrKindTimeout | Operation timed out | Depends |
| ErrKindInternal | Internal SDK error | No |
| ErrKindExecution | User execution error | Configurable |
Sentinel Errors
var (
ErrDuplicateStep // Step name used more than once
ErrChildFailed // Child execution failed
ErrChildCanceled // Child execution canceled
ErrJobFailed // Job failed
ErrExecution // Execution failed
ErrOverallTimeout // Overall timeout exceeded (not retryable)
ErrAttemptTimeout // Attempt timeout exceeded (retryable)
)
Use with errors.Is:
if errors.Is(err, kontinue.ErrChildFailed) {
	// Handle child failure
}
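Matching a sentinel through layers of wrapping relies on the standard library's error chain. The standalone sketch below uses local stand-ins only: `errChildFailed`, `stepError`, and `failedChild` are hypothetical, and whether the SDK's Error type exposes its Cause through an `Unwrap` method is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errChildFailed is a local stand-in for a sentinel such as ErrChildFailed.
var errChildFailed = errors.New("child execution failed")

// stepError is a hypothetical structured error that wraps a cause.
type stepError struct {
	Child string
	Cause error
}

func (e *stepError) Error() string { return fmt.Sprintf("child %q: %v", e.Child, e.Cause) }

// Unwrap exposes the cause so errors.Is and errors.As can walk the chain.
func (e *stepError) Unwrap() error { return e.Cause }

// failedChild reports whether err stems from a failed child and, when a
// stepError is present in the chain, which child it names.
func failedChild(err error) (string, bool) {
	if !errors.Is(err, errChildFailed) {
		return "", false
	}
	var se *stepError
	if errors.As(err, &se) {
		return se.Child, true
	}
	return "", true
}

func main() {
	err := fmt.Errorf("deploy: %w", &stepError{Child: "deploy-service-1", Cause: errChildFailed})
	fmt.Println(failedChild(err))
}
```

`errors.Is` answers whether a sentinel appears anywhere in the chain, while `errors.As` retrieves the structured value that carries details such as the child name.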
Error Helpers
NewNonRetryableError
func NewNonRetryableError(cause error, message string) *Error
Creates a non-retryable error. Use to signal that an error should not trigger a retry.
return kontinue.NewNonRetryableError(err, "invalid configuration")
NewRetryableError
func NewRetryableError(cause error, message string) *Error
Creates a retryable error. Use to explicitly mark an error as retryable.
return kontinue.NewRetryableError(err, "temporary service unavailable")
IsRetryableError
func IsRetryableError(err error) bool
Returns true if the error is retryable. For non-kontinue errors, defaults to true.
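The default-to-retryable rule can be sketched with `errors.As`. This standalone example does not use the SDK: `sdkError` and `isRetryable` are hypothetical stand-ins, and returning false for a nil error is an assumption made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// sdkError is a hypothetical stand-in for the Error type documented above.
type sdkError struct {
	Message   string
	Retryable bool
}

func (e *sdkError) Error() string { return e.Message }

// isRetryable reports the Retryable flag of the first sdkError in the chain
// and defaults to true for any other non-nil error.
func isRetryable(err error) bool {
	if err == nil {
		return false // assumption: no error means nothing to retry
	}
	var se *sdkError
	if errors.As(err, &se) {
		return se.Retryable
	}
	return true
}

func main() {
	fatal := fmt.Errorf("validate: %w", &sdkError{Message: "invalid configuration"})
	transient := errors.New("connection reset")
	fmt.Println(isRetryable(fatal), isRetryable(transient))
}
```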
GetErrorKind
func GetErrorKind(err error) ErrorKind
Returns the error kind, or an empty string if err is not a kontinue Error.