Client Library

The client package (gitlab.com/kontinue/kontinue/pkg/client) provides a high-level Go API for interacting with kontinue resources from outside a worker. Use this library to spawn executions, monitor progress, and manage workflows programmatically.

Creating a Client

New

func New() (*Client, error)

Creates a new Client using the default Kubernetes configuration. Reads kubeconfig from the default location and uses the current context’s namespace.

cli, err := client.New()
if err != nil {
    log.Fatal(err)
}

NewWithOptions

func NewWithOptions(opts Options) (*Client, error)

Creates a new Client with custom options.

cli, err := client.NewWithOptions(client.Options{
    Impersonate: "cluster-admin",
    QPS:         50,
    Burst:       100,
})

NewFromKubeClient

func NewFromKubeClient(k8sClient client.WithWatch, namespace string) *Client

Creates a Client from an existing controller-runtime client. Useful for integrating with existing Kubernetes controllers.

Options

type Options struct {
    Impersonate string  // Username to impersonate for requests
    QPS         float32 // Max queries per second (default: 5)
    Burst       int     // Max burst for throttling (default: 10)
}

Client Methods

Namespace

func (c *Client) Namespace() string

Returns the default namespace for this client.

Context

func (c *Client) Context() context.Context

Returns the context for this client.

WithContext

func (c *Client) WithContext(ctx context.Context) *Client

Returns a new client with the given context. Useful for setting deadlines or cancellation.

WithNamespace

func (c *Client) WithNamespace(namespace string) *Client

Returns a new client with the given namespace as the default.

KubeClient

func (c *Client) KubeClient() client.WithWatch

Returns the underlying controller-runtime client for advanced use cases.


Types

FunctionName

type FunctionName string

A type alias for function names to improve type safety. String literals are automatically converted.


Spawning Executions

Spawn

func (c *Client) Spawn(ctx context.Context, function FunctionName, args any, opts *SpawnOptions) (*Execution, error)

Creates a new execution for the given function. Arguments are merged with function defaults (provided arguments take precedence).

exec, err := cli.Spawn(ctx, "deploy-cluster", &DeployArgs{
    Environment: "production",
}, &client.SpawnOptions{
    Retry: &kontinuev1alpha1.ExecutionRetry{
        Retries: 3,
    },
    TTL: &kontinuev1alpha1.ExecutionTTL{
        DeleteAfter: metav1.Duration{Duration: 24 * time.Hour},
    },
})

SpawnOptions

type SpawnOptions struct {
    Name      string                             // Explicit name (auto-generated if empty)
    Namespace string                             // Override namespace
    Retry     *kontinuev1alpha1.ExecutionRetry   // Retry configuration
    TTL       *kontinuev1alpha1.ExecutionTTL     // TTL/archiving configuration
    Timeout   *kontinuev1alpha1.ExecutionTimeout // Timeout configuration
    Mutex     *kontinuev1alpha1.ExecutionMutex   // Mutex for concurrency control
}

Querying Executions

Get

func (c *Client) Get(ctx context.Context, name string) (*Execution, error)

Retrieves an execution by name from the default namespace. Falls back to ArchivedExecution if the live execution is not found.

GetNamespaced

func (c *Client) GetNamespaced(ctx context.Context, namespace, name string) (*Execution, error)

Retrieves an execution by name from a specific namespace. Falls back to ArchivedExecution if the live execution is not found.

GetExecution

func (c *Client) GetExecution(ctx context.Context, name string) (*Execution, error)

Alias for Get for consistency with other methods.

List

func (c *Client) List(ctx context.Context, opts *ListOptions) ([]*Execution, error)

Retrieves executions from the default namespace with optional filters. Automatically includes matching archived executions.

execs, err := cli.List(ctx, &client.ListOptions{
    Function: "deploy-cluster",
    Phases:   []kontinuev1alpha1.ExecutionPhase{kontinuev1alpha1.ExecutionPhaseExecuting},
    OnlyRoot: true,
})

ListNamespaced

func (c *Client) ListNamespaced(ctx context.Context, namespace string, opts *ListOptions) ([]*Execution, error)

Retrieves executions from a specific namespace with optional filters.

GetChildExecutions

func (c *Client) GetChildExecutions(ctx context.Context, parent *Execution) ([]*Execution, error)

Retrieves direct child executions of a given execution.

GetChildExecutionsNamespaced

func (c *Client) GetChildExecutionsNamespaced(ctx context.Context, namespace, parentName string) ([]*Execution, error)

Retrieves direct child executions by parent name in a specific namespace.

ListOptions

type ListOptions struct {
    LabelSelector client.MatchingLabels            // Filter by labels
    Function      string                           // Filter by function name
    Root          string                           // Filter by root execution label
    Schedule      string                           // Filter by scheduled execution label
    WorkerGroup   string                           // Filter by worker group
    Phases        []kontinuev1alpha1.ExecutionPhase // Filter by phases
    OnlyRoot      bool                             // Only root executions
}

Waiting for Completion

WaitForExecution

func (c *Client) WaitForExecution(ctx context.Context, execution *Execution) (*Execution, error)

Waits for an execution to enter a terminal state (Completed, Failed, Canceled, or Skipped). Uses a Kubernetes watch for efficient waiting.

exec, err := cli.Spawn(ctx, "my-function", args, nil)
if err != nil {
    return err
}

completed, err := cli.WaitForExecution(ctx, exec)
if err != nil {
    return err
}

if completed.Status.Phase == kontinuev1alpha1.ExecutionPhaseCompleted {
    fmt.Println("Success!")
}

ParseResult

func ParseResult[T any](execution *Execution) (*T, error)

Extracts and deserializes the result from a completed Execution. Returns an error if the execution failed, was canceled, or the result cannot be deserialized.

type DeployResult struct {
    ClusterID string `json:"clusterId"`
    Endpoint  string `json:"endpoint"`
}

result, err := client.ParseResult[DeployResult](completed)
if err != nil {
    return err
}
fmt.Printf("Deployed to %s\n", result.Endpoint)

Canceling Executions

Cancel

func (c *Client) Cancel(ctx context.Context, execution *Execution, opts *CancelOptions) error

Cancels a running execution. The execution must not already be in a terminal state.

err := cli.Cancel(ctx, exec, &client.CancelOptions{
    Message: "Deployment aborted by user",
})

CancelOptions

type CancelOptions struct {
    Message string // Cancellation message (default: "Canceled by client")
}

Skipping Executions

Skip

func (c *Client) Skip(ctx context.Context, execution *Execution, opts *SkipOptions) error

Skips a running execution, returning the specified result to any waiting parent. Use this to bypass an execution while still providing a result.

err := cli.Skip(ctx, exec, &client.SkipOptions{
    Message: "Skipped due to maintenance window",
    Result:  &MyResult{Status: "skipped"},
})

SkipOptions

type SkipOptions struct {
    Message string // Skip message (default: "Skipped by client")
    Result  any    // Result to return to parent (nil for zero-value)
}

Retrying Executions

RetryExecution

func (c *Client) RetryExecution(ctx context.Context, execution *Execution, opts *RetryOptions) error

Retries a failed or canceled execution by incrementing its retry count. The execution must be in a Failed or Canceled state.

err := cli.RetryExecution(ctx, exec, &client.RetryOptions{
    ChildPolicy: kontinuev1alpha1.ChildPolicyRetryFailed,
})

RetryOptions

type RetryOptions struct {
    ChildPolicy kontinuev1alpha1.ChildPolicy // How to handle child resources on retry
}

Child policies:

  • ChildPolicyNone - Reuse existing children
  • ChildPolicyRetryFailed - Only retry failed/canceled children
  • ChildPolicyRetryAll - Retry all children

Resuming Suspensions

GetSuspension

func (c *Client) GetSuspension(ctx context.Context, name string) (*Suspension, error)

Retrieves a suspension by name from the default namespace.

GetSuspensionNamespaced

func (c *Client) GetSuspensionNamespaced(ctx context.Context, namespace, name string) (*Suspension, error)

Retrieves a suspension by name from a specific namespace.

ResumeSuspension

func (c *Client) ResumeSuspension(ctx context.Context, suspension *Suspension) error

Resumes a suspended Suspension by updating its phase to Completed.

ResumeExecution

func (c *Client) ResumeExecution(ctx context.Context, execution *Execution, opts *ResumeOptions) error

Resumes an execution by resuming its suspended children.

// Resume direct children only
err := cli.ResumeExecution(ctx, exec, &client.ResumeOptions{})

// Resume recursively (all descendants)
err := cli.ResumeExecution(ctx, exec, &client.ResumeOptions{
    Recursive: true,
})

ResumeOptions

type ResumeOptions struct {
    Recursive bool // Recursively resume all suspensions in the tree
}

Functions

Function

func (c *Client) Function(ctx context.Context, name FunctionName) (*Function, error)

Retrieves a specific function by name from the default namespace.

FunctionNamespaced

func (c *Client) FunctionNamespaced(ctx context.Context, namespace string, name FunctionName) (*Function, error)

Retrieves a specific function by name from a specific namespace.

Functions

func (c *Client) Functions(ctx context.Context, opts *ListFunctionsOptions) ([]Function, error)

Retrieves all available functions from the default namespace. By default, excludes functions with the kontinue.cloud/hidden annotation.

functions, err := cli.Functions(ctx, &client.ListFunctionsOptions{
    WorkerGroup:   "default",
    IncludeHidden: false,
})

FunctionsNamespaced

func (c *Client) FunctionsNamespaced(ctx context.Context, namespace string, opts *ListFunctionsOptions) ([]Function, error)

Retrieves all available functions from a specific namespace.

ListFunctionsOptions

type ListFunctionsOptions struct {
    IncludeHidden bool   // Include functions marked as hidden
    WorkerGroup   string // Filter by worker group
}

Scheduled Executions

GetScheduledExecution

func (c *Client) GetScheduledExecution(ctx context.Context, name string) (*ScheduledExecution, error)

Retrieves a scheduled execution by name from the default namespace.

GetScheduledExecutionNamespaced

func (c *Client) GetScheduledExecutionNamespaced(ctx context.Context, namespace, name string) (*ScheduledExecution, error)

Retrieves a scheduled execution by name from a specific namespace.

ListScheduledExecutions

func (c *Client) ListScheduledExecutions(ctx context.Context, opts *ScheduleListOptions) ([]ScheduledExecution, error)

Retrieves scheduled executions from the default namespace with optional filters.

ListScheduledExecutionsNamespaced

func (c *Client) ListScheduledExecutionsNamespaced(ctx context.Context, namespace string, opts *ScheduleListOptions) ([]ScheduledExecution, error)

Retrieves scheduled executions from a specific namespace.

SuspendScheduledExecution

func (c *Client) SuspendScheduledExecution(ctx context.Context, sched *ScheduledExecution) error

Suspends a scheduled execution, preventing new executions from being created.

ResumeScheduledExecution

func (c *Client) ResumeScheduledExecution(ctx context.Context, sched *ScheduledExecution) error

Resumes a suspended scheduled execution.

ScheduleListOptions

type ScheduleListOptions struct {
    Function      FunctionName
    LabelSelector map[string]string
}

Jobs

GetJob

func (c *Client) GetJob(ctx context.Context, name string) (*batchv1.Job, error)

Retrieves a Job by name from the default namespace.

GetJobNamespaced

func (c *Client) GetJobNamespaced(ctx context.Context, namespace, name string) (*batchv1.Job, error)

Retrieves a Job by name from a specific namespace.

GetJobLogs

func (c *Client) GetJobLogs(ctx context.Context, jobName string, opts *corev1.PodLogOptions) (io.ReadCloser, error)

Returns an io.ReadCloser for the logs of a job’s pods. If there are multiple pods, logs from the first pod are returned. The caller must close the returned reader.

logs, err := cli.GetJobLogs(ctx, "my-job", &corev1.PodLogOptions{
    Follow: true,
})
if err != nil {
    return err
}
defer logs.Close()

io.Copy(os.Stdout, logs)

GetJobLogsNamespaced

func (c *Client) GetJobLogsNamespaced(ctx context.Context, namespace, jobName string, opts *corev1.PodLogOptions) (io.ReadCloser, error)

Returns job logs from a specific namespace.


Execution Trees

GetTree

func (c *Client) GetTree(ctx context.Context, executionName string) (*ExecutionTree, error)

Retrieves the execution tree for a given execution, including all child executions, suspensions, and jobs. Falls back to ArchivedExecution if the live execution is not found.

tree, err := cli.GetTree(ctx, "my-execution")
if err != nil {
    return err
}

// Walk the tree
var walk func(node *client.ExecutionTreeNode, depth int)
walk = func(node *client.ExecutionTreeNode, depth int) {
    fmt.Printf("%s%s (%s): %s\n",
        strings.Repeat("  ", depth),
        node.Name,
        node.Kind,
        node.Phase)
    for _, child := range node.Children {
        walk(child, depth+1)
    }
}
walk(tree.Root(), 0)

GetTreeNamespaced

func (c *Client) GetTreeNamespaced(ctx context.Context, namespace, executionName string) (*ExecutionTree, error)

Retrieves the execution tree from a specific namespace.

WatchTree

func (c *Client) WatchTree(ctx context.Context, executionName string) (*TreeWatcher, error)

Starts watching the execution tree for changes. Returns updates whenever any resource in the tree is modified.

watcher, err := cli.WatchTree(ctx, "my-execution")
if err != nil {
    return err
}
defer watcher.Stop()

for {
    select {
    case tree := <-watcher.Updates():
        fmt.Printf("Tree updated: %s is %s\n",
            tree.Root().Name,
            tree.Root().Phase)
    case err := <-watcher.Errors():
        log.Printf("Watch error: %v", err)
    case <-ctx.Done():
        return nil
    }
}

WatchTreeNamespaced

func (c *Client) WatchTreeNamespaced(ctx context.Context, namespace, executionName string) (*TreeWatcher, error)

Starts watching the execution tree in a specific namespace.

ExecutionTree

type ExecutionTree struct {
    // ...
}

func (t *ExecutionTree) Root() *ExecutionTreeNode
func (t *ExecutionTree) IsArchived() bool
MethodDescription
Root()Returns the root node of the tree
IsArchived()Returns true if the execution is from an ArchivedExecution

ExecutionTreeNode

type ExecutionTreeNode struct {
    Name          string
    Namespace     string
    CreatedAt     time.Time
    StartedAt     *time.Time
    FinishedAt    *time.Time
    Kind          string    // "Execution", "Suspension", "Job", "Attempt"
    Phase         string
    Message       string
    AttemptNumber int       // Set for Attempt nodes
    Children      []*ExecutionTreeNode
}

Node kind constants:

  • NodeKindExecution - Execution resource
  • NodeKindSuspension - Suspension resource
  • NodeKindJob - Kubernetes Job
  • NodeKindAttempt - Previous execution attempt

Node accessor methods:

func (n *ExecutionTreeNode) Execution() *Execution
func (n *ExecutionTreeNode) Suspension() *Suspension
func (n *ExecutionTreeNode) Job() *batchv1.Job
func (n *ExecutionTreeNode) Attempt() *ExecutionAttempt

Each accessor returns the underlying resource if the node is of that kind, otherwise nil.

TreeWatcher

type TreeWatcher struct { ... }

func (tw *TreeWatcher) Updates() <-chan *ExecutionTree
func (tw *TreeWatcher) Errors() <-chan error
func (tw *TreeWatcher) Stop()
MethodDescription
Updates()Returns channel that receives tree updates
Errors()Returns channel that receives errors
Stop()Stops the watcher and closes channels