Skip to content

feat: Set maximum job duration for Azure Batch #5996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ The following settings are available:
`azure.batch.location`
: The name of the batch service region, e.g. `westeurope` or `eastus2`. This is not needed when the endpoint is specified.

`azure.batch.jobMaxWallClockTime`
: :::{versionadded} 25.04.0-edge
:::
: The maximum elapsed time that jobs may run, measured from the time they are created. If jobs do not complete within this time limit, the Batch service terminates them and any tasks still running (default: `7d`).

`azure.batch.terminateJobsOnCompletion`
: :::{versionadded} 23.05.0-edge
:::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.azure.compute.batch.models.AutoUserScope
import com.azure.compute.batch.models.AutoUserSpecification
import com.azure.compute.batch.models.AzureFileShareConfiguration
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.BatchJobConstraints
import com.azure.compute.batch.models.BatchJobUpdateContent
import com.azure.compute.batch.models.BatchNodeFillType
import com.azure.compute.batch.models.BatchPool
Expand Down Expand Up @@ -434,6 +435,18 @@ class AzBatchService implements Closeable {
// create a batch job
final jobId = makeJobId(task)
final content = new BatchJobCreateContent(jobId, new BatchPoolInfo(poolId: poolId))

// Add job constraints with maxWallClockTime from config
if (config.batch().jobMaxWallClockTime) {
final constraints = new BatchJobConstraints()
// Convert nextflow.util.Duration to java.time.Duration for the Azure BatchJobConstraints
final long millis = config.batch().jobMaxWallClockTime.toMillis()
final java.time.Duration maxWallTime = java.time.Duration.ofMillis(millis)
constraints.setMaxWallClockTime(maxWallTime)
content.setConstraints(constraints)
log.debug "[AZURE BATCH] Setting job constraint maxWallClockTime: ${config.batch().jobMaxWallClockTime}"
}

apply(() -> client.createJob(content))
return jobId
}
Expand Down Expand Up @@ -945,7 +958,11 @@ class AzBatchService implements Closeable {
apply(() -> client.updateJob(jobId, jobParameter))
}
catch (Exception e) {
log.warn "Unable to terminate Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
if (e.message?.contains('Status code 409') && e.message?.contains('JobCompleted')) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.debug "Azure Batch job ${jobId} already terminated, skipping termination"
} else {
log.warn "Unable to terminate Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AzBatchOpts implements CloudTransferOptions {
Boolean deletePoolsOnCompletion
Boolean deleteTasksOnCompletion
CopyToolInstallMode copyToolInstallMode
Duration jobMaxWallClockTime

Map<String,AzPoolOpts> pools

Expand All @@ -71,6 +72,7 @@ class AzBatchOpts implements CloudTransferOptions {
deleteJobsOnCompletion = config.deleteJobsOnCompletion
deletePoolsOnCompletion = config.deletePoolsOnCompletion
deleteTasksOnCompletion = config.deleteTasksOnCompletion
jobMaxWallClockTime = config.jobMaxWallClockTime ? config.jobMaxWallClockTime as Duration : Duration.of('30d')
pools = parsePools(config.pools instanceof Map ? config.pools as Map<String,Map> : Collections.<String,Map>emptyMap())
maxParallelTransfers = config.maxParallelTransfers ? config.maxParallelTransfers as int : MAX_TRANSFER
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.function.Predicate

import com.azure.compute.batch.BatchClient
import com.azure.compute.batch.models.BatchPool
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.ElevationLevel
import com.azure.core.exception.HttpResponseException
import com.azure.core.http.HttpResponse
Expand All @@ -29,6 +31,8 @@ import nextflow.util.MemoryUnit
import reactor.core.publisher.Flux
import spock.lang.Specification
import spock.lang.Unroll
import com.azure.compute.batch.models.BatchJobConstraints
import com.azure.compute.batch.models.BatchPoolInfo
/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -941,4 +945,42 @@ class AzBatchServiceTest extends Specification {
1 * service.createPool(spec) >> { throw new IllegalArgumentException("Some other error") }
thrown(IllegalArgumentException)
}

def 'should verify job constraints with jobMaxWallClockTime' () {
given:
def CONFIG = [batch: [jobMaxWallClockTime: WALL_CLOCK]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
def service = new AzBatchService(exec)

when:
// This is what happens inside createJob0 for setting constraints
def content = new BatchJobCreateContent('test-job', new BatchPoolInfo(poolId: 'test-pool'))
if (exec.getConfig().batch().jobMaxWallClockTime) {
final constraints = new BatchJobConstraints()
final long millis = exec.getConfig().batch().jobMaxWallClockTime.toMillis()
final java.time.Duration maxWallTime = java.time.Duration.ofMillis(millis)
constraints.setMaxWallClockTime(maxWallTime)
content.setConstraints(constraints)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't feel like a very good test...but I can't see how to mock the methods inside createJob0? Or is this an indication I should split up the method to a createJobConstraints and createJob0 method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in a3dbc7c


then:
if (EXPECTED_DAYS > 0) {
assert content.constraints != null
assert content.constraints.maxWallClockTime != null
assert content.constraints.maxWallClockTime.toDays() == EXPECTED_DAYS
} else {
// For the null case, we don't explicitly set any constraints,
// but Azure SDK applies default constraints with 30 days
assert content.constraints != null
assert content.constraints.maxWallClockTime != null
assert content.constraints.maxWallClockTime.toDays() == 30
}

where:
WALL_CLOCK | EXPECTED_DAYS
'48d' | 48
'24h' | 1
'7d' | 7
null | 30
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,21 @@ class AzBatchOptsTest extends Specification {
CopyToolInstallMode.off | [:] | true

}

def 'should set jobMaxWallClockTime' () {
when:
def opts1 = new AzBatchOpts([:], [:])
then:
opts1.jobMaxWallClockTime.toString() == '30d'

when:
def opts2 = new AzBatchOpts([jobMaxWallClockTime: '3d'], [:])
then:
opts2.jobMaxWallClockTime.toString() == '3d'

when:
def opts3 = new AzBatchOpts([jobMaxWallClockTime: '12h'], [:])
then:
opts3.jobMaxWallClockTime.toString() == '12h'
}
}
Loading