How to process a list of files across a group of agents using pipeline?

Hi, we want to run an executable against each file of a list of files, using a pool of agents (preferably specified by a label).
Each file should be processed only once. The number of files in the list may be greater than the number of agents, so the pipeline stage should process each file when an agent becomes available (i.e. has finished processing a file appearing earlier in the list).

Is this possible using pipeline?

Perhaps another way of asking my question is to ask whether pipeline can implement the equivalent of the Least Load plugin without replacing the default scheduler (for other jobs)?

I assume you want to process the files in parallel.
So you could use the parallel step and inside each parallel stage request a node with a label.

@mawinter69 Thank you. The thing is I would then need to provide one parallel stage per item in the list, i.e. the list would be fixed. I wanted something more like the Matrix syntax where the list can be specified as an entity and could be changed.

you can construct the parallel stages in a for loop where you iterate over the list.

I have only used declarative pipeline. Am I correct in thinking that declarative pipeline doesn’t support for loops?

You can also use for loops in declarative. See Pipeline Syntax

For cases where I want to share data across agents in the same pipeline, I usually use a @Field variable global to the pipeline, most of the time a thread-safe map. You have to take special care with access to the shared data, otherwise you will get unpredictable results.
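A minimal sketch of that pattern (the label 'linux' and the map contents are placeholder assumptions of mine, not from the thread):

```groovy
import groovy.transform.Field
import java.util.concurrent.ConcurrentHashMap

// Shared across all parallel branches of this pipeline.
// ConcurrentHashMap makes individual put/get operations thread-safe.
@Field def results = new ConcurrentHashMap()

pipeline {
    agent none
    stages {
        stage('collect') {
            steps {
                script {
                    parallel(
                        a: { node('linux') { results['a'] = env.NODE_NAME } },
                        b: { node('linux') { results['b'] = env.NODE_NAME } }
                    )
                    echo "Collected: ${results}"
                }
            }
        }
    }
}
```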


@mawinter69 So far I have:

stage('proc') {

    steps {
        script {
            def nodes = ['jenkins-ubuntu22-1', 'jenkins-ubuntu22-2']
            def files = ['fileA.txt', 'fileB.txt']
            for (int i = 0; i < files.size(); ++i) {
                echo "Processing file: ${files[i]}"
            }
        }
    }                
}

which gives:

[Pipeline] script
[Pipeline] {
[Pipeline] echo
Processing file: fileA.txt
[Pipeline] echo
Processing file: fileB.txt
[Pipeline] }
[Pipeline] // script

How could I develop the code to echo ‘fileA.txt’ using the first node from nodes and ‘fileB.txt’ using the second, in parallel?

Ideally I would like to be able to set the length of files > length of nodes, so that each file is scheduled to the next free node.

how many executors do your agents have? If more than one would it be ok to have 2 files getting processed at the same time on the same agent?

@mawinter69 They can have more than one executor.

If more than one would it be ok to have 2 files getting processed at the same time on the same agent?

Yes, that would be ok.

The following code processes your files in parallel across the total number of executors on all agents matching the label expression a||b. So if you have 2 agents with label a and one agent with label b, and each agent has 2 executors, you will process up to 6 files in parallel.

script {
    def files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt', 'file6.txt', 'file7.txt', 'file8.txt', 'file9.txt', 'file10.txt']
    def p = [:]
    for (file in files) {
        echo "Preparing $file"
        p[file] = {
            node('a||b') {
                // take the next unprocessed file from the shared list
                def nextFile = files.pop()
                echo "$nextFile"
                sleep 10
            }
        }
    }
    parallel p
}

Thank you very much. I will try that out.


@mawinter69 Your code example worked very well for me. Thank you!

Just from a point of view of style, is it possible to ‘move’ the definition of :

def files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt', 'file6.txt', 'file7.txt', 'file8.txt', 'file9.txt', 'file10.txt']

further up the pipeline so that it can be maintained more easily? For example, could it be specified in ‘environment’?

Also, with line:

def nextFile = files.pop()

why is that necessary, rather than just using the for loop’s ‘file’ variable?


Yes, you can move the files variable anywhere as long as it is visible where it is used. Maybe have it as a parameter of the job if required.
You need the files.pop() because otherwise you would always process the last file. This is because the closure is only evaluated when the parallel step runs, not when your for loop runs, and by then the file variable holds the last value of the loop.
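An alternative sketch of mine (not from the thread): copy the loop variable into a local variable inside the loop, so each closure captures its own value instead of the shared loop variable. With this approach files.pop() is not needed:

```groovy
script {
    def files = ['file1.txt', 'file2.txt', 'file3.txt']
    def p = [:]
    for (file in files) {
        def f = file  // local copy; each closure captures its own f
        p[f] = {
            node('a||b') {
                echo "Processing $f"
            }
        }
    }
    parallel p
}
```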


Thanks again for your help.

Yes, you can move the files variable anywhere as long as it is visible where it is used. Maybe have it as a parameter of the job if required.

Could you help me with the syntax for assigning a parameter to a script variable please? (The assignment below does not work).

pipeline {
    parameters {
        string defaultValue: 'Ubuntu_22',
               description: 'Label or name of agent(s) to use to run the simulations', 
               name: 'agents', 
    }

    stage('exp') {   
       steps {
              script {
                 <snip>
                 node(${env.agents}) {

You can access the parameter with params.agents, so in your sample:
node(params.agents)

Thanks that’s great.

Hopefully my last syntax problem is how to specify a list as a parameter, and assign it
to the ‘files’ variable of your example.

use a text() parameter and split it by newlines. See Pipeline Syntax
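A sketch of that suggestion, reusing the xml_filenames parameter name from the code later in the thread (the exact default value here is just an example):

```groovy
parameters {
    // one file name per line in the default value -- no commas needed
    text defaultValue: 'file1.txt\nfile2.txt\nfile3.txt',
         description: 'List of xml files to run',
         name: 'xml_filenames'
}
```

Then in the script block, split the text into a list before iterating, e.g. `def files = params.xml_filenames.split('\n').collect { it.trim() }.findAll { it }`, which also trims whitespace and drops empty lines.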


Hi again, I’m failing to define a list of strings in a text parameter. My current code is:

parameters {
    string defaultValue: 'Ubuntu_22',
           description: 'Label or name of agent(s) to use to run the simulations', 
           name: 'agents', 
           trim: true
    text defaultValue: 'file1.txt,\nfile2.txt,\nfile3.txt,\nfile4.txt',
           description: 'List of xml files for Zodiac to run', 
           name: 'xml_filenames', 
           trim: true
}

stage('exp3') {
   steps {
          script {
          //def files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
          def files = params.xml_filenames
          def p = [:]
          for (file in files) {
                 echo "Preparing $file"
                 p[file] = {
                 node(params.agents) {
                        echo "$nextFile"
                        sleep 10
                 }
                 }
          }
          parallel p
          }   
   }
}

The output is:

   [Pipeline] script
   [Pipeline] {
   [Pipeline] echo
   Preparing f
   [Pipeline] echo
   Preparing i
   [Pipeline] echo
   Preparing l
   [Pipeline] echo
   Preparing e
   [Pipeline] echo
   Preparing 1
   [Pipeline] echo
   Preparing .
   [Pipeline] echo
   Preparing t
   [Pipeline] echo
   Preparing x
   [Pipeline] echo
   Preparing t
   [Pipeline] echo
   Preparing ,

So the ‘for’ loop is processing character by character, not file name by file name.

What would be the correct syntax for the default value of the xml_filenames parameter, in order to define a list of strings?