26 #ifndef __XRD_CL_PARALLELOPERATION_HH__ 27 #define __XRD_CL_PARALLELOPERATION_HH__ 36 #include <condition_variable> 78 template<
bool HasHndl>
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter(
pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i <
pipelines.size(); i++ )
147 return std::move( *
this );
159 return std::move( *
this );
171 return std::move( *
this );
184 return std::move( *
this );
201 if( status.
IsOK() )
return false;
231 size_t nb =
cnt.fetch_sub( 1 );
233 if( status.
IsOK() )
return true;
235 if( nb == 1 )
return true;
274 size_t f =
failed.fetch_add( 1 );
314 if( status.
IsOK() )
return false;
318 if( pending == 0 )
return true;
344 std::unique_lock<std::mutex> lck(
mtx );
345 if(
on )
cv.wait( lck );
355 std::condition_variable
cv;
394 if(
policy->Examine( st ) )
481 std::shared_ptr<Ctx> ctx =
484 uint16_t
timeout = pipelineTimeout < this->timeout ?
485 pipelineTimeout : this->
timeout;
487 for(
size_t i = 0; i <
pipelines.size(); ++i )
505 template<
class Container>
523 template<
typename ... Others>
524 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
527 template<
typename ... Others>
528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
531 template<
typename ... Others>
532 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
538 template<
typename ... Others>
542 v.emplace_back( operation );
546 template<
typename ... Others>
550 v.emplace_back( operation );
554 template<
typename ... Others>
558 v.emplace_back( std::move( pipeline ) );
568 template<
typename ... Operations>
571 constexpr
size_t size =
sizeof...( operations );
572 std::vector<Pipeline> v;
579 #endif // __XRD_CL_PARALLELOPERATION_HH__ static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition: XrdClParallelOperation.hh:462
A synchronized queue.
Definition: XrdClJobManager.hh:50
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:514
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
barrier_t barrier
Definition: XrdClParallelOperation.hh:428
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:281
std::atomic< size_t > succeeded
Definition: XrdClParallelOperation.hh:288
SomePolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:258
XrdCl::XRootDStatus st
Definition: XrdClParallelOperation.hh:455
Definition: XrdClParallelOperation.hh:195
A wait barrier helper class.
Definition: XrdClParallelOperation.hh:338
AtLeastPolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:303
std::atomic< size_t > pending_cnt
Definition: XrdClParallelOperation.hh:329
std::string ToString()
Definition: XrdClParallelOperation.hh:122
ParallelOperation< HasHndl > All()
Definition: XrdClParallelOperation.hh:144
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition: XrdClParallelOperation.hh:476
uint16_t timeout
Operation timeout.
Definition: XrdClOperations.hh:746
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
ParallelOperation< HasHndl > Any()
Definition: XrdClParallelOperation.hh:156
const size_t threshold
Definition: XrdClParallelOperation.hh:289
std::mutex mtx
Definition: XrdClParallelOperation.hh:356
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:226
std::condition_variable cv
Definition: XrdClParallelOperation.hh:355
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:309
The thread-pool job for scheduling Ctx::Examine.
Definition: XrdClParallelOperation.hh:434
XRootDStatus res
Definition: XrdClParallelOperation.hh:247
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
void Run(void *)
The job logic.
Definition: XrdClParallelOperation.hh:447
Definition: XrdClParallelOperation.hh:301
Definition: XrdClOperationHandlers.hh:623
Definition: XrdClParallelOperation.hh:256
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:246
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:404
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition: XrdClParallelOperation.hh:422
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in a different state.
Definition: XrdClParallelOperation.hh:89
const size_t size
Definition: XrdClParallelOperation.hh:290
barrier_t()
Definition: XrdClParallelOperation.hh:340
XRootDStatus res
Definition: XrdClParallelOperation.hh:211
AnyPolicy(size_t size)
Definition: XrdClParallelOperation.hh:222
virtual XRootDStatus Result()=0
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:206
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:197
~ParallelOperation()
Definition: XrdClParallelOperation.hh:115
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
Definition: XrdClOperations.hh:44
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition: XrdClParallelOperation.hh:439
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:240
ParallelOperation< HasHndl > Some(size_t threshold)
Definition: XrdClParallelOperation.hh:168
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition: XrdClParallelOperation.hh:373
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:263
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:417
Definition: XrdClOperations.hh:58
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:498
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:104
bool on
Definition: XrdClParallelOperation.hh:357
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:506
virtual ~PolicyExecutor()
Definition: XrdClParallelOperation.hh:62
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void wait()
Definition: XrdClParallelOperation.hh:342
XRootDStatus res
Definition: XrdClParallelOperation.hh:291
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:381
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition: XrdClParallelOperation.hh:181
std::shared_ptr< Ctx > ctx
Definition: XrdClParallelOperation.hh:454
JobManager * GetJobManager()
Get the job manager object used by the post master.
XRootDStatus res
Definition: XrdClParallelOperation.hh:332
Definition: XrdClParallelOperation.hh:220
Definition: XrdClParallelOperation.hh:60
std::atomic< size_t > failed_cnt
Definition: XrdClParallelOperation.hh:330
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:323
void Examine(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:392
static PostMaster * GetPostMaster()
Get default post master.
std::atomic< size_t > failed
Definition: XrdClParallelOperation.hh:287
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
Definition: XrdClOperations.hh:319
Definition: XrdClParallelOperation.hh:79
std::unique_ptr< PolicyExecutor > policy
Definition: XrdClParallelOperation.hh:499
Definition: XrdClParallelOperation.hh:366
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:304
void lift()
Definition: XrdClParallelOperation.hh:348
const size_t failed_threshold
Definition: XrdClParallelOperation.hh:331
Definition: XrdClOperations.hh:521