26 #ifndef __XRD_CL_PARALLELOPERATION_HH__ 27 #define __XRD_CL_PARALLELOPERATION_HH__ 36 #include <condition_variable> 76 template<
bool HasHndl>
101 template<
class Container>
104 static_assert( !HasHndl,
"Constructor is available only operation without handler");
107 auto begin = std::make_move_iterator( container.begin() );
108 auto end = std::make_move_iterator( container.end() );
109 std::copy( begin, end, std::back_inserter(
pipelines ) );
122 std::ostringstream oss;
124 for(
size_t i = 0; i <
pipelines.size(); i++ )
145 return std::move( *
this );
157 return std::move( *
this );
169 return std::move( *
this );
182 return std::move( *
this );
197 if( status.
IsOK() )
return false;
218 size_t nb =
cnt.fetch_sub( 1 );
220 if( status.
IsOK() )
return true;
222 if( nb == 1 )
return true;
253 size_t f =
failed.fetch_add( 1 );
284 if( status.
IsOK() )
return false;
305 std::unique_lock<std::mutex> lck(
mtx );
306 if(
on )
cv.wait( lck );
316 std::condition_variable
cv;
355 if(
policy->Examine( st ) )
442 std::shared_ptr<Ctx> ctx =
445 uint16_t
timeout = pipelineTimeout < this->timeout ?
446 pipelineTimeout : this->
timeout;
448 for(
size_t i = 0; i <
pipelines.size(); ++i )
466 template<
class Container>
484 template<
typename ... Others>
485 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
488 template<
typename ... Others>
489 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
492 template<
typename ... Others>
493 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
499 template<
typename ... Others>
503 v.emplace_back( operation );
507 template<
typename ... Others>
511 v.emplace_back( operation );
515 template<
typename ... Others>
519 v.emplace_back( std::move( pipeline ) );
529 template<
typename ... Operations>
532 constexpr
size_t size =
sizeof...( operations );
533 std::vector<Pipeline> v;
540 #endif // __XRD_CL_OPERATIONS_HH__ static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition: XrdClParallelOperation.hh:423
A synchronized queue.
Definition: XrdClJobManager.hh:50
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:475
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
barrier_t barrier
Definition: XrdClParallelOperation.hh:389
std::atomic< size_t > succeeded
Definition: XrdClParallelOperation.hh:262
SomePolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:239
XrdCl::XRootDStatus st
Definition: XrdClParallelOperation.hh:416
Definition: XrdClParallelOperation.hh:193
A wait barrier helper class.
Definition: XrdClParallelOperation.hh:299
AtLeastPolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:276
std::string ToString()
Definition: XrdClParallelOperation.hh:120
ParallelOperation< HasHndl > All()
Definition: XrdClParallelOperation.hh:142
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition: XrdClParallelOperation.hh:437
uint16_t timeout
Operation timeout.
Definition: XrdClOperations.hh:746
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:120
ParallelOperation< HasHndl > Any()
Definition: XrdClParallelOperation.hh:154
const size_t threshold
Definition: XrdClParallelOperation.hh:263
std::mutex mtx
Definition: XrdClParallelOperation.hh:317
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:215
std::condition_variable cv
Definition: XrdClParallelOperation.hh:316
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:281
The thread-pool job for schedule Ctx::Examine.
Definition: XrdClParallelOperation.hh:395
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
void Run(void *)
The job logic.
Definition: XrdClParallelOperation.hh:408
Definition: XrdClParallelOperation.hh:274
Definition: XrdClOperationHandlers.hh:623
Definition: XrdClParallelOperation.hh:237
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:228
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:365
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition: XrdClParallelOperation.hh:383
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition: XrdClParallelOperation.hh:87
const size_t size
Definition: XrdClParallelOperation.hh:264
barrier_t()
Definition: XrdClParallelOperation.hh:301
AnyPolicy(size_t size)
Definition: XrdClParallelOperation.hh:211
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:195
~ParallelOperation()
Definition: XrdClParallelOperation.hh:113
Request status.
Definition: XrdClXRootDResponses.hh:214
Definition: XrdClAnyObject.hh:25
Definition: XrdClOperations.hh:44
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition: XrdClParallelOperation.hh:400
ParallelOperation< HasHndl > Some(size_t threshold)
Definition: XrdClParallelOperation.hh:166
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition: XrdClParallelOperation.hh:334
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:244
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:378
Definition: XrdClOperations.hh:58
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:459
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:102
bool on
Definition: XrdClParallelOperation.hh:318
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:467
virtual ~PolicyExecutor()
Definition: XrdClParallelOperation.hh:62
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void wait()
Definition: XrdClParallelOperation.hh:303
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:342
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition: XrdClParallelOperation.hh:179
std::shared_ptr< Ctx > ctx
Definition: XrdClParallelOperation.hh:415
JobManager * GetJobManager()
Get the job manager object user by the post master.
Definition: XrdClParallelOperation.hh:209
Definition: XrdClParallelOperation.hh:60
std::atomic< size_t > failed_cnt
Definition: XrdClParallelOperation.hh:292
void Examine(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:353
static PostMaster * GetPostMaster()
Get default post master.
std::atomic< size_t > failed
Definition: XrdClParallelOperation.hh:261
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
Definition: XrdClOperations.hh:319
Definition: XrdClParallelOperation.hh:77
std::unique_ptr< PolicyExecutor > policy
Definition: XrdClParallelOperation.hh:460
Definition: XrdClParallelOperation.hh:327
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:304
void lift()
Definition: XrdClParallelOperation.hh:309
const size_t failed_threshold
Definition: XrdClParallelOperation.hh:293
Definition: XrdClOperations.hh:521