XRootD
Loading...
Searching...
No Matches
XrdCl::CopyProcess Class Reference

Copy the data from one point to another. More...

#include <XrdClCopyProcess.hh>

+ Collaboration diagram for XrdCl::CopyProcess:

Public Member Functions

 CopyProcess ()
 Constructor.
 
virtual ~CopyProcess ()
 Destructor.
 
XRootDStatus AddJob (const PropertyList &properties, PropertyList *results)
 
XRootDStatus Prepare ()
 
XRootDStatus Run (CopyProgressHandler *handler)
 Run the copy jobs.
 

Detailed Description

Copy the data from one point to another.

Definition at line 107 of file XrdClCopyProcess.hh.

Constructor & Destructor Documentation

◆ CopyProcess()

XrdCl::CopyProcess::CopyProcess ( )

Constructor.

Definition at line 212 of file XrdClCopyProcess.cc.

212 : pImpl( new CopyProcessImpl() )
213 {
214 }

◆ ~CopyProcess()

XrdCl::CopyProcess::~CopyProcess ( )
virtual

Destructor.

Definition at line 219 of file XrdClCopyProcess.cc.

220 {
221 CleanUpJobs();
222 delete pImpl;
223 }

Member Function Documentation

◆ AddJob()

XRootDStatus XrdCl::CopyProcess::AddJob ( const PropertyList & properties,
PropertyList * results )

Add job

Parameters
properties: job configuration parameters
results: placeholder for the results

Configuration properties:
source [string] - original source URL
target [string] - target directory or file
sourceLimit [uint16_t] - maximum number of sources
force [bool] - overwrite target if it exists
posc [bool] - persist only on successful close
coerce [bool] - ignore locking semantics on destination
makeDir [bool] - create path to the file if it doesn't exist
thirdParty [string] - "first": try third party copy, if it fails try normal copy; "only": only try third party copy
checkSumMode [string] - "none": no checksumming; "end2end": end to end checksumming; "source": calculate checksum at source; "target": calculate checksum at target
checkSumType [string] - type of the checksum to be used
checkSumPreset [string] - checksum preset
chunkSize [uint32_t] - size of a copy chunk in bytes
parallelChunks [uint8_t] - number of chunks that should be requested in parallel
initTimeout [uint16_t] - time limit for successful initialization of the copy job
tpcTimeout [uint16_t] - time limit for the actual copy to finish
dynamicSource [bool] - support for the case where the size of the source file may change during the reading process

Configuration job - this is a job that is supposed to configure the copy process as a whole instead of adding a copy job:

jobType [string] - "configuration": marks the job as a configuration job
parallel [uint8_t] - number of copy jobs to be run in parallel

Results:
sourceCheckSum [string] - checksum at source, if requested
targetCheckSum [string] - checksum at target, if requested
size [uint64_t] - file size
status [XRootDStatus] - status of the copy operation
sources [vector<string>] - all sources used
realTarget [string] - the actual disk server target

Definition at line 228 of file XrdClCopyProcess.cc.

230 {
231 Env *env = DefaultEnv::GetEnv();
232
233 //--------------------------------------------------------------------------
234 // Process a configuraion job
235 //--------------------------------------------------------------------------
236 if( properties.HasProperty( "jobType" ) &&
237 properties.Get<std::string>( "jobType" ) == "configuration" )
238 {
239 if( pImpl->pJobProperties.size() > 0 &&
240 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
241 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
242 {
243 PropertyList &config = *pImpl->pJobProperties.rbegin();
244 PropertyList::PropertyMap::const_iterator it;
245 for( it = properties.begin(); it != properties.end(); ++it )
246 config.Set( it->first, it->second );
247 }
248 else
249 pImpl->pJobProperties.push_back( properties );
250 return XRootDStatus();
251 }
252
253 //--------------------------------------------------------------------------
254 // Validate properties
255 //--------------------------------------------------------------------------
256 if( !properties.HasProperty( "source" ) )
257 return XRootDStatus( stError, errInvalidArgs, 0, "source not specified" );
258
259 if( !properties.HasProperty( "target" ) )
260 return XRootDStatus( stError, errInvalidArgs, 0, "target not specified" );
261
262 pImpl->pJobProperties.push_back( properties );
263 PropertyList &p = pImpl->pJobProperties.back();
264
265 const char *bools[] = {"target", "force", "posc", "coerce", "makeDir",
266 "zipArchive", "xcp", "preserveXAttr", "rmOnBadCksum",
267 "continue", "zipAppend", "doServer", 0};
268 for( int i = 0; bools[i]; ++i )
269 if( !p.HasProperty( bools[i] ) )
270 p.Set( bools[i], false );
271
272 if( !p.HasProperty( "thirdParty" ) )
273 p.Set( "thirdParty", "none" );
274
275 if( !p.HasProperty( "checkSumMode" ) )
276 p.Set( "checkSumMode", "none" );
277 else
278 {
279 if( !p.HasProperty( "checkSumType" ) )
280 {
281 pImpl->pJobProperties.pop_back();
282 return XRootDStatus( stError, errInvalidArgs, 0,
283 "checkSumType not specified" );
284 }
285 else
286 {
287 //----------------------------------------------------------------------
288 // Checksum type has to be case insensitive
289 //----------------------------------------------------------------------
290 std::string checkSumType;
291 p.Get( "checkSumType", checkSumType );
292 std::transform(checkSumType.begin(), checkSumType.end(),
293 checkSumType.begin(), ::tolower);
294 p.Set( "checkSumType", checkSumType );
295 }
296 }
297
298 if( !p.HasProperty( "parallelChunks" ) )
299 {
300 int val = DefaultCPParallelChunks;
301 env->GetInt( "CPParallelChunks", val );
302 p.Set( "parallelChunks", val );
303 }
304
305 if( !p.HasProperty( "chunkSize" ) )
306 {
307 int val = DefaultCPChunkSize;
308 env->GetInt( "CPChunkSize", val );
309 p.Set( "chunkSize", val );
310 }
311
312 if( !p.HasProperty( "xcpBlockSize" ) )
313 {
314 int val = DefaultXCpBlockSize;
315 env->GetInt( "XCpBlockSize", val );
316 p.Set( "xcpBlockSize", val );
317 }
318
319 if( !p.HasProperty( "initTimeout" ) )
320 {
321 int val = DefaultCPInitTimeout;
322 env->GetInt( "CPInitTimeout", val );
323 p.Set( "initTimeout", val );
324 }
325
326 if( !p.HasProperty( "tpcTimeout" ) )
327 {
328 int val = DefaultCPTPCTimeout;
329 env->GetInt( "CPTPCTimeout", val );
330 p.Set( "tpcTimeout", val );
331 }
332
333 if( !p.HasProperty( "cpTimeout" ) )
334 {
335 int val = DefaultCPTimeout;
336 env->GetInt( "CPTimeout", val );
337 p.Set( "cpTimeout", val );
338 }
339
340 if( !p.HasProperty( "dynamicSource" ) )
341 p.Set( "dynamicSource", false );
342
343 if( !p.HasProperty( "xrate" ) )
344 p.Set( "xrate", 0 );
345
346 if( !p.HasProperty( "xrateThreshold" ) || p.Get<long long>( "xrateThreshold" ) == 0 )
347 {
348 int val = DefaultXRateThreshold;
349 env->GetInt( "XRateThreshold", val );
350 p.Set( "xrateThreshold", val );
351 }
352
353 //--------------------------------------------------------------------------
354 // Insert the properties
355 //--------------------------------------------------------------------------
356 Log *log = DefaultEnv::GetLog();
357 Utils::LogPropertyList( log, UtilityMsg, "Adding job with properties: %s",
358 p );
359 pImpl->pJobResults.push_back( results );
360 return XRootDStatus();
361 }
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const int DefaultCPParallelChunks
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
const int DefaultCPTPCTimeout
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::PropertyList::begin(), XrdCl::DefaultCPChunkSize, XrdCl::DefaultCPInitTimeout, XrdCl::DefaultCPParallelChunks, XrdCl::DefaultCPTimeout, XrdCl::DefaultCPTPCTimeout, XrdCl::DefaultXCpBlockSize, XrdCl::DefaultXRateThreshold, XrdCl::PropertyList::end(), XrdCl::errInvalidArgs, XrdCl::PropertyList::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::PropertyList::HasProperty(), XrdCl::Utils::LogPropertyList(), XrdCl::PropertyList::Set(), XrdCl::stError, and XrdCl::UtilityMsg.

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

XRootDStatus XrdCl::CopyProcess::Prepare ( )

Definition at line 366 of file XrdClCopyProcess.cc.

367 {
368 Log *log = DefaultEnv::GetLog();
369 std::vector<PropertyList>::iterator it;
370
371 log->Debug( UtilityMsg, "CopyProcess: %zu jobs to prepare",
372 pImpl->pJobProperties.size() );
373
374 std::map<std::string, uint32_t> targetFlags;
375 int i = 0;
376 for( it = pImpl->pJobProperties.begin(); it != pImpl->pJobProperties.end(); ++it, ++i )
377 {
378 PropertyList &props = *it;
379
380 if( props.HasProperty( "jobType" ) &&
381 props.Get<std::string>( "jobType" ) == "configuration" )
382 continue;
383
384 PropertyList *res = pImpl->pJobResults[i];
385 std::string tmp;
386
387 props.Get( "source", tmp );
388 URL source = tmp;
389 if( !source.IsValid() )
390 return XRootDStatus( stError, errInvalidArgs, 0, "invalid source" );
391
392 //--------------------------------------------------------------------------
393 // Create a virtual redirector if it is a Metalink file
394 //--------------------------------------------------------------------------
395 if( source.IsMetalink() )
396 {
397 RedirectorRegistry &registry = RedirectorRegistry::Instance();
398 XRootDStatus st = registry.RegisterAndWait( source );
399 if( !st.IsOK() ) return st;
400 }
401
402 // handle UNZIP CGI
403 const URL::ParamsMap &cgi = source.GetParams();
404 URL::ParamsMap::const_iterator itr = cgi.find( "xrdcl.unzip" );
405 if( itr != cgi.end() )
406 {
407 props.Set( "zipArchive", true );
408 props.Set( "zipSource", itr->second );
409 }
410
411 props.Get( "target", tmp );
412 URL target = tmp;
413 if( !target.IsValid() )
414 return XRootDStatus( stError, errInvalidArgs, 0, "invalid target" );
415
416 if( target.GetProtocol() != "stdio" )
417 {
418 // handle directories
419 bool targetIsDir = false;
420 props.Get( "targetIsDir", targetIsDir );
421
422 if( targetIsDir )
423 {
424 std::string path = target.GetPath() + '/';
425 std::string fn;
426
427 bool isZip = false;
428 props.Get( "zipArchive", isZip );
429 if( isZip )
430 {
431 props.Get( "zipSource", fn );
432 }
433 else if( source.IsMetalink() )
434 {
435 RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
436 VirtualRedirector *redirector = registry.Get( source );
437 fn = redirector->GetTargetName();
438 }
439 else
440 {
441 fn = source.GetPath();
442 }
443
444 size_t pos = fn.rfind( '/' );
445 if( pos != std::string::npos )
446 fn = fn.substr( pos + 1 );
447 path += fn;
448 target.SetPath( path );
449 props.Set( "target", target.GetURL() );
450 }
451 }
452
453 bool tpc = false;
454 props.Get( "thirdParty", tmp );
455 if( tmp != "none" )
456 tpc = true;
457
458 //------------------------------------------------------------------------
459 // Check if we have all we need
460 //------------------------------------------------------------------------
461 if( source.GetProtocol() != "stdio" && source.GetPath().empty() )
462 {
463 log->Debug( UtilityMsg, "CopyProcess (job #%d): no source specified.",
464 i );
465 CleanUpJobs();
466 XRootDStatus st = XRootDStatus( stError, errInvalidArgs );
467 res->Set( "status", st );
468 return st;
469 }
470
471 if( target.GetProtocol() != "stdio" && target.GetPath().empty() )
472 {
473 log->Debug( UtilityMsg, "CopyProcess (job #%d): no target specified.",
474 i );
475 CleanUpJobs();
476 XRootDStatus st = XRootDStatus( stError, errInvalidArgs );
477 res->Set( "status", st );
478 return st;
479 }
480
481 //------------------------------------------------------------------------
482 // Check what kind of job we should do
483 //------------------------------------------------------------------------
484 CopyJob *job = 0;
485
486 if( tpc == true )
487 {
488 MarkTPC( props );
489 job = new TPFallBackCopyJob( i+1, &props, res );
490 }
491 else
492 job = new ClassicCopyJob( i+1, &props, res );
493
494 pImpl->pJobs.push_back( job );
495 }
496 return XRootDStatus();
497 }
static RedirectorRegistry & Instance()
Returns reference to the single instance.
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33

References XrdCl::Log::Debug(), XrdCl::errInvalidArgs, XrdCl::PropertyList::Get(), XrdCl::RedirectorRegistry::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetParams(), XrdCl::URL::GetPath(), XrdCl::URL::GetProtocol(), XrdCl::VirtualRedirector::GetTargetName(), XrdCl::URL::GetURL(), XrdCl::PropertyList::HasProperty(), XrdCl::RedirectorRegistry::Instance(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::RedirectorRegistry::RegisterAndWait(), XrdCl::PropertyList::Set(), XrdCl::URL::SetPath(), XrdCl::stError, and XrdCl::UtilityMsg.

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Run()

XRootDStatus XrdCl::CopyProcess::Run ( CopyProgressHandler * handler)

Run the copy jobs.

Definition at line 502 of file XrdClCopyProcess.cc.

503 {
504 //--------------------------------------------------------------------------
505 // Get the configuration
506 //--------------------------------------------------------------------------
507 uint8_t parallelThreads = 1;
508 if( pImpl->pJobProperties.size() > 0 &&
509 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
510 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
511 {
512 PropertyList &config = *pImpl->pJobProperties.rbegin();
513 if( config.HasProperty( "parallel" ) )
514 parallelThreads = (uint8_t)config.Get<int>( "parallel" );
515 }
516
517 //--------------------------------------------------------------------------
518 // Run the show
519 //--------------------------------------------------------------------------
520 std::vector<CopyJob *>::iterator it;
521 uint16_t currentJob = 1;
522 uint16_t totalJobs = pImpl->pJobs.size();
523
524 //--------------------------------------------------------------------------
525 // Single thread
526 //--------------------------------------------------------------------------
527 if( parallelThreads == 1 )
528 {
529 XRootDStatus err;
530
531 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
532 {
533 QueuedCopyJob j( *it, progress, currentJob, totalJobs );
534 j.Run(0);
535
536 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
537 if( err.IsOK() && !st.IsOK() )
538 {
539 err = st;
540 }
541 ++currentJob;
542 }
543
544 if( !err.IsOK() ) return err;
545 }
546 //--------------------------------------------------------------------------
547 // Multiple threads
548 //--------------------------------------------------------------------------
549 else
550 {
551 uint16_t workers = std::min( (uint16_t)parallelThreads,
552 (uint16_t)pImpl->pJobs.size() );
553 JobManager jm( workers );
554 jm.Initialize();
555 if( !jm.Start() )
556 return XRootDStatus( stError, errOSError, 0,
557 "Unable to start job manager" );
558
559 XrdSysSemaphore *sem = new XrdSysSemaphore(0);
560 std::vector<QueuedCopyJob*> queued;
561 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
562 {
563 QueuedCopyJob *j = new QueuedCopyJob( *it, progress, currentJob,
564 totalJobs, sem );
565
566 queued.push_back( j );
567 jm.QueueJob(j, 0);
568 ++currentJob;
569 }
570
571 std::vector<QueuedCopyJob*>::iterator itQ;
572 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
573 sem->Wait();
574 delete sem;
575
576 if( !jm.Stop() )
577 return XRootDStatus( stError, errOSError, 0,
578 "Unable to stop job manager" );
579 jm.Finalize();
580 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
581 delete *itQ;
582
583 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
584 {
585 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
586 if( !st.IsOK() ) return st;
587 }
588 };
589 return XRootDStatus();
590 }
const uint16_t errOSError

References XrdCl::errOSError, XrdCl::JobManager::Finalize(), XrdCl::PropertyList::Get(), XrdCl::PropertyList::HasProperty(), XrdCl::JobManager::Initialize(), XrdCl::Status::IsOK(), XrdCl::JobManager::QueueJob(), XrdCl::JobManager::Start(), XrdCl::stError, XrdCl::JobManager::Stop(), and XrdSysSemaphore::Wait().

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: