Client Library
The client package (gitlab.com/kontinue/kontinue/pkg/client) provides a high-level Go API
for interacting with kontinue resources from outside a worker. Use this library to spawn
executions, monitor progress, and manage workflows programmatically.
Creating a Client
New
func New() (*Client, error)
Creates a new Client using the default Kubernetes configuration. Reads kubeconfig from the default location and uses the current context’s namespace.
cli, err := client.New()
if err != nil {
log.Fatal(err)
}
NewWithOptions
func NewWithOptions(opts Options) (*Client, error)
Creates a new Client with custom options.
cli, err := client.NewWithOptions(client.Options{
Impersonate: "cluster-admin",
QPS: 50,
Burst: 100,
})
NewFromKubeClient
func NewFromKubeClient(k8sClient client.WithWatch, namespace string) *Client
Creates a Client from an existing controller-runtime client. Useful for integrating with existing Kubernetes controllers.
Options
type Options struct {
Impersonate string // Username to impersonate for requests
QPS float32 // Max queries per second (default: 5)
Burst int // Max burst for throttling (default: 10)
}
Client Methods
Namespace
func (c *Client) Namespace() string
Returns the default namespace for this client.
Context
func (c *Client) Context() context.Context
Returns the context for this client.
WithContext
func (c *Client) WithContext(ctx context.Context) *Client
Returns a new client with the given context. Useful for setting deadlines or cancellation.
WithNamespace
func (c *Client) WithNamespace(namespace string) *Client
Returns a new client with the given namespace as the default.
KubeClient
func (c *Client) KubeClient() client.WithWatch
Returns the underlying controller-runtime client for advanced use cases.
Types
FunctionName
type FunctionName string
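A distinct string type (not a type alias) for function names, used to improve type safety. Untyped string constants, including string literals, can be passed directly wherever a FunctionName is expected; a variable of type string needs an explicit FunctionName(s) conversion.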
Spawning Executions
Spawn
func (c *Client) Spawn(ctx context.Context, function FunctionName, args any, opts *SpawnOptions) (*Execution, error)
Creates a new execution for the given function. Arguments are merged with function defaults (provided arguments take precedence).
exec, err := cli.Spawn(ctx, "deploy-cluster", &DeployArgs{
Environment: "production",
}, &client.SpawnOptions{
Retry: &kontinuev1alpha1.ExecutionRetry{
Retries: 3,
},
TTL: &kontinuev1alpha1.ExecutionTTL{
DeleteAfter: metav1.Duration{Duration: 24 * time.Hour},
},
})
SpawnOptions
type SpawnOptions struct {
Name string // Explicit name (auto-generated if empty)
Namespace string // Override namespace
Retry *kontinuev1alpha1.ExecutionRetry // Retry configuration
TTL *kontinuev1alpha1.ExecutionTTL // TTL/archiving configuration
Timeout *kontinuev1alpha1.ExecutionTimeout // Timeout configuration
Mutex *kontinuev1alpha1.ExecutionMutex // Mutex for concurrency control
}
Querying Executions
Get
func (c *Client) Get(ctx context.Context, name string) (*Execution, error)
Retrieves an execution by name from the default namespace. Falls back to ArchivedExecution if the live execution is not found.
GetNamespaced
func (c *Client) GetNamespaced(ctx context.Context, namespace, name string) (*Execution, error)
Retrieves an execution by name from a specific namespace. Falls back to ArchivedExecution if the live execution is not found.
GetExecution
func (c *Client) GetExecution(ctx context.Context, name string) (*Execution, error)
Alias for Get, provided for naming consistency with the other Get* methods.
List
func (c *Client) List(ctx context.Context, opts *ListOptions) ([]*Execution, error)
Retrieves executions from the default namespace with optional filters. Automatically includes matching archived executions.
execs, err := cli.List(ctx, &client.ListOptions{
Function: "deploy-cluster",
Phases: []kontinuev1alpha1.ExecutionPhase{kontinuev1alpha1.ExecutionPhaseExecuting},
OnlyRoot: true,
})
ListNamespaced
func (c *Client) ListNamespaced(ctx context.Context, namespace string, opts *ListOptions) ([]*Execution, error)
Retrieves executions from a specific namespace with optional filters.
GetChildExecutions
func (c *Client) GetChildExecutions(ctx context.Context, parent *Execution) ([]*Execution, error)
Retrieves direct child executions of a given execution.
GetChildExecutionsNamespaced
func (c *Client) GetChildExecutionsNamespaced(ctx context.Context, namespace, parentName string) ([]*Execution, error)
Retrieves direct child executions by parent name in a specific namespace.
ListOptions
type ListOptions struct {
LabelSelector client.MatchingLabels // Filter by labels
Function string // Filter by function name
Root string // Filter by root execution label
Schedule string // Filter by scheduled execution label
WorkerGroup string // Filter by worker group
Phases []kontinuev1alpha1.ExecutionPhase // Filter by phases
OnlyRoot bool // Only root executions
}
Waiting for Completion
WaitForExecution
func (c *Client) WaitForExecution(ctx context.Context, execution *Execution) (*Execution, error)
Waits for an execution to enter a terminal state (Completed, Failed, Canceled, or Skipped). Uses a Kubernetes watch for efficient waiting.
exec, err := cli.Spawn(ctx, "my-function", args, nil)
if err != nil {
return err
}
completed, err := cli.WaitForExecution(ctx, exec)
if err != nil {
return err
}
if completed.Status.Phase == kontinuev1alpha1.ExecutionPhaseCompleted {
fmt.Println("Success!")
}
ParseResult
func ParseResult[T any](execution *Execution) (*T, error)
Extracts and deserializes the result from a completed Execution. Returns an error if the execution failed, was canceled, or the result cannot be deserialized.
type DeployResult struct {
ClusterID string `json:"clusterId"`
Endpoint string `json:"endpoint"`
}
result, err := client.ParseResult[DeployResult](completed)
if err != nil {
return err
}
fmt.Printf("Deployed to %s\n", result.Endpoint)
Canceling Executions
Cancel
func (c *Client) Cancel(ctx context.Context, execution *Execution, opts *CancelOptions) error
Cancels a running execution. The execution must not already be in a terminal state.
err := cli.Cancel(ctx, exec, &client.CancelOptions{
Message: "Deployment aborted by user",
})
CancelOptions
type CancelOptions struct {
Message string // Cancellation message (default: "Canceled by client")
}
Skipping Executions
Skip
func (c *Client) Skip(ctx context.Context, execution *Execution, opts *SkipOptions) error
Skips a running execution, returning the specified result to any waiting parent. Use this to bypass an execution while still providing a result.
err := cli.Skip(ctx, exec, &client.SkipOptions{
Message: "Skipped due to maintenance window",
Result: &MyResult{Status: "skipped"},
})
SkipOptions
type SkipOptions struct {
Message string // Skip message (default: "Skipped by client")
Result any // Result to return to parent (nil for zero-value)
}
Retrying Executions
RetryExecution
func (c *Client) RetryExecution(ctx context.Context, execution *Execution, opts *RetryOptions) error
Retries a failed or canceled execution by incrementing its retry count. The execution must be in a Failed or Canceled state.
err := cli.RetryExecution(ctx, exec, &client.RetryOptions{
ChildPolicy: kontinuev1alpha1.ChildPolicyRetryFailed,
})
RetryOptions
type RetryOptions struct {
ChildPolicy kontinuev1alpha1.ChildPolicy // How to handle child resources on retry
}
Child policies:
- ChildPolicyNone - Reuse existing children
- ChildPolicyRetryFailed - Only retry failed/canceled children
- ChildPolicyRetryAll - Retry all children
Resuming Suspensions
GetSuspension
func (c *Client) GetSuspension(ctx context.Context, name string) (*Suspension, error)
Retrieves a suspension by name from the default namespace.
GetSuspensionNamespaced
func (c *Client) GetSuspensionNamespaced(ctx context.Context, namespace, name string) (*Suspension, error)
Retrieves a suspension by name from a specific namespace.
ResumeSuspension
func (c *Client) ResumeSuspension(ctx context.Context, suspension *Suspension) error
Resumes a suspended Suspension by updating its phase to Completed.
ResumeExecution
func (c *Client) ResumeExecution(ctx context.Context, execution *Execution, opts *ResumeOptions) error
Resumes an execution by resuming its suspended children.
// Resume direct children only
err := cli.ResumeExecution(ctx, exec, &client.ResumeOptions{})
// Resume recursively (all descendants)
err := cli.ResumeExecution(ctx, exec, &client.ResumeOptions{
Recursive: true,
})
ResumeOptions
type ResumeOptions struct {
Recursive bool // Recursively resume all suspensions in the tree
}
Functions
Function
func (c *Client) Function(ctx context.Context, name FunctionName) (*Function, error)
Retrieves a specific function by name from the default namespace.
FunctionNamespaced
func (c *Client) FunctionNamespaced(ctx context.Context, namespace string, name FunctionName) (*Function, error)
Retrieves a specific function by name from a specific namespace.
Functions
func (c *Client) Functions(ctx context.Context, opts *ListFunctionsOptions) ([]Function, error)
Retrieves all available functions from the default namespace.
By default, excludes functions with the kontinue.cloud/hidden annotation.
functions, err := cli.Functions(ctx, &client.ListFunctionsOptions{
WorkerGroup: "default",
IncludeHidden: false,
})
FunctionsNamespaced
func (c *Client) FunctionsNamespaced(ctx context.Context, namespace string, opts *ListFunctionsOptions) ([]Function, error)
Retrieves all available functions from a specific namespace.
ListFunctionsOptions
type ListFunctionsOptions struct {
IncludeHidden bool // Include functions marked as hidden
WorkerGroup string // Filter by worker group
}
Scheduled Executions
GetScheduledExecution
func (c *Client) GetScheduledExecution(ctx context.Context, name string) (*ScheduledExecution, error)
Retrieves a scheduled execution by name from the default namespace.
GetScheduledExecutionNamespaced
func (c *Client) GetScheduledExecutionNamespaced(ctx context.Context, namespace, name string) (*ScheduledExecution, error)
Retrieves a scheduled execution by name from a specific namespace.
ListScheduledExecutions
func (c *Client) ListScheduledExecutions(ctx context.Context, opts *ScheduleListOptions) ([]ScheduledExecution, error)
Retrieves scheduled executions from the default namespace with optional filters.
ListScheduledExecutionsNamespaced
func (c *Client) ListScheduledExecutionsNamespaced(ctx context.Context, namespace string, opts *ScheduleListOptions) ([]ScheduledExecution, error)
Retrieves scheduled executions from a specific namespace.
SuspendScheduledExecution
func (c *Client) SuspendScheduledExecution(ctx context.Context, sched *ScheduledExecution) error
Suspends a scheduled execution, preventing new executions from being created.
ResumeScheduledExecution
func (c *Client) ResumeScheduledExecution(ctx context.Context, sched *ScheduledExecution) error
Resumes a suspended scheduled execution.
ScheduleListOptions
type ScheduleListOptions struct {
Function FunctionName
LabelSelector map[string]string
}
Jobs
GetJob
func (c *Client) GetJob(ctx context.Context, name string) (*batchv1.Job, error)
Retrieves a Job by name from the default namespace.
GetJobNamespaced
func (c *Client) GetJobNamespaced(ctx context.Context, namespace, name string) (*batchv1.Job, error)
Retrieves a Job by name from a specific namespace.
GetJobLogs
func (c *Client) GetJobLogs(ctx context.Context, jobName string, opts *corev1.PodLogOptions) (io.ReadCloser, error)
Returns an io.ReadCloser for the logs of a job’s pods. If there are multiple pods,
logs from the first pod are returned. The caller must close the returned reader.
logs, err := cli.GetJobLogs(ctx, "my-job", &corev1.PodLogOptions{
Follow: true,
})
if err != nil {
return err
}
defer logs.Close()
io.Copy(os.Stdout, logs)
GetJobLogsNamespaced
func (c *Client) GetJobLogsNamespaced(ctx context.Context, namespace, jobName string, opts *corev1.PodLogOptions) (io.ReadCloser, error)
Returns job logs from a specific namespace.
Execution Trees
GetTree
func (c *Client) GetTree(ctx context.Context, executionName string) (*ExecutionTree, error)
Retrieves the execution tree for a given execution, including all child executions, suspensions, and jobs. Falls back to ArchivedExecution if the live execution is not found.
tree, err := cli.GetTree(ctx, "my-execution")
if err != nil {
return err
}
// Walk the tree
var walk func(node *client.ExecutionTreeNode, depth int)
walk = func(node *client.ExecutionTreeNode, depth int) {
fmt.Printf("%s%s (%s): %s\n",
strings.Repeat(" ", depth),
node.Name,
node.Kind,
node.Phase)
for _, child := range node.Children {
walk(child, depth+1)
}
}
walk(tree.Root(), 0)
GetTreeNamespaced
func (c *Client) GetTreeNamespaced(ctx context.Context, namespace, executionName string) (*ExecutionTree, error)
Retrieves the execution tree from a specific namespace.
WatchTree
func (c *Client) WatchTree(ctx context.Context, executionName string) (*TreeWatcher, error)
Starts watching the execution tree for changes. The returned TreeWatcher delivers an updated tree on its Updates() channel whenever any resource in the tree is modified.
watcher, err := cli.WatchTree(ctx, "my-execution")
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case tree := <-watcher.Updates():
fmt.Printf("Tree updated: %s is %s\n",
tree.Root().Name,
tree.Root().Phase)
case err := <-watcher.Errors():
log.Printf("Watch error: %v", err)
case <-ctx.Done():
return nil
}
}
WatchTreeNamespaced
func (c *Client) WatchTreeNamespaced(ctx context.Context, namespace, executionName string) (*TreeWatcher, error)
Starts watching the execution tree in a specific namespace.
ExecutionTree
type ExecutionTree struct {
// ...
}
func (t *ExecutionTree) Root() *ExecutionTreeNode
func (t *ExecutionTree) IsArchived() bool
| Method | Description |
|---|---|
| Root() | Returns the root node of the tree |
| IsArchived() | Returns true if the execution is from an ArchivedExecution |
ExecutionTreeNode
type ExecutionTreeNode struct {
Name string
Namespace string
CreatedAt time.Time
StartedAt *time.Time
FinishedAt *time.Time
Kind string // "Execution", "Suspension", "Job", "Attempt"
Phase string
Message string
AttemptNumber int // Set for Attempt nodes
Children []*ExecutionTreeNode
}
Node kind constants:
- NodeKindExecution - Execution resource
- NodeKindSuspension - Suspension resource
- NodeKindJob - Kubernetes Job
- NodeKindAttempt - Previous execution attempt
Node accessor methods:
func (n *ExecutionTreeNode) Execution() *Execution
func (n *ExecutionTreeNode) Suspension() *Suspension
func (n *ExecutionTreeNode) Job() *batchv1.Job
func (n *ExecutionTreeNode) Attempt() *ExecutionAttempt
Each accessor returns the underlying resource if the node is of that kind, otherwise nil.
TreeWatcher
type TreeWatcher struct { ... }
func (tw *TreeWatcher) Updates() <-chan *ExecutionTree
func (tw *TreeWatcher) Errors() <-chan error
func (tw *TreeWatcher) Stop()
| Method | Description |
|---|---|
| Updates() | Returns channel that receives tree updates |
| Errors() | Returns channel that receives errors |
| Stop() | Stops the watcher and closes channels |