Concurrency Controls

kontinue provides three mechanisms for controlling concurrency: Mutexes, Function Concurrency Limits, and Worker Concurrency Limits. These work together to provide fine-grained control over how executions run.

| Mechanism | Purpose |
|---|---|
| Mutexes | Ensure only one execution runs at a time for a named resource |
| Function Concurrency | Limit total concurrent executions of a specific function |
| Worker Concurrency | Global limit on total concurrent executions per worker |

Mutexes

Mutexes provide exclusive access to a named resource. Only one execution can hold a mutex at a time.

Via CLI

# Start an execution with a mutex
kontinue execute deploy --mutex deploy-prod --args '{"version": "1.2.3"}'

Via SDK

When spawning child executions, pass the mutex in ExecuteOptions:

result, err := kontinue.Execute[DeployResult](ktx, "deploy", &DeployArgs{
    Env: "production",
}, &kontinue.ExecuteOptions{
    Mutex: &kontinue.MutexOptions{
        Name: "deploy-prod",
    },
})

Via Client Library

When spawning executions externally:

_, err := client.Spawn(ctx, "deploy", &DeployArgs{Version: "1.2.3"}, &client.SpawnOptions{
    Mutex: &kontinuev1alpha1.ExecutionMutex{
        Name: "deploy-prod",
    },
})

Common Use Cases

  • Deployment Pipelines: Ensure only one deployment runs per environment
  • Resource Management: Serialize access to shared resources
  • Critical Sections: Protect operations that must not run concurrently

# Different environments can deploy concurrently
kontinue execute deploy --mutex deploy-prod
kontinue execute deploy --mutex deploy-staging
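
The exclusivity rule can be illustrated with a small in-process model. This is only a sketch of the semantics described above, not how kontinue implements mutexes: the NamedMutexes type and its methods are hypothetical, and it uses TryLock to make contention visible, whereas a real execution would wait for the mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// NamedMutexes is a hypothetical in-process stand-in for named mutexes:
// one lock per name, created on first use.
type NamedMutexes struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func NewNamedMutexes() *NamedMutexes {
	return &NamedMutexes{locks: make(map[string]*sync.Mutex)}
}

// get returns the lock registered under name, creating it if needed.
func (n *NamedMutexes) get(name string) *sync.Mutex {
	n.mu.Lock()
	defer n.mu.Unlock()
	l, ok := n.locks[name]
	if !ok {
		l = &sync.Mutex{}
		n.locks[name] = l
	}
	return l
}

// TryLock reports whether the named mutex was free and is now held.
func (n *NamedMutexes) TryLock(name string) bool { return n.get(name).TryLock() }

// Unlock releases the named mutex.
func (n *NamedMutexes) Unlock(name string) { n.get(name).Unlock() }

func main() {
	m := NewNamedMutexes()
	fmt.Println(m.TryLock("deploy-prod"))    // true: first holder
	fmt.Println(m.TryLock("deploy-prod"))    // false: already held
	fmt.Println(m.TryLock("deploy-staging")) // true: different name
	m.Unlock("deploy-prod")
	fmt.Println(m.TryLock("deploy-prod")) // true: free again
}
```

Because each name maps to its own lock, deploy-prod and deploy-staging never contend with each other, which matches the CLI example above.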

Function Concurrency Limits

Function concurrency limits restrict the total number of concurrent executions of a specific function across all workers in a worker group.

Configure limits when registering a function:

worker.RegisterFunction(w, "heavy-task", HeavyTask, &function.Options{
    Description: "A resource-intensive task",
    Concurrency: &function.Concurrency{
        Limit: 5, // Maximum 5 concurrent executions
    },
})

Behavior

  • Semaphore Semantics: Acts like a counting semaphore with N slots
  • Blocking: Executions wait when all slots are occupied
  • Status Updates: Waiting executions show Pending with a message such as “Waiting for concurrency slot (limit: N)”
  • Per-Function: Each function has its own independent limit
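
For readers unfamiliar with counting semaphores, the behavior listed above can be sketched with a buffered channel. This is a generic Go idiom used for illustration, not kontinue's implementation; the Semaphore type and its methods are hypothetical names.

```go
package main

import "fmt"

// Semaphore is a counting semaphore backed by a buffered channel.
// The channel capacity is the number of slots.
type Semaphore chan struct{}

func NewSemaphore(limit int) Semaphore { return make(Semaphore, limit) }

// Acquire blocks until a slot is free, then takes it.
func (s Semaphore) Acquire() { s <- struct{}{} }

// TryAcquire takes a slot if one is free and reports whether it did.
func (s Semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release frees one slot.
func (s Semaphore) Release() { <-s }

// InUse returns the number of occupied slots.
func (s Semaphore) InUse() int { return len(s) }

func main() {
	// One independent semaphore per function name.
	limits := map[string]Semaphore{
		"video-encode": NewSemaphore(3),
		"db-migration": NewSemaphore(1),
	}

	mig := limits["db-migration"]
	fmt.Println(mig.TryAcquire()) // true: the single slot was free
	fmt.Println(mig.TryAcquire()) // false: a real execution would wait here

	// A full db-migration semaphore does not affect video-encode.
	fmt.Println(limits["video-encode"].TryAcquire()) // true

	mig.Release()
	fmt.Println(mig.TryAcquire()) // true: the slot is free again
}
```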

Common Use Cases

// Limit resource-intensive operations
worker.RegisterFunction(w, "video-encode", EncodeVideo, &function.Options{
    Concurrency: &function.Concurrency{
        Limit: 3, // Max 3 concurrent video encodings
    },
})

// Prevent overwhelming external services
worker.RegisterFunction(w, "api-call", CallExternalAPI, &function.Options{
    Concurrency: &function.Concurrency{
        Limit: 10, // Max 10 concurrent API calls
    },
})

// Serialize database migrations
worker.RegisterFunction(w, "db-migration", RunMigration, &function.Options{
    Concurrency: &function.Concurrency{
        Limit: 1, // One migration at a time
    },
})

Worker Concurrency Limit

The worker concurrency limit caps total concurrent executions across all functions on a single worker. This prevents workers from being overloaded.

err := w.Run(ctx, &worker.Options{
    Namespace: "default",
    Group:     "my-workers",
    MaxConcurrentExecutions: 5000, // Default: 10000
})

The default of 10,000 is intentionally high to act as a safety net. Adjust based on worker resources and execution characteristics.

How They Work Together

When an execution uses multiple mechanisms, they are acquired in order:

  1. Worker concurrency slot is acquired first
  2. Mutex is acquired second (if specified)
  3. Function concurrency slot is acquired third (if configured)
  4. Execution runs
  5. All are released in reverse order
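
The ordering above can be sketched with Go's defer, which runs deferred calls in last-in, first-out order. The runGuarded helper and the guard names are hypothetical and only record a trace; the snippet assumes nothing about kontinue's internals.

```go
package main

import "fmt"

// runGuarded records the order in which three hypothetical guards are
// acquired and released around run. Deferred releases execute in LIFO
// order, which yields the reverse-of-acquisition release order.
func runGuarded(run func()) []string {
	var trace []string

	// acquire logs the acquisition and returns the matching release func.
	acquire := func(name string) func() {
		trace = append(trace, "acquire "+name)
		return func() { trace = append(trace, "release "+name) }
	}

	func() {
		defer acquire("worker slot")()   // acquired first, released last
		defer acquire("mutex")()         // acquired second
		defer acquire("function slot")() // acquired third, released first
		trace = append(trace, "run")
		run()
	}()
	return trace
}

func main() {
	for _, step := range runGuarded(func() {}) {
		fmt.Println(step)
	}
}
```

Releasing in reverse order means an execution never holds an inner guard (such as a function slot) after giving up an outer one.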

Example

// Worker with global limit
w.Run(ctx, &worker.Options{
    MaxConcurrentExecutions: 100, // Max 100 total
})

// Function with concurrency limit
worker.RegisterFunction(w, "deploy", Deploy, &function.Options{
    Concurrency: &function.Concurrency{
        Limit: 3, // Max 3 deploys at once
    },
})

// Execution with mutex
client.Spawn(ctx, "deploy", args, &client.SpawnOptions{
    Mutex: &kontinuev1alpha1.ExecutionMutex{
        Name: "deploy-prod", // Only one prod deploy at a time
    },
})

In this example:

  • The worker can run up to 100 concurrent executions total
  • The deploy function can run up to 3 concurrent executions
  • Only 1 execution can deploy to production at a time (mutex)
  • Deploys to other environments (staging, dev) can run alongside the production deploy, as long as the total number of running deploys stays within the function limit of 3
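
To check the interaction between the mutex and the function limit, the scenario can be simulated with goroutines. This is a simplified model under stated assumptions: Simulate and Peaks are hypothetical names, an empty mutex name means "no mutex", the worker limit is omitted because it is far above the other two, and a short sleep stands in for the deploy itself.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Peaks records the highest concurrency seen during a simulation.
type Peaks struct {
	Total   int            // most deploys running at the same moment
	ByMutex map[string]int // same, counted per mutex name
}

// Simulate starts one goroutine per entry in mutexNames. Each goroutine
// takes its mutex (if any), then one of limit function slots, then sleeps.
func Simulate(mutexNames []string, limit int) Peaks {
	slots := make(chan struct{}, limit)
	locks := map[string]*sync.Mutex{}
	for _, name := range mutexNames {
		if name != "" && locks[name] == nil {
			locks[name] = &sync.Mutex{}
		}
	}

	var (
		mu      sync.Mutex // guards total, running, and peaks
		total   int
		running = map[string]int{}
		peaks   = Peaks{ByMutex: map[string]int{}}
		wg      sync.WaitGroup
	)
	for _, name := range mutexNames {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if name != "" {
				locks[name].Lock() // mutex first
				defer locks[name].Unlock()
			}
			slots <- struct{}{} // then a function slot
			defer func() { <-slots }()

			mu.Lock()
			total++
			running[name]++
			if total > peaks.Total {
				peaks.Total = total
			}
			if running[name] > peaks.ByMutex[name] {
				peaks.ByMutex[name] = running[name]
			}
			mu.Unlock()

			time.Sleep(2 * time.Millisecond) // stand-in for the deploy

			mu.Lock()
			total--
			running[name]--
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	return peaks
}

func main() {
	names := []string{"deploy-prod", "deploy-prod", "deploy-prod", "", "", "", "", ""}
	p := Simulate(names, 3)
	fmt.Println("total within limit:", p.Total <= 3)
	fmt.Println("prod serialized:", p.ByMutex["deploy-prod"] == 1)
}
```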

Monitoring

Executions waiting for concurrency control show their state in status:

| Status | Message |
|---|---|
| Pending | “Waiting for mutex {name}” |
| Pending | “Waiting for concurrency slot (limit: N)” |

Events are also emitted:

  • BlockedOnMutex - execution is waiting for a mutex
  • BlockedOnConcurrency - execution is waiting for a concurrency slot

# View execution status
kontinue list

# Watch for events
kubectl get events -n default --watch

Best Practices

Mutexes

  • Use descriptive names: deploy-{environment}, backup-{database}
  • Minimize scope: only use when true serialization is required
  • Keep critical sections short
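
One way to keep names consistent is to build them in a single place instead of typing them at each call site. The MutexName helper below is a hypothetical convenience, not part of the kontinue SDK; it assumes the {operation}-{target} pattern shown above.

```go
package main

import (
	"fmt"
	"strings"
)

// MutexName builds a name of the form "{operation}-{target}", for example
// "deploy-prod". Both parts are trimmed and lowercased so that "Prod" and
// "prod " resolve to the same mutex.
func MutexName(operation, target string) string {
	clean := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) }
	return clean(operation) + "-" + clean(target)
}

func main() {
	fmt.Println(MutexName("deploy", "prod"))    // deploy-prod
	fmt.Println(MutexName("backup", " Orders")) // backup-orders
}
```

The result can then be passed wherever a mutex name is expected, such as the Name field in the SDK and client examples above.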

Function Concurrency

  • Profile resource usage before setting limits
  • Start conservative and increase as needed
  • Monitor queue depth to tune limits

Worker Concurrency

  • Use the default (10,000) unless resource constraints require adjustment
  • Monitor worker resources during peak load
  • Size for burst capacity beyond normal load
