Skip to content

Commit 2144746

Browse files
author
Ivan Duplenskikh
committed
Make file processing queue-based
1 parent eb408a2 commit 2144746

File tree

6 files changed

+726
-465
lines changed

6 files changed

+726
-465
lines changed

Tasks/CopyFilesOverSSHV0/copyfilesoverssh.ts

+154-155
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as tl from 'azure-pipelines-task-lib/task';
44
import * as minimatch from 'minimatch';
55
import * as utils from './utils';
66
import { SshHelper } from './sshhelper';
7+
import Queue from './queue';
78

89
// This method will find the list of matching files for the specified contents
910
// This logic is the same as the one used by CopyFiles task except for allowing dot folders to be copied
@@ -133,192 +134,190 @@ function getUniqueFolders(filesToCopy: string[]) {
133134
}
134135

135136
async function run() {
136-
let sshHelper: SshHelper;
137-
138-
try {
139-
tl.setResourcePath(path.join(__dirname, 'task.json'));
140-
141-
// read SSH endpoint input
142-
const sshEndpoint = tl.getInput('sshEndpoint', true);
143-
const username: string = tl.getEndpointAuthorizationParameter(sshEndpoint, 'username', false);
144-
const password: string = tl.getEndpointAuthorizationParameter(sshEndpoint, 'password', true); //passphrase is optional
145-
const privateKey: string = process.env['ENDPOINT_DATA_' + sshEndpoint + '_PRIVATEKEY']; //private key is optional, password can be used for connecting
146-
const hostname: string = tl.getEndpointDataParameter(sshEndpoint, 'host', false);
147-
let port: string = tl.getEndpointDataParameter(sshEndpoint, 'port', true); //port is optional, will use 22 as default port if not specified
148-
if (!port) {
149-
console.log(tl.loc('UseDefaultPort'));
150-
port = '22';
137+
tl.setResourcePath(path.join(__dirname, 'task.json'));
138+
139+
// Read SSH endpoint input
140+
const sshEndpoint = tl.getInput('sshEndpoint', true);
141+
const username = tl.getEndpointAuthorizationParameter(sshEndpoint, 'username', false);
142+
// Passphrase is optional
143+
const password = tl.getEndpointAuthorizationParameter(sshEndpoint, 'password', true);
144+
// Private key is optional, password can be used for connecting
145+
const privateKey = process.env['ENDPOINT_DATA_' + sshEndpoint + '_PRIVATEKEY'];
146+
const hostname = tl.getEndpointDataParameter(sshEndpoint, 'host', false);
147+
// Port is optional, will use 22 as default port if not specified
148+
let port = tl.getEndpointDataParameter(sshEndpoint, 'port', true);
149+
150+
if (!port) {
151+
console.log(tl.loc('UseDefaultPort'));
152+
port = '22';
153+
}
154+
155+
const readyTimeout = parseInt(tl.getInput('readyTimeout', true), 10);
156+
const useFastPut = !(process.env['USE_FAST_PUT'] === 'false');
157+
158+
// Set up the SSH connection configuration based on endpoint details
159+
let sshConfig: Object = {
160+
host: hostname,
161+
port: port,
162+
username: username,
163+
readyTimeout: readyTimeout,
164+
useFastPut: useFastPut
165+
};
166+
167+
if (privateKey) {
168+
tl.debug('Using private key for ssh connection.');
169+
170+
sshConfig = {
171+
...sshConfig,
172+
privateKey,
173+
password
151174
}
175+
} else {
176+
// Use password
177+
tl.debug('Using username and password for ssh connection.');
152178

153-
const readyTimeout = getReadyTimeoutVariable();
154-
const useFastPut: boolean = !(process.env['USE_FAST_PUT'] === 'false');
155-
156-
// set up the SSH connection configuration based on endpoint details
157-
let sshConfig;
158-
if (privateKey) {
159-
tl.debug('Using private key for ssh connection.');
160-
sshConfig = {
161-
host: hostname,
162-
port: port,
163-
username: username,
164-
privateKey: privateKey,
165-
passphrase: password,
166-
readyTimeout: readyTimeout,
167-
useFastPut: useFastPut
168-
}
169-
} else {
170-
// use password
171-
tl.debug('Using username and password for ssh connection.');
172-
sshConfig = {
173-
host: hostname,
174-
port: port,
175-
username: username,
176-
password: password,
177-
readyTimeout: readyTimeout,
178-
useFastPut: useFastPut
179-
}
179+
sshConfig = {
180+
...sshConfig,
181+
password,
180182
}
183+
}
181184

182-
// contents is a multiline input containing glob patterns
183-
const contents: string[] = tl.getDelimitedInput('contents', '\n', true);
184-
const sourceFolder: string = tl.getPathInput('sourceFolder', true, true);
185-
let targetFolder: string = tl.getInput('targetFolder');
185+
// Contents is a multiline input containing glob patterns
186+
const contents = tl.getDelimitedInput('contents', '\n', true);
187+
const sourceFolder = tl.getPathInput('sourceFolder', true, true);
188+
let targetFolder = tl.getInput('targetFolder');
186189

187-
if (!targetFolder) {
188-
targetFolder = "./";
189-
} else {
190-
// '~/' is unsupported
191-
targetFolder = targetFolder.replace(/^~\//, "./");
192-
}
190+
if (!targetFolder) {
191+
targetFolder = "./";
192+
} else {
193+
// '~/' is unsupported
194+
targetFolder = targetFolder.replace(/^~\//, "./");
195+
}
193196

194-
// read the copy options
195-
const cleanTargetFolder: boolean = tl.getBoolInput('cleanTargetFolder', false);
196-
const overwrite: boolean = tl.getBoolInput('overwrite', false);
197-
const failOnEmptySource: boolean = tl.getBoolInput('failOnEmptySource', false);
198-
const flattenFolders: boolean = tl.getBoolInput('flattenFolders', false);
197+
// Read the copy options
198+
const cleanTargetFolder = tl.getBoolInput('cleanTargetFolder', false);
199+
const overwrite = tl.getBoolInput('overwrite', false);
200+
const failOnEmptySource = tl.getBoolInput('failOnEmptySource', false);
201+
const flattenFolders = tl.getBoolInput('flattenFolders', false);
199202

200-
if (!tl.stats(sourceFolder).isDirectory()) {
201-
throw tl.loc('SourceNotFolder');
202-
}
203+
if (!tl.stats(sourceFolder).isDirectory()) {
204+
tl.setResult(tl.TaskResult.Failed, tl.loc('SourceNotFolder'));
205+
return;
206+
}
203207

204-
// initialize the SSH helpers, set up the connection
205-
sshHelper = new SshHelper(sshConfig);
206-
await sshHelper.setupConnection();
207-
208-
if (cleanTargetFolder && await sshHelper.checkRemotePathExists(targetFolder)) {
209-
console.log(tl.loc('CleanTargetFolder', targetFolder));
210-
const isWindowsOnTarget: boolean = tl.getBoolInput('isWindowsOnTarget', false);
211-
const cleanHiddenFilesInTarget: boolean = tl.getBoolInput('cleanHiddenFilesInTarget', false);
212-
const cleanTargetFolderCmd: string = utils.getCleanTargetFolderCmd(targetFolder, isWindowsOnTarget, cleanHiddenFilesInTarget);
213-
try {
214-
await sshHelper.runCommandOnRemoteMachine(cleanTargetFolderCmd, null);
215-
} catch (err) {
216-
throw tl.loc('CleanTargetFolderFailed', err);
217-
}
208+
// Initialize the SSH helpers, set up the connection
209+
const sshHelper = new SshHelper(sshConfig);
210+
await sshHelper.setupConnection();
211+
212+
if (cleanTargetFolder && await sshHelper.checkRemotePathExists(targetFolder)) {
213+
console.log(tl.loc('CleanTargetFolder', targetFolder));
214+
const isWindowsOnTarget = tl.getBoolInput('isWindowsOnTarget', false);
215+
const cleanHiddenFilesInTarget = tl.getBoolInput('cleanHiddenFilesInTarget', false);
216+
const cleanTargetFolderCmd = utils.getCleanTargetFolderCmd(targetFolder, isWindowsOnTarget, cleanHiddenFilesInTarget);
217+
218+
try {
219+
await sshHelper.runCommandOnRemoteMachine(cleanTargetFolderCmd, null);
220+
} catch (error) {
221+
tl.setResult(tl.TaskResult.Failed, tl.loc('CleanTargetFolderFailed', error));
222+
tl.debug('Closing the client connection');
223+
await sshHelper.closeConnection();
224+
return;
218225
}
226+
}
219227

220-
if (contents.length === 1 && contents[0] === "**") {
221-
tl.debug("Upload a directory to a remote machine");
228+
if (contents.length === 1 && contents[0] === "**") {
229+
tl.debug("Upload a directory to a remote machine");
222230

223-
try {
224-
const completedDirectory = await sshHelper.uploadFolder(sourceFolder, targetFolder);
225-
console.log(tl.loc('CopyDirectoryCompleted', completedDirectory));
226-
} catch (err) {
227-
throw tl.loc("CopyDirectoryFailed", sourceFolder, err);
228-
}
229-
} else {
230-
// identify the files to copy
231-
const filesToCopy = getFilesToCopy(sourceFolder, contents);
231+
try {
232+
const completedDirectory = await sshHelper.uploadFolder(sourceFolder, targetFolder);
233+
tl.setResult(tl.TaskResult.Succeeded, tl.loc('CopyDirectoryCompleted', completedDirectory));
234+
} catch (error) {
235+
tl.setResult(tl.TaskResult.Failed, tl.loc("CopyDirectoryFailed", sourceFolder, error));
236+
}
232237

233-
// copy files to remote machine
234-
if (filesToCopy) {
235-
const preparedFiles = prepareFiles(filesToCopy, sourceFolder, targetFolder, flattenFolders);
238+
tl.debug('Closing the client connection');
239+
await sshHelper.closeConnection();
240+
return;
241+
}
236242

237-
tl.debug('Number of files to copy = ' + preparedFiles.length);
238-
tl.debug('filesToCopy = ' + preparedFiles);
243+
// Identify the files to copy
244+
const filesToCopy = getFilesToCopy(sourceFolder, contents);
239245

240-
console.log(tl.loc('CopyingFiles', preparedFiles.length));
246+
// Copy files to remote machine
247+
if (filesToCopy.length === 0) {
248+
if (failOnEmptySource) {
249+
tl.setResult(tl.TaskResult.Failed, tl.loc('NothingToCopy'));
250+
return;
251+
} else {
252+
tl.warning(tl.loc('NothingToCopy'));
253+
return;
254+
}
255+
}
241256

242-
// Create remote folders structure
243-
const folderStructure = getUniqueFolders(preparedFiles.map(x => x[1]).sort());
257+
const preparedFiles = prepareFiles(filesToCopy, sourceFolder, targetFolder, flattenFolders);
244258

245-
for (const foldersPath of folderStructure) {
246-
try {
247-
await sshHelper.createRemoteDirectory(foldersPath);
248-
console.log(tl.loc("FolderCreated", foldersPath));
249-
} catch (error) {
250-
throw tl.loc('TargetNotCreated', foldersPath, error);
251-
}
252-
}
259+
tl.debug(`Number of files to copy = ${preparedFiles.length}`);
260+
tl.debug(`filesToCopy = ${preparedFiles}`);
253261

254-
console.log(tl.loc("FoldersCreated", folderStructure.length));
262+
console.log(tl.loc('CopyingFiles', preparedFiles.length));
255263

256-
// Upload files to remote machine
257-
const chunkSize = 10;
258-
const chunks = Math.ceil(preparedFiles.length / chunkSize);
259-
let successedFiles = 0;
264+
// Create remote folders structure
265+
const folderStructure = getUniqueFolders(preparedFiles.map(x => x[1]).sort());
260266

261-
for (let i = 0; i < chunks; i++) {
262-
// Splice files to copy into chunks
263-
const chunk = preparedFiles.slice(i * chunkSize, i * chunkSize + chunkSize);
267+
for (const foldersPath of folderStructure) {
268+
try {
269+
await sshHelper.createRemoteDirectory(foldersPath);
270+
console.log(tl.loc("FolderCreated", foldersPath));
271+
} catch (error) {
272+
await sshHelper.closeConnection();
273+
tl.setResult(tl.TaskResult.Failed, tl.loc('TargetNotCreated', foldersPath, error));
274+
return;
275+
}
276+
}
264277

265-
const results = await Promise.allSettled(chunk.map(async (pathTuple) => {
266-
const [ fileToCopy, targetPath ] = pathTuple;
278+
console.log(tl.loc("FoldersCreated", folderStructure.length));
267279

268-
tl.debug('fileToCopy = ' + fileToCopy);
280+
// Upload files to remote machine
281+
const q = new Queue({
282+
concurrent: 5,
283+
delay: 0,
284+
throwOnError: false
285+
});
269286

270-
console.log(tl.loc('StartedFileCopy', fileToCopy, targetPath));
287+
q.enqueue(preparedFiles.map((pathTuple) => async () => {
288+
const [ fileToCopy, targetPath ] = pathTuple;
271289

272-
if (!overwrite) {
273-
const fileExists: boolean = await sshHelper.checkRemotePathExists(targetPath);
290+
tl.debug('fileToCopy = ' + fileToCopy);
274291

275-
if (fileExists) {
276-
throw tl.loc('FileExists', targetPath);
277-
}
278-
}
292+
console.log(tl.loc('StartedFileCopy', fileToCopy, targetPath));
279293

280-
return sshHelper.uploadFile(fileToCopy, targetPath);
281-
}));
294+
if (!overwrite && await sshHelper.checkRemotePathExists(targetPath)) {
295+
throw tl.loc('FileExists', targetPath);
296+
}
282297

283-
var errors = results.filter(p => p.status === 'rejected') as PromiseRejectedResult[];
298+
return await sshHelper.uploadFile(fileToCopy, targetPath);
299+
}));
284300

285-
if (errors.length > 0) {
286-
errors.forEach(x => tl.error(x.reason));
287-
tl.setResult(tl.TaskResult.Failed, tl.loc('NumberFailed', errors.length));
288-
return;
289-
}
301+
const errors = [];
302+
let successfullyCopiedFilesCount = 0;
290303

291-
successedFiles += results.filter(p => p.status === 'fulfilled').length;
292-
}
304+
q.on('reject', (error) => errors.push(error));
305+
q.on('resolve', () => successfullyCopiedFilesCount++);
306+
q.on('empty', () => tl.debug('Queue is empty'));
307+
q.on('end', async () => {
308+
tl.debug('End of the queue processing');
309+
tl.debug(`Errors count ${errors.length}`);
293310

294-
console.log(tl.loc('CopyCompleted', successedFiles));
295-
} else if (failOnEmptySource) {
296-
throw tl.loc('NothingToCopy');
297-
} else {
298-
tl.warning(tl.loc('NothingToCopy'));
299-
}
300-
}
301-
} catch (err) {
302-
tl.setResult(tl.TaskResult.Failed, err);
303-
} finally {
304-
// close the client connection to halt build execution
305-
if (sshHelper) {
306-
tl.debug('Closing the client connection');
311+
if (errors.length > 0) {
312+
errors.forEach(x => tl.error(x));
307313
await sshHelper.closeConnection();
314+
tl.setResult(tl.TaskResult.Failed, tl.loc('NumberFailed', errors.length));
315+
return;
308316
}
309-
}
310-
}
311317

312-
run().then(() => {
313-
tl.debug('Task successfully accomplished');
314-
})
315-
.catch(err => {
316-
tl.debug('Run was unexpectedly failed due to: ' + err);
318+
await sshHelper.closeConnection();
319+
tl.setResult(tl.TaskResult.Succeeded, tl.loc('CopyCompleted', successfullyCopiedFilesCount));
317320
});
318-
319-
function getReadyTimeoutVariable(): number {
320-
let readyTimeoutString: string = tl.getInput('readyTimeout', true);
321-
const readyTimeout: number = parseInt(readyTimeoutString, 10);
322-
323-
return readyTimeout;
324321
}
322+
323+
run();

0 commit comments

Comments
 (0)