xrootd
XrdClOperations.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
28 
29 #include <memory>
30 #include <stdexcept>
31 #include <sstream>
32 #include <tuple>
33 #include <future>
36 #include "XrdCl/XrdClArg.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
41 namespace XrdCl
42 {
43 
44  template<bool HasHndl> class Operation;
45 
46  class Pipeline;
47 
48 
49  //----------------------------------------------------------------------------
51  //----------------------------------------------------------------------------
52  typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
53 
54  //----------------------------------------------------------------------------
57  //----------------------------------------------------------------------------
59  {
60  template<bool> friend class Operation;
61 
62  public:
63 
64  //------------------------------------------------------------------------
69  //------------------------------------------------------------------------
70  PipelineHandler( ResponseHandler *handler );
71 
72  //------------------------------------------------------------------------
74  //------------------------------------------------------------------------
76  {
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
82  void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
83  HostList *hostList );
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  void HandleResponse( XRootDStatus *status, AnyObject *response );
89 
90  //------------------------------------------------------------------------
92  //------------------------------------------------------------------------
94  {
95  }
96 
97  //------------------------------------------------------------------------
101  //------------------------------------------------------------------------
102  void AddOperation( Operation<true> *operation );
103 
104  //------------------------------------------------------------------------
111  //------------------------------------------------------------------------
112  void Assign( const Timeout &timeout,
113  std::promise<XRootDStatus> prms,
114  std::function<void(const XRootDStatus&)> final,
115  Operation<true> *opr );
116 
117  //------------------------------------------------------------------------
119  //------------------------------------------------------------------------
120  void Assign( std::function<void(const XRootDStatus&)> final );
121 
122  private:
123 
124  //------------------------------------------------------------------------
126  //------------------------------------------------------------------------
127  void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
128  HostList *hostList = nullptr );
129 
130  inline void dealloc( XRootDStatus *status, AnyObject *response,
131  HostList *hostList )
132  {
133  delete status;
134  delete response;
135  delete hostList;
136  }
137 
138  //------------------------------------------------------------------------
140  //------------------------------------------------------------------------
141  std::unique_ptr<ResponseHandler> responseHandler;
142 
143  //------------------------------------------------------------------------
145  //------------------------------------------------------------------------
146  std::unique_ptr<Operation<true>> currentOperation;
147 
148  //------------------------------------------------------------------------
150  //------------------------------------------------------------------------
151  std::unique_ptr<Operation<true>> nextOperation;
152 
153  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
157 
158  //------------------------------------------------------------------------
160  //------------------------------------------------------------------------
161  std::promise<XRootDStatus> prms;
162 
163  //------------------------------------------------------------------------
166  //------------------------------------------------------------------------
167  std::function<void(const XRootDStatus&)> final;
168  };
169 
170  //----------------------------------------------------------------------------
176  //----------------------------------------------------------------------------
177  template<bool HasHndl>
178  class Operation
179  {
180  // Declare friendship between templates
181  template<bool>
182  friend class Operation;
183 
184  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
185 
186  friend class Pipeline;
187  friend class PipelineHandler;
188 
189  public:
190 
191  //------------------------------------------------------------------------
193  //------------------------------------------------------------------------
194  Operation() : valid( true )
195  {
196  }
197 
198  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
201  template<bool from>
203  handler( std::move( op.handler ) ), valid( true )
204  {
205  if( !op.valid ) throw std::invalid_argument( "Cannot construct "
206  "Operation from an invalid Operation!" );
207  op.valid = false;
208  }
209 
210  //------------------------------------------------------------------------
212  //------------------------------------------------------------------------
213  virtual ~Operation()
214  {
215  }
216 
217  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  virtual std::string ToString() = 0;
221 
222  //------------------------------------------------------------------------
226  //------------------------------------------------------------------------
227  virtual Operation<HasHndl>* Move() = 0;
228 
229  //------------------------------------------------------------------------
234  //------------------------------------------------------------------------
235  virtual Operation<true>* ToHandled() = 0;
236 
237  protected:
238 
239  //------------------------------------------------------------------------
249  //------------------------------------------------------------------------
250  void Run( Timeout timeout,
251  std::promise<XRootDStatus> prms,
252  std::function<void(const XRootDStatus&)> final )
253  {
254  static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
255  handler->Assign( timeout, std::move( prms ), std::move( final ), this );
256 
257  PipelineHandler *h = handler.release();
258  XRootDStatus st;
259  try
260  {
261  st = RunImpl( h, timeout );
262  }
263  catch( const operation_expired& ex )
264  {
266  }
267  catch( const PipelineException& ex ) // probably not needed
268  {
269  st = ex.GetError();
270  }
271  catch( const std::exception& ex )
272  {
273  st = XRootDStatus( stError, errInternal, 0, ex.what() );
274  }
275 
276  if( !st.IsOK() )
277  h->HandleResponse( new XRootDStatus( st ), nullptr );
278  }
279 
280  //------------------------------------------------------------------------
287  //------------------------------------------------------------------------
288  virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
289 
290  //------------------------------------------------------------------------
294  //------------------------------------------------------------------------
296  {
297  if( handler )
298  handler->AddOperation( op );
299  }
300 
301  //------------------------------------------------------------------------
303  //------------------------------------------------------------------------
304  std::unique_ptr<PipelineHandler> handler;
305 
306  //------------------------------------------------------------------------
308  //------------------------------------------------------------------------
309  bool valid;
310  };
311 
312  //----------------------------------------------------------------------------
318  //----------------------------------------------------------------------------
319  class Pipeline
320  {
321  template<bool> friend class ParallelOperation;
322  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
323  friend class PipelineHandler;
324 
325  public:
326 
327  //------------------------------------------------------------------------
329  //------------------------------------------------------------------------
331  operation( op->Move() )
332  {
333 
334  }
335 
336  //------------------------------------------------------------------------
338  //------------------------------------------------------------------------
340  operation( op.Move() )
341  {
342 
343  }
344 
345  //------------------------------------------------------------------------
347  //------------------------------------------------------------------------
349  operation( op.Move() )
350  {
351 
352  }
353 
355  operation( op->ToHandled() )
356  {
357 
358  }
359 
360  //------------------------------------------------------------------------
362  //------------------------------------------------------------------------
364  operation( op.ToHandled() )
365  {
366 
367  }
368 
369  //------------------------------------------------------------------------
371  //------------------------------------------------------------------------
373  operation( op.ToHandled() )
374  {
375 
376  }
377 
378  Pipeline( Pipeline &&pipe ) :
379  operation( std::move( pipe.operation ) )
380  {
381 
382  }
383 
384  //------------------------------------------------------------------------
386  //------------------------------------------------------------------------
388  {
389  operation = std::move( pipe.operation );
390  return *this;
391  }
392 
393  //------------------------------------------------------------------------
397  //------------------------------------------------------------------------
398  operator Operation<true>&()
399  {
400  if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
401  return *operation.get();
402  }
403 
404  //------------------------------------------------------------------------
408  //------------------------------------------------------------------------
409  operator bool()
410  {
411  return bool( operation );
412  }
413 
414  //------------------------------------------------------------------------
418  //------------------------------------------------------------------------
419  static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
420 
421  //------------------------------------------------------------------------
423  //------------------------------------------------------------------------
424  static void Repeat();
425 
426  //------------------------------------------------------------------------
428  //------------------------------------------------------------------------
429  static void Replace( Operation<false> &&opr );
430 
431  //------------------------------------------------------------------------
433  //------------------------------------------------------------------------
434  static void Replace( Pipeline p );
435 
436  //------------------------------------------------------------------------
438  //------------------------------------------------------------------------
439  static void Ignore();
440 
441  private:
442 
443  //------------------------------------------------------------------------
448  //------------------------------------------------------------------------
450  {
451  return operation.get();
452  }
453 
454  //------------------------------------------------------------------------
459  //------------------------------------------------------------------------
460  void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
461  {
462  if( ftr.valid() )
463  throw std::logic_error( "Pipeline is already running" );
464 
465  // a promise that the pipe will have a result
466  std::promise<XRootDStatus> prms;
467  ftr = prms.get_future();
468 
469  Operation<true> *opr = operation.release();
470  opr->Run( timeout, std::move( prms ), std::move( final ) );
471  }
472 
473  //------------------------------------------------------------------------
475  //------------------------------------------------------------------------
476  std::unique_ptr<Operation<true>> operation;
477 
478  //------------------------------------------------------------------------
480  //------------------------------------------------------------------------
481  std::future<XRootDStatus> ftr;
482 
483  };
484 
485  //----------------------------------------------------------------------------
492  //----------------------------------------------------------------------------
493  inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
494  {
495  pipeline.Run( timeout );
496  return std::move( pipeline.ftr );
497  }
498 
499  //----------------------------------------------------------------------------
506  //----------------------------------------------------------------------------
507  inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
508  {
509  return Async( std::move( pipeline ), timeout ).get();
510  }
511 
512  //----------------------------------------------------------------------------
519  //----------------------------------------------------------------------------
520  template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
521  class ConcreteOperation: public Operation<HasHndl>
522  {
523  template<template<bool> class, bool, typename, typename ...>
524  friend class ConcreteOperation;
525 
526  public:
527 
528  //------------------------------------------------------------------------
532  //------------------------------------------------------------------------
533  ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
534  timeout( 0 )
535  {
536  static_assert( !HasHndl, "It is only possible to construct operation without handler" );
537  }
538 
539  //------------------------------------------------------------------------
545  //------------------------------------------------------------------------
546  template<bool from>
548  Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
549  {
550  }
551 
552  //------------------------------------------------------------------------
560  //------------------------------------------------------------------------
561  template<typename Hdlr>
562  Derived<true> operator>>( Hdlr &&hdlr )
563  {
564  return this->StreamImpl( HdlrFactory::Create( hdlr ) );
565  }
566 
567  //------------------------------------------------------------------------
573  //------------------------------------------------------------------------
574  Derived<true> operator|( Operation<true> &op )
575  {
576  return PipeImpl( *this, op );
577  }
578 
579  //------------------------------------------------------------------------
585  //------------------------------------------------------------------------
586  Derived<true> operator|( Operation<true> &&op )
587  {
588  return PipeImpl( *this, op );
589  }
590 
591  //------------------------------------------------------------------------
597  //------------------------------------------------------------------------
598  Derived<true> operator|( Operation<false> &op )
599  {
600  return PipeImpl( *this, op );
601  }
602 
603  //------------------------------------------------------------------------
609  //------------------------------------------------------------------------
610  Derived<true> operator|( Operation<false> &&op )
611  {
612  return PipeImpl( *this, op );
613  }
614 
615  //------------------------------------------------------------------------
617  //------------------------------------------------------------------------
618  Derived<true> operator|( FinalOperation &&fo )
619  {
620  AllocHandler( *this );
621  this->handler->Assign( fo.final );
622  return this->template Transform<true>();
623  }
624 
625  //------------------------------------------------------------------------
629  //------------------------------------------------------------------------
631  {
632  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
633  return new Derived<HasHndl>( std::move( *me ) );
634  }
635 
636  //------------------------------------------------------------------------
640  //------------------------------------------------------------------------
642  {
643  this->handler.reset( new PipelineHandler() );
644  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
645  return new Derived<true>( std::move( *me ) );
646  }
647 
648  //------------------------------------------------------------------------
650  //------------------------------------------------------------------------
651  Derived<HasHndl> Timeout( uint16_t timeout )
652  {
653  this->timeout = timeout;
654  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
655  return std::move( *me );
656  }
657 
658  protected:
659 
660  //------------------------------------------------------------------------
664  //------------------------------------------------------------------------
665  template<bool to>
666  inline Derived<to> Transform()
667  {
668  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
669  return Derived<to>( std::move( *me ) );
670  }
671 
672  //------------------------------------------------------------------------
678  //------------------------------------------------------------------------
679  inline Derived<true> StreamImpl( ResponseHandler *handler )
680  {
681  static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
682  this->handler.reset( new PipelineHandler( handler ) );
683  return Transform<true>();
684  }
685 
686  //------------------------------------------------------------------------
687  // Allocate handler if necessary
688  //------------------------------------------------------------------------
689  inline static
691  {
692  // nothing to do
693  }
694 
695  //------------------------------------------------------------------------
696  // Allocate handler if necessary
697  //------------------------------------------------------------------------
698  inline static
700  {
701  me.handler.reset( new PipelineHandler() );
702  }
703 
704  //------------------------------------------------------------------------
711  //------------------------------------------------------------------------
712  inline static
713  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
714  Args...> &me, Operation<true> &op )
715  {
716  AllocHandler( me ); // if HasHndl is false allocate handler
717  me.AddOperation( op.Move() );
718  return me.template Transform<true>();
719  }
720 
721  //------------------------------------------------------------------------
728  //------------------------------------------------------------------------
729  inline static
730  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
731  Args...> &me, Operation<false> &op )
732  {
733  AllocHandler( me ); // if HasHndl is false allocate handler
734  me.AddOperation( op.ToHandled() );
735  return me.template Transform<true>();
736  }
737 
738  //------------------------------------------------------------------------
740  //------------------------------------------------------------------------
741  std::tuple<Args...> args;
742 
743  //------------------------------------------------------------------------
745  //------------------------------------------------------------------------
746  uint16_t timeout;
747  };
748 }
749 
750 #endif // __XRD_CL_OPERATIONS_HH__
std::unique_ptr< Operation< true > > operation
First operation in the pipeline.
Definition: XrdClOperations.hh:476
Definition: XrdClAnyObject.hh:32
Operation(Operation< from > &&op)
Move constructor between template instances.
Definition: XrdClOperations.hh:202
bool valid
Flag indicating if it is a valid object.
Definition: XrdClOperations.hh:309
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Definition: XrdClOperations.hh:493
std::future< XRootDStatus > ftr
The future result of the pipeline.
Definition: XrdClOperations.hh:481
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
Definition: XrdClOperations.hh:547
Pipeline(Operation< false > &&op)
Constructor.
Definition: XrdClOperations.hh:372
Definition: XrdClOperationTimeout.hh:20
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Definition: XrdClOperations.hh:713
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Definition: XrdSysKernelBuffer.hh:452
Derived< true > operator|(Operation< true > &&op)
Definition: XrdClOperations.hh:586
Pipeline & operator=(Pipeline &&pipe)
Constructor.
Definition: XrdClOperations.hh:387
static void Repeat()
Repeat current operation.
Derived< true > operator|(Operation< true > &op)
Definition: XrdClOperations.hh:574
Derived< true > operator>>(Hdlr &&hdlr)
Definition: XrdClOperations.hh:562
uint16_t timeout
Operation timeout.
Definition: XrdClOperations.hh:746
void HandleResponseImpl(XRootDStatus *status, AnyObject *response, HostList *hostList=nullptr)
Callback function implementation;.
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:122
void dealloc(XRootDStatus *status, AnyObject *response, HostList *hostList)
Definition: XrdClOperations.hh:130
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Definition: XrdClOperations.hh:618
ConcreteOperation(Args &&... args)
Definition: XrdClOperations.hh:533
void AddOperation(Operation< true > *op)
Definition: XrdClOperations.hh:295
Derived< true > StreamImpl(ResponseHandler *handler)
Definition: XrdClOperations.hh:679
Definition: XrdClFinalOperation.hh:39
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Definition: XrdClOperations.hh:651
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Definition: XrdClOperations.hh:493
~PipelineHandler()
Destructor.
Definition: XrdClOperations.hh:93
Definition: XrdClOperationTimeout.hh:18
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
Definition: XrdClOperations.hh:493
Derived< true > operator|(Operation< false > &op)
Definition: XrdClOperations.hh:598
static void Replace(Operation< false > &&opr)
Replace current operation.
virtual ~Operation()
Destructor.
Definition: XrdClOperations.hh:213
Pipeline exception, wrapps an XRootDStatus.
Definition: XrdClOperationHandlers.hh:356
Pipeline(Operation< true > &&op)
Constructor.
Definition: XrdClOperations.hh:348
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
Definition: XrdClOperations.hh:46
std::vector< HostInfo > HostList
Definition: XrdClXRootDResponses.hh:1035
void Run(Timeout timeout, std::function< void(const XRootDStatus &)> final=nullptr)
Definition: XrdClOperations.hh:460
Timeout timeout
Pipeline timeout.
Definition: XrdClOperations.hh:156
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
static void Ignore()
Ignore error and proceed with the pipeline.
friend class ConcreteOperation
Definition: XrdClOperations.hh:524
std::tuple< Args... > args
Operation arguments.
Definition: XrdClOperations.hh:741
Pipeline(Pipeline &&pipe)
Definition: XrdClOperations.hh:378
PipelineHandler()
Default Constructor.
Definition: XrdClOperations.hh:75
virtual std::string ToString()=0
Name of the operation.
std::unique_ptr< Operation< true > > nextOperation
Next operation in the pipeline.
Definition: XrdClOperations.hh:151
Pipeline(Operation< false > &op)
Constructor.
Definition: XrdClOperations.hh:363
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
Definition: XrdClOperations.hh:690
Definition: XrdClAnyObject.hh:25
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
Definition: XrdClOperations.hh:507
Definition: XrdClOperations.hh:44
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
Operation()
Constructor.
Definition: XrdClOperations.hh:194
const XRootDStatus & GetError() const
Definition: XrdClOperationHandlers.hh:396
Definition: XrdClOperations.hh:58
Operation< HasHndl > * Move()
Definition: XrdClOperations.hh:630
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
Pipeline(Operation< true > &op)
Constructor.
Definition: XrdClOperations.hh:339
std::promise< XRootDStatus > prms
The promise that there will be a result (traveling along the pipeline)
Definition: XrdClOperations.hh:161
Handle an async response.
Definition: XrdClXRootDResponses.hh:1040
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
Definition: XrdClOperations.hh:699
Operation< true > * operator->()
Definition: XrdClOperations.hh:449
std::unique_ptr< Operation< true > > currentOperation
The operation the handler is assigned to.
Definition: XrdClOperations.hh:146
Derived< to > Transform()
Definition: XrdClOperations.hh:666
void AddOperation(Operation< true > *operation)
Pipeline(Operation< false > *op)
Definition: XrdClOperations.hh:354
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
virtual Operation< true > * ToHandled()=0
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Definition: XrdClOperations.hh:730
friend class PipelineHandler
Definition: XrdClOperations.hh:187
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
Definition: XrdClOperations.hh:250
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
Operation< true > * ToHandled()
Definition: XrdClOperations.hh:641
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< true > *op)
Constructor.
Definition: XrdClOperations.hh:330
std::unique_ptr< ResponseHandler > responseHandler
The handler of our operation.
Definition: XrdClOperations.hh:141
Derived< true > operator|(Operation< false > &&op)
Definition: XrdClOperations.hh:610
Definition: XrdClOperations.hh:319
Definition: XrdClParallelOperation.hh:79
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
virtual Operation< HasHndl > * Move()=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:304
Definition: XrdClOperations.hh:521