Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@

package io.seqera.wave.service.blob.impl


import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Nullable
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.core.ContainerPlatform
import io.seqera.wave.service.blob.TransferStrategy
import io.seqera.wave.service.k8s.K8sService
import jakarta.inject.Inject
import static io.seqera.wave.util.K8sHelper.getSelectorLabel

import static io.seqera.wave.util.K8sHelper.getNoArchSelector
/**
* Implements {@link TransferStrategy} that runs s5cmd using a
* Kubernetes job
Expand All @@ -55,9 +52,7 @@ class KubeTransferStrategy implements TransferStrategy {

@Override
void launchJob(String jobName, List<String> command) {

final selector = getSelectorLabel(ContainerPlatform.DEFAULT, nodeSelectorMap)

final selector = getNoArchSelector(nodeSelectorMap)
// run the transfer job
k8sService.launchTransferJob(jobName, blobConfig.s5Image, command, blobConfig, selector)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import io.seqera.wave.service.k8s.K8sService
import io.seqera.wave.service.mirror.MirrorRequest
import jakarta.inject.Inject
import jakarta.inject.Singleton
import static io.seqera.wave.util.K8sHelper.getSelectorLabel

import static io.seqera.wave.util.K8sHelper.getNoArchSelector
/**
* Implements a container mirror runner based on Kubernetes
*
Expand Down Expand Up @@ -63,7 +62,7 @@ class KubeMirrorStrategy extends MirrorStrategy {
void mirrorJob(String jobName, MirrorRequest request) {
// docker auth json file
final Path configFile = request.authJson ? request.workDir.resolve('config.json') : null
final selector = getSelectorLabel(request.platform, nodeSelectorMap)
final selector = getNoArchSelector(nodeSelectorMap)

try {
k8sService.launchMirrorJob(
Expand Down
22 changes: 22 additions & 0 deletions src/main/groovy/io/seqera/wave/util/K8sHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package io.seqera.wave.util

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.core.ContainerPlatform
import io.seqera.wave.exception.BadRequestException
/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class K8sHelper {

Expand Down Expand Up @@ -59,6 +61,26 @@ class K8sHelper {
throw new BadRequestException("Unsupported container platform '${platform}'")
}

/**
* Get the node selector for architecture-independent (noarch) workloads
*
* @param nodeSelectors
* A map that associate the platform architecture with a corresponding node selector label
* @return
* A {@link Map} object representing a kubernetes label to be used as node selector for noarch workloads,
* or an empty map when there's no matching
*/
static Map<String,String> getNoArchSelector(Map<String,String> nodeSelectors) {
if( !nodeSelectors )
return Collections.<String,String>emptyMap()
final value = nodeSelectors.get('noarch')
if( !value ) {
log.warn("Node selectors are configured but 'noarch' key is missing - available keys: ${nodeSelectors.keySet()}")
return Collections.<String,String>emptyMap()
}
return toLabelMap(value)
}

/**
* Given a label formatted as key=value, return it as a map
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ class KubeTransferStrategyTest extends Specification {
BlobCacheConfig blobConfig = new BlobCacheConfig(s5Image: 's5cmd', transferTimeout: Duration.ofSeconds(10), retryAttempts: 3)
KubeTransferStrategy strategy = new KubeTransferStrategy(k8sService: k8sService, blobConfig: blobConfig, nodeSelectorMap: [
'linux/amd64': 'service=wave-build',
'linux/arm64': 'service=wave-build-arm64'
'linux/arm64': 'service=wave-build-arm64',
'noarch': 'service=wave-transfer'
])

def "transfer should start a transferJob"() {
given:
final selector = ['service': 'wave-build']
final selector = ['service': 'wave-transfer']
def info = BlobEntry.create("https://test.com/blobs", "https://test.com/bucket/blobs", null, null)
def command = ["transfer", "blob"]
final jobName = "job-123"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.service.mirror.strategy

import spock.lang.Specification

import java.nio.file.Path

import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.service.k8s.K8sService
import io.seqera.wave.service.mirror.MirrorRequest

/**
*
* @author Munish Chouhan <munish.chouhan@seqera.io>
*/
class KubeMirrorStrategyTest extends Specification {

def "should use noarch node selector for mirror job"() {
given:
def k8sService = Mock(K8sService)
def strategy = new KubeMirrorStrategy(
config: Mock(MirrorConfig) {
getSkopeoImage() >> 'quay.io/skopeo/stable:latest'
},
k8sService: k8sService,
nodeSelectorMap: ['linux/amd64': 'service=wave-build',
'linux/arm64': 'service=wave-build-arm64',
'noarch': 'service=wave-mirror']
)
def request = Mock(MirrorRequest) {
getWorkDir() >> Path.of('/tmp/work')
getAuthJson() >> null
}

when:
strategy.mirrorJob('job-123', request)

then:
1 * k8sService.launchMirrorJob('job-123', 'quay.io/skopeo/stable:latest', _, Path.of('/tmp/work'), null, _,
['service':'wave-mirror'])
}

def "should use empty selector when noarch is not configured"() {
given:
def k8sService = Mock(K8sService)
def strategy = new KubeMirrorStrategy(
config: Mock(MirrorConfig) {
getSkopeoImage() >> 'quay.io/skopeo/stable:latest'
},
k8sService: k8sService,
nodeSelectorMap: ['linux/amd64': 'service=wave-build',
'linux/arm64': 'service=wave-build-arm64']
)
def request = Mock(MirrorRequest) {
getWorkDir() >> Path.of('/tmp/work')
getAuthJson() >> null
}

when:
strategy.mirrorJob('job-456', request)

then:
1 * k8sService.launchMirrorJob('job-456', 'quay.io/skopeo/stable:latest', _, Path.of('/tmp/work'), null, _,
[:])
}

}
13 changes: 13 additions & 0 deletions src/test/groovy/io/seqera/wave/util/K8sHelperTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,17 @@ class K8sHelperTest extends Specification {
err.message == "Unsupported container platform 'linux/amd64'"
}

def 'should get noarch selector' () {
expect:
K8sHelper.getNoArchSelector(SELECTORS) == EXPECTED

where:
SELECTORS | EXPECTED
null | [:]
[:] | [:]
['noarch': 'foo=1'] | ['foo': '1']
['amd64': 'bar=2', 'noarch': 'foo=1'] | ['foo': '1']
['amd64': 'bar=2', 'arm64': 'baz=3'] | [:] // logs warning
}

}