Execution SDK

The kontinue package (gitlab.com/kontinue/kontinue/pkg/kontinue) provides the Go SDK for writing durable functions. This reference documents all public types and functions.

ExecutionContext

The ExecutionContext is passed to every function and provides access to the execution runtime and state management APIs.

func MyWorkflow(ktx *kontinue.ExecutionContext, args *MyArgs) (*MyResult, error) {
    // Use ktx to access SDK functions
    return &MyResult{}, nil
}

Methods

Context

func (ec *ExecutionContext) Context() context.Context

Returns the underlying Go context for use with external libraries.

Execution

func (ec *ExecutionContext) Execution() *kontinuev1alpha1.Execution

Returns the current Execution resource. Useful for accessing metadata like name, namespace, or labels.

Attempts

func (ec *ExecutionContext) Attempts() int

Returns the number of previous execution attempts, not including the currently running attempt. Use this to implement attempt-specific logic.
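As one illustration of attempt-specific logic, the sketch below rotates through a list of targets so that a retry does not hit the same one again. It does not use the SDK: `pickEndpoint` and the endpoint names are hypothetical, and in a real function the count would come from ktx.Attempts().

```go
package main

import "fmt"

// pickEndpoint is a hypothetical helper: it rotates through the candidates so
// each retry targets a different endpoint. attempts is the number of previous
// attempts (0 on the first run), matching what Attempts() is documented to return.
func pickEndpoint(attempts int, endpoints []string) string {
	if len(endpoints) == 0 {
		return ""
	}
	if attempts < 0 {
		attempts = 0
	}
	return endpoints[attempts%len(endpoints)]
}

func main() {
	endpoints := []string{"primary", "secondary", "tertiary"}
	for attempts := 0; attempts < 4; attempts++ {
		fmt.Printf("previous attempts %d -> %s\n", attempts, pickEndpoint(attempts, endpoints))
	}
}
```

Because the count excludes the running attempt, the first run sees 0 and selects the first entry.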

Resumptions

func (ec *ExecutionContext) Resumptions() int

Returns the number of times this execution was resumed after the initial start, inclusive of retries. A resumption occurs when a different worker takes over an already-started execution.


Child Executions

FunctionName

type FunctionName string

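A defined string type (not a Go type alias) for function names, used to improve type safety. Untyped string constants such as "deploy-service" are converted implicitly; values of type string need an explicit FunctionName(...) conversion.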
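The conversion behavior follows from ordinary Go rules for the declaration `type FunctionName string`, and can be checked in isolation. In the sketch below the type is redeclared locally and `describe` is a hypothetical stand-in for any call that accepts a FunctionName.

```go
package main

import "fmt"

// FunctionName mirrors the SDK declaration shown above.
type FunctionName string

// describe stands in for any call that takes a FunctionName (hypothetical).
func describe(fn FunctionName) string {
	return "function:" + string(fn)
}

func main() {
	// An untyped string constant is assignable to FunctionName directly.
	fmt.Println(describe("deploy-service"))

	// A variable of type string is not; it needs an explicit conversion.
	// Passing name without the conversion would be a compile error.
	name := "deploy-" + "service"
	fmt.Println(describe(FunctionName(name)))
}
```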

Execute

func Execute[Result any](ktx *ExecutionContext, function FunctionName, args any, opts *ExecuteOptions) (*Result, error)

Starts a child execution and waits for it to complete. Provides compile-time type checking for the result type.

result, err := kontinue.Execute[DeployResult](ktx, "deploy-service", &DeployArgs{
    Service: "api",
}, nil)

With options:

result, err := kontinue.Execute[DeployResult](ktx, "deploy-service", &DeployArgs{
    Service: "api",
}, &kontinue.ExecuteOptions{
    StepOptions: kontinue.StepOptions{
        DisplayName: "Deploy API Service",
        Description: "Deploys the API service to the cluster",
    },
    Retry:   &kontinue.RetryOptions{Retries: 3},
    Timeout: &kontinue.TimeoutOptions{Attempt: &metav1.Duration{Duration: 5 * time.Minute}},
})

SpawnExecution

func SpawnExecution(ktx *ExecutionContext, function FunctionName, args any, opts *ExecuteOptions) (*ChildExecution, error)

Starts a child Execution without waiting for it to complete. This is idempotent — if called again during replay, it returns a reference to the existing child execution.

Use with WaitExecution for advanced patterns like spawning multiple children before waiting.

WaitExecution

func WaitExecution[Result any](ktx *ExecutionContext, child *ChildExecution) (*Result, error)

Waits for a spawned child Execution to complete and returns its result.

child, err := kontinue.SpawnExecution(ktx, "long-task", &Args{}, nil)
if err != nil {
    return nil, err
}
// Do other work...
result, err := kontinue.WaitExecution[TaskResult](ktx, child)

ExecuteParallel

func ExecuteParallel[Result any](ktx *ExecutionContext, specs []*ChildExecutionOptions) ([]*Result, error)

Starts multiple child executions and waits for all of them to complete. Children are spawned sequentially but awaited in parallel. Results are returned in the same order as the input specs. Progress is updated automatically if it has not already been set.

results, err := kontinue.ExecuteParallel[TestResult](ktx, []*kontinue.ChildExecutionOptions{
    {Function: "test-unit", Arguments: &TestArgs{}},
    {Function: "test-integration", Arguments: &TestArgs{}},
    {Function: "test-e2e", Arguments: &TestArgs{}},
})

ExecuteSequential

func ExecuteSequential[Result any](ktx *ExecutionContext, specs []*ChildExecutionOptions) ([]*Result, error)

Starts multiple child executions and waits for each to complete before starting the next. Returns results in the same order as input specs. Updates Progress automatically if not already set.

ExecuteOptions

type ExecuteOptions struct {
    StepOptions                    // Naming and metadata options
    Retry   *RetryOptions          // Retry configuration (overrides function defaults)
    Timeout *TimeoutOptions        // Timeout configuration (overrides function defaults)
    Mutex   *ExecutionMutex        // Mutex configuration for concurrency control
}

Optional configuration for child executions. Pass nil when no special options are needed.

ChildExecutionOptions

type ChildExecutionOptions struct {
    Function  FunctionName     // Name of the function to execute
    Arguments any              // Arguments to pass to the function
    Options   *ExecuteOptions  // Optional configuration
}

Used with ExecuteParallel and ExecuteSequential to specify multiple child executions.

ChildExecution

type ChildExecution struct {
    Name      string // Name of the child execution
    Namespace string // Namespace of the child execution
}

Reference to a spawned child execution, returned by SpawnExecution.


State Management

Store

func Store[T any](ktx *ExecutionContext, fn func() (T, error)) (T, error)

Runs the provided function and caches its result on the Execution so that resumed attempts do not recompute the value. Use this to cache expensive results or ensure side-effects are deterministic.

orderID, err := kontinue.Store(ktx, func() (string, error) {
    return uuid.New().String(), nil
})

The function is only executed on the first run. On replay, the cached value is returned.
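The run-once, replay-later behavior can be modeled with a small journal. This is a sketch under assumptions, not the SDK's storage mechanism: `journal`, `store`, and `replay` are hypothetical names, results are keyed by call order, and a slice stands in for state persisted on the Execution.

```go
package main

import "fmt"

// journal is a hypothetical stand-in for state persisted on the Execution.
// Results are keyed by the order in which steps are called.
type journal struct {
	saved []string // results recorded by earlier runs
	next  int      // index of the next step in the current run
}

// store runs fn only if no result is recorded for the current step;
// otherwise it returns the recorded value without calling fn.
func (j *journal) store(fn func() (string, error)) (string, error) {
	step := j.next
	j.next++
	if step < len(j.saved) {
		return j.saved[step], nil // replay: reuse the recorded result
	}
	v, err := fn()
	if err != nil {
		return "", err
	}
	j.saved = append(j.saved, v)
	return v, nil
}

// replay models a resumed attempt: recorded results are kept and the step
// counter restarts from zero.
func (j *journal) replay() { j.next = 0 }

func main() {
	calls := 0
	newID := func() (string, error) {
		calls++
		return fmt.Sprintf("order-%d", calls), nil
	}

	j := &journal{}
	first, _ := j.store(newID)
	j.replay()
	second, _ := j.store(newID)
	fmt.Println(first, second, calls)
}
```

Both calls return the same ID and the generator runs once, which is why a non-deterministic value such as a UUID is safe to produce inside a stored step.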

StoreNamed

func StoreNamed[T any](ktx *ExecutionContext, opts *StoreOptions, fn func() (T, error)) (T, error)

Functions as Store but allows specifying a custom step name via StoreOptions. Useful for avoiding collisions or making stored keys deterministic.

StoreOptions

type StoreOptions struct {
    StepOptions
}

Suspending

Sleep

func Sleep(ktx *ExecutionContext, duration time.Duration) error

Pauses the Execution for the specified duration. This is replay-safe — if the Execution is resumed after a failure, it will not re-sleep for the full duration.

Sleeps can be resumed early via the CLI (kontinue resume) or UI.

if err := kontinue.Sleep(ktx, 1*time.Hour); err != nil {
    return nil, err
}

SleepNamed

func SleepNamed(ktx *ExecutionContext, opts *SleepOptions) error

Functions as Sleep but allows specifying a custom step name and metadata.

err := kontinue.SleepNamed(ktx, &kontinue.SleepOptions{
    Duration: 5 * time.Minute,
    StepOptions: kontinue.StepOptions{
        DisplayName: "Wait for DNS Propagation",
    },
})

Suspend

func Suspend(ktx *ExecutionContext, opts *SuspendOptions) error

Creates a Suspension that must be manually resolved via the API, CLI, or UI. Use this for human approval gates or external trigger points.

// Wait for manual approval
if err := kontinue.Suspend(ktx, &kontinue.SuspendOptions{
    StepOptions: kontinue.StepOptions{
        DisplayName: "Approve Production Deploy",
    },
}); err != nil {
    return nil, err
}

SleepOptions

type SleepOptions struct {
    StepOptions
    Duration time.Duration
}

SuspendOptions

type SuspendOptions struct {
    StepOptions
}

Polling

PollUntil

func PollUntil(ktx *ExecutionContext, condition func() bool, interval time.Duration) error

Polls the condition function at regular intervals until it returns true. The completion is stored via Store() to avoid re-polling on replay.

err := kontinue.PollUntil(ktx, func() bool {
    return checkDeploymentReady()
}, 10*time.Second)

PollUntilNamed

func PollUntilNamed(ktx *ExecutionContext, condition func() bool, opts *PollOptions) error

Functions as PollUntil but allows specifying a custom step name.

PollOptions

type PollOptions struct {
    StepOptions
    Interval time.Duration // Time to wait between poll attempts
}

Jobs

RunJob

func RunJob(ktx *ExecutionContext, opts *JobOptions) (*JobResult, error)

Runs a Kubernetes Job and waits for it to complete. The Job is tracked with labels and OwnerReferences, in the same way as Suspensions and child Executions, and the call is idempotent on replay. Retry behavior respects the parent’s ChildPolicy. Returns a JobResult with information about all pods in the job.

RunPod

func RunPod(ktx *ExecutionContext, opts *PodOptions) (*PodResult, error)

Runs a single Pod via a Kubernetes Job and returns detailed pod information, including container exit codes and access to logs. Creates a Job with BackoffLimit: 0 to ensure exactly one pod is created.

result, err := kontinue.RunPod(ktx, &kontinue.PodOptions{
    Spec: corev1.PodSpec{
        Containers: []corev1.Container{{
            Name:    "task",
            Image:   "myapp:latest",
            Command: []string{"./run-task"},
        }},
    },
})
if err != nil {
    return nil, err // result may be nil when err is set
}
if !result.Succeeded() {
    summary, _ := result.LogErrorSummary(ktx)
    return nil, fmt.Errorf("task failed: %s", summary)
}

RunScript

func RunScript(ktx *ExecutionContext, opts *ScriptOptions) (*PodResult, error)

Convenience wrapper around RunPod for simple command execution.

result, err := kontinue.RunScript(ktx, &kontinue.ScriptOptions{
    Image:   "alpine:latest",
    Command: "echo 'Hello, World!'",
    StepOptions: kontinue.StepOptions{
        DisplayName: "Run Setup Script",
    },
})
if err != nil {
    return nil, err // result may be nil when err is set
}
if !result.Succeeded() {
    return nil, fmt.Errorf("script failed: %s", result.Message)
}

JobOptions

type JobOptions struct {
    StepOptions
    Spec             batchv1.JobSpec   // The job specification to run
    ExtraAnnotations map[string]string // Additional annotations
    ExtraLabels      map[string]string // Additional labels
}

PodOptions

type PodOptions struct {
    StepOptions
    Spec             corev1.PodSpec    // The pod specification to run
    ExtraAnnotations map[string]string // Additional annotations
    ExtraLabels      map[string]string // Additional labels
}

ScriptOptions

type ScriptOptions struct {
    StepOptions
    Image            string            // Container image to use
    Command          string            // Script/command to run
    Shell            []string          // Shell command (defaults to ["/bin/sh", "-c"])
    ExtraAnnotations map[string]string
    ExtraLabels      map[string]string
}

LogOptions

type LogOptions struct {
    TailLines int64  // Number of lines from end to fetch (default: 100)
    Container string // Container name (optional, defaults to first container)
}

JobResult

type JobResult struct {
    SucceededPods int32        // Number of pods that completed successfully
    FailedPods    int32        // Number of pods that failed
    Message       string       // Error or status message
    Pods          []*PodResult // Results for all pods created by the job
}

func (r *JobResult) Succeeded() bool // Returns SucceededPods > 0 && FailedPods == 0
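The rule in the comment above has one consequence worth seeing concretely: any failed pod makes the result unsuccessful, even if other pods succeeded. The sketch below applies the same expression to a local stand-in type (`jobCounts` is hypothetical and carries only the two counters).

```go
package main

import "fmt"

// jobCounts mirrors the two counters on JobResult (local stand-in type).
type jobCounts struct {
	SucceededPods int32
	FailedPods    int32
}

// succeeded applies the documented rule: at least one pod succeeded and none failed.
func (c jobCounts) succeeded() bool {
	return c.SucceededPods > 0 && c.FailedPods == 0
}

func main() {
	fmt.Println(jobCounts{SucceededPods: 3}.succeeded())                // all pods succeeded
	fmt.Println(jobCounts{SucceededPods: 2, FailedPods: 1}.succeeded()) // one pod failed
	fmt.Println(jobCounts{}.succeeded())                                // no pods finished
}
```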

PodResult

type PodResult struct {
    Pod     *corev1.Pod     // The completed Pod resource
    Message string          // Status message from pod conditions
    Phase   corev1.PodPhase // Pod phase (Succeeded, Failed, etc.)
}

func (r *PodResult) Succeeded() bool // Returns true if all containers exited with code 0
func (r *PodResult) Logs(ktx *ExecutionContext, opts *LogOptions) (string, error)
func (r *PodResult) LogErrorSummary(ktx *ExecutionContext) (string, error)

Status Updates

SetMessage

func SetMessage(ktx *ExecutionContext, message string) error

Sets a message on the Execution’s status. Displayed in the UI and CLI. Messages can be overwritten by subsequent calls.

kontinue.SetMessage(ktx, "Deploying to production cluster")

SetProgress

func SetProgress(ktx *ExecutionContext, current, max int) error

Updates the progress indicator (e.g., step 2 of 5). Displayed as a progress bar in the UI and as a fraction in the CLI.

kontinue.SetProgress(ktx, 2, 5)

ClearProgress

func ClearProgress(ktx *ExecutionContext) error

Removes the progress indicator from the execution status.


Step Options

StepOptions

type StepOptions struct {
    Name         string // Explicit step name (must be unique, k8s-compatible)
    GenerateName string // Prefix for generated name (combined with counter)
    DisplayName  string // Human-readable name for UI/CLI display
    Description  string // Description shown in UI details panel
}

Used by all SDK operations to control naming and metadata of child resources.

Field         Description
Name          Explicit step name. Must be unique within the execution and conform to Kubernetes naming rules.
GenerateName  Prefix for auto-generated names. Combined with a deterministic counter.
DisplayName   Human-readable name shown in UI and CLI instead of the resource name.
Description   Detailed description shown in the UI details panel.

If neither Name nor GenerateName is provided, a default name is generated.


Type Aliases

Convenience aliases for API types:

type RetryOptions = kontinuev1alpha1.ExecutionRetry
type TimeoutOptions = kontinuev1alpha1.ExecutionTimeout

See Automatic Retries and Timeouts for field documentation.


Errors

Error Type

type Error struct {
    Kind      ErrorKind // Error category
    Child     string    // Child resource name (if applicable)
    Message   string    // Human-readable message
    Retryable bool      // Whether operation can be retried
    Cause     error     // Underlying error
}

Error Kinds

Kind                   Description                   Retryable
ErrKindInfrastructure  Kubernetes API error          Yes
ErrKindValidation      Invalid options or state      No
ErrKindChildFailed     Child execution failed        Yes
ErrKindChildCanceled   Child execution was canceled  Yes
ErrKindTimeout         Operation timed out           Depends
ErrKindInternal        Internal SDK error            No
ErrKindExecution       User execution error          Configurable

Sentinel Errors

var (
    ErrDuplicateStep  // Step name used more than once
    ErrChildFailed    // Child execution failed
    ErrChildCanceled  // Child execution canceled
    ErrJobFailed      // Job failed
    ErrExecution      // Execution failed
    ErrOverallTimeout // Overall timeout exceeded (not retryable)
    ErrAttemptTimeout // Attempt timeout exceeded (retryable)
)

Use with errors.Is:

if errors.Is(err, kontinue.ErrChildFailed) {
    // Handle child failure
}

Error Helpers

NewNonRetryableError

func NewNonRetryableError(cause error, message string) *Error

Creates a non-retryable error. Use to signal that an error should not trigger a retry.

return kontinue.NewNonRetryableError(err, "invalid configuration")

NewRetryableError

func NewRetryableError(cause error, message string) *Error

Creates a retryable error. Use to explicitly mark an error as retryable.

return kontinue.NewRetryableError(err, "temporary service unavailable")

IsRetryableError

func IsRetryableError(err error) bool

Returns true if the error is retryable. For non-kontinue errors, defaults to true.

GetErrorKind

func GetErrorKind(err error) ErrorKind

Returns the error kind, or an empty string if the error is not a kontinue Error.