Skip to content

feat: Set maximum job duration for Azure Batch #5996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ The following settings are available:
`azure.batch.location`
: The name of the batch service region, e.g. `westeurope` or `eastus2`. This is not needed when the endpoint is specified.

`azure.batch.jobMaxWallClockTime`
: :::{versionadded} 25.04.0-edge
:::
: The maximum elapsed time that jobs may run, measured from the time they are created. If jobs do not complete within this time limit, the Batch service terminates them and any tasks still running (default: `7d`).

`azure.batch.terminateJobsOnCompletion`
: :::{versionadded} 23.05.0-edge
:::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.azure.compute.batch.models.AutoUserScope
import com.azure.compute.batch.models.AutoUserSpecification
import com.azure.compute.batch.models.AzureFileShareConfiguration
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.BatchJobConstraints
import com.azure.compute.batch.models.BatchJobUpdateContent
import com.azure.compute.batch.models.BatchNodeFillType
import com.azure.compute.batch.models.BatchPool
Expand Down Expand Up @@ -429,11 +430,26 @@ class AzBatchService implements Closeable {
return jobId
}

protected BatchJobConstraints createJobConstraints(nextflow.util.Duration time) {
final constraints = new BatchJobConstraints()
if (time && time.toMillis() > 0) {
final long millis = time.toMillis()
final java.time.Duration maxWallTime = java.time.Duration.ofMillis(millis)
constraints.setMaxWallClockTime(maxWallTime)
}
return constraints
}

protected String createJob0(String poolId, TaskRun task) {
log.debug "[AZURE BATCH] created job for ${task.processor.name} with pool ${poolId}"
// create a batch job
final jobId = makeJobId(task)
final content = new BatchJobCreateContent(jobId, new BatchPoolInfo(poolId: poolId))

if (config.batch().jobMaxWallClockTime) {
content.setConstraints(createJobConstraints(config.batch().jobMaxWallClockTime))
}

apply(() -> client.createJob(content))
return jobId
}
Expand Down Expand Up @@ -945,7 +961,11 @@ class AzBatchService implements Closeable {
apply(() -> client.updateJob(jobId, jobParameter))
}
catch (Exception e) {
log.warn "Unable to terminate Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
if (e.message?.contains('Status code 409') && e.message?.contains('JobCompleted')) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.debug "Azure Batch job ${jobId} already terminated, skipping termination"
} else {
log.warn "Unable to terminate Azure Batch job ${jobId} - Reason: ${e.message ?: e}"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AzBatchOpts implements CloudTransferOptions {
Boolean deletePoolsOnCompletion
Boolean deleteTasksOnCompletion
CopyToolInstallMode copyToolInstallMode
Duration jobMaxWallClockTime

Map<String,AzPoolOpts> pools

Expand All @@ -71,6 +72,7 @@ class AzBatchOpts implements CloudTransferOptions {
deleteJobsOnCompletion = config.deleteJobsOnCompletion
deletePoolsOnCompletion = config.deletePoolsOnCompletion
deleteTasksOnCompletion = config.deleteTasksOnCompletion
jobMaxWallClockTime = config.jobMaxWallClockTime ? config.jobMaxWallClockTime as Duration : Duration.of('30d')
pools = parsePools(config.pools instanceof Map ? config.pools as Map<String,Map> : Collections.<String,Map>emptyMap())
maxParallelTransfers = config.maxParallelTransfers ? config.maxParallelTransfers as int : MAX_TRANSFER
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.function.Predicate

import com.azure.compute.batch.BatchClient
import com.azure.compute.batch.models.BatchPool
import com.azure.compute.batch.models.BatchJobCreateContent
import com.azure.compute.batch.models.ElevationLevel
import com.azure.core.exception.HttpResponseException
import com.azure.core.http.HttpResponse
Expand All @@ -29,6 +31,8 @@ import nextflow.util.MemoryUnit
import reactor.core.publisher.Flux
import spock.lang.Specification
import spock.lang.Unroll
import com.azure.compute.batch.models.BatchJobConstraints
import com.azure.compute.batch.models.BatchPoolInfo
/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -941,4 +945,31 @@ class AzBatchServiceTest extends Specification {
1 * service.createPool(spec) >> { throw new IllegalArgumentException("Some other error") }
thrown(IllegalArgumentException)
}

def 'should test createJobConstraints method with Duration input' () {
given:
def exec = Mock(AzBatchExecutor)
def service = new AzBatchService(exec)
def nfDuration = TIME_STR ? nextflow.util.Duration.of(TIME_STR) : null

when:
def result = service.createJobConstraints(nfDuration)

then:
result != null
if (TIME_STR) {
assert result.maxWallClockTime != null
assert result.maxWallClockTime.toDays() == EXPECTED_DAYS
} else {
assert result.maxWallClockTime == null
}

where:
TIME_STR | EXPECTED_DAYS
'48d' | 48
'24h' | 1
'7d' | 7
null | 0
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,21 @@ class AzBatchOptsTest extends Specification {
CopyToolInstallMode.off | [:] | true

}

def 'should set jobMaxWallClockTime' () {
when:
def opts1 = new AzBatchOpts([:], [:])
then:
opts1.jobMaxWallClockTime.toString() == '30d'

when:
def opts2 = new AzBatchOpts([jobMaxWallClockTime: '3d'], [:])
then:
opts2.jobMaxWallClockTime.toString() == '3d'

when:
def opts3 = new AzBatchOpts([jobMaxWallClockTime: '12h'], [:])
then:
opts3.jobMaxWallClockTime.toString() == '12h'
}
}
Loading