# Concurrency Controls

kontinue provides three mechanisms for controlling concurrency: Mutexes, Function Concurrency Limits, and Worker Concurrency Limits. These work together to provide fine-grained control over how executions run.
| Mechanism | Purpose |
|---|---|
| Mutexes | Ensure only one execution runs at a time for a named resource |
| Function Concurrency | Limit total concurrent executions of a specific function |
| Worker Concurrency | Global limit on total concurrent executions per worker |
## Mutexes

Mutexes provide exclusive access to a named resource. Only one execution can hold a mutex at a time.
### Via CLI

```shell
# Start an execution with a mutex
kontinue execute deploy --mutex deploy-prod --args '{"version": "1.2.3"}'
```
### Via SDK

When spawning child executions, pass the mutex in `ExecuteOptions`:

```go
result, err := kontinue.Execute[DeployResult](ktx, "deploy", &DeployArgs{
	Env: "production",
}, &kontinue.ExecuteOptions{
	Mutex: &kontinue.MutexOptions{
		Name: "deploy-prod",
	},
})
```
### Via Client Library

When spawning executions externally:

```go
_, err := client.Spawn(ctx, "deploy", &DeployArgs{Version: "1.2.3"}, &client.SpawnOptions{
	Mutex: &kontinuev1alpha1.ExecutionMutex{
		Name: "deploy-prod",
	},
})
```
### Common Use Cases

- Deployment Pipelines: Ensure only one deployment runs per environment
- Resource Management: Serialize access to shared resources
- Critical Sections: Protect operations that must not run concurrently

```shell
# Different environments can deploy concurrently
kontinue execute deploy --mutex deploy-prod
kontinue execute deploy --mutex deploy-staging
```
## Function Concurrency Limits

Function concurrency limits restrict the total number of concurrent executions of a specific function across all workers in a worker group.

Configure limits when registering a function:

```go
worker.RegisterFunction(w, "heavy-task", HeavyTask, &function.Options{
	Description: "A resource-intensive task",
	Concurrency: &function.Concurrency{
		Limit: 5, // Maximum 5 concurrent executions
	},
})
```
### Behavior

- Semaphore Semantics: Acts like a counting semaphore with N slots
- Blocking: Executions wait when all slots are occupied
- Status Updates: Waiting executions show `Pending` with the message "Waiting for concurrency slot"
- Per-Function: Each function has its own independent limit
### Common Use Cases

```go
// Limit resource-intensive operations
worker.RegisterFunction(w, "video-encode", EncodeVideo, &function.Options{
	Concurrency: &function.Concurrency{
		Limit: 3, // Max 3 concurrent video encodings
	},
})

// Prevent overwhelming external services
worker.RegisterFunction(w, "api-call", CallExternalAPI, &function.Options{
	Concurrency: &function.Concurrency{
		Limit: 10, // Max 10 concurrent API calls
	},
})

// Serialize database migrations
worker.RegisterFunction(w, "db-migration", RunMigration, &function.Options{
	Concurrency: &function.Concurrency{
		Limit: 1, // One migration at a time
	},
})
```
## Worker Concurrency Limit

The worker concurrency limit caps total concurrent executions across all functions on a single worker. This prevents workers from being overloaded.

```go
err := w.Run(ctx, &worker.Options{
	Namespace:               "default",
	Group:                   "my-workers",
	MaxConcurrentExecutions: 5000, // Default: 10000
})
```

The default of 10,000 is intentionally high to act as a safety net. Adjust based on worker resources and execution characteristics.
## How They Work Together

When an execution uses multiple mechanisms, they are acquired in order:

1. Worker concurrency slot is acquired first
2. Mutex is acquired second (if specified)
3. Function concurrency slot is acquired third (if configured)
4. Execution runs
5. All are released in reverse order
### Example

```go
// Worker with global limit
w.Run(ctx, &worker.Options{
	MaxConcurrentExecutions: 100, // Max 100 total
})

// Function with concurrency limit
worker.RegisterFunction(w, "deploy", Deploy, &function.Options{
	Concurrency: &function.Concurrency{
		Limit: 3, // Max 3 deploys at once
	},
})

// Execution with mutex
client.Spawn(ctx, "deploy", args, &client.SpawnOptions{
	Mutex: &kontinuev1alpha1.ExecutionMutex{
		Name: "deploy-prod", // Only one prod deploy at a time
	},
})
```
In this example:

- The worker can run up to 100 concurrent executions total
- The `deploy` function can run up to 3 concurrent executions
- Only 1 execution can deploy to production at a time (mutex)
- Other deploys (staging, dev) can run concurrently up to the limit of 3
## Monitoring

Executions waiting for concurrency control show their state in status:

| Status | Message |
|---|---|
| Pending | "Waiting for mutex {name}" |
| Pending | "Waiting for concurrency slot (limit: N)" |

Events are also emitted:

- `BlockedOnMutex` - execution is waiting for a mutex
- `BlockedOnConcurrency` - execution is waiting for a concurrency slot

```shell
# View execution status
kontinue list

# Watch for events
kubectl get events -n default --watch
```
## Best Practices

### Mutexes

- Use descriptive names: `deploy-{environment}`, `backup-{database}`
- Minimize scope: only use a mutex when true serialization is required
- Keep critical sections short
### Function Concurrency

- Profile resource usage before setting limits
- Start conservative and increase as needed
- Monitor queue depth to tune limits
### Worker Concurrency

- Use the default (10,000) unless resource constraints require adjustment
- Monitor worker resources during peak load
- Size for burst capacity beyond normal load
## See Also

- Automatic Retries - Configure retry behavior
- Timeouts - Set execution time limits
- SDK Reference - Complete API reference