xrootd
XrdClOperationHandlers.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATION_HANDLERS_HH__
27 #define __XRD_CL_OPERATION_HANDLERS_HH__
28 
29 #include "XrdCl/XrdClFile.hh"
30 
31 #include<functional>
32 #include<future>
33 #include <atomic>
34 
35 namespace XrdCl
36 {
37  //----------------------------------------------------------------------------
39  //----------------------------------------------------------------------------
41  {
42  public:
43 
45  {
46  }
47 
48  //------------------------------------------------------------------------
50  //------------------------------------------------------------------------
51  void HandleResponse( XRootDStatus *status, AnyObject *response )
52  {
53  // status maybe error for old servers not supporting xattrs
54  if( !status->IsOK() )
55  {
56  handler->HandleResponse( status, nullptr );
57  return;
58  }
59 
60  std::vector<XAttrStatus> *bulk = nullptr;
61  response->Get( bulk );
62  *status = bulk->front().status;
63  handler->HandleResponse( status, nullptr );
64  delete response;
65  delete this;
66  }
67 
68  private:
69 
71  };
72 
73  //----------------------------------------------------------------------------
75  //----------------------------------------------------------------------------
77  {
78  public:
79 
81  {
82  }
83 
84  //------------------------------------------------------------------------
86  //------------------------------------------------------------------------
87  void HandleResponse( XRootDStatus *status, AnyObject *response )
88  {
89  // status is always OK for bulk response
90 
91  std::vector<XAttr> *bulk = nullptr;
92  response->Get( bulk );
93  *status = bulk->front().status;
94  std::string *rsp = new std::string( std::move( bulk->front().value ) );
95  delete bulk;
96  response->Set( rsp );
97  handler->HandleResponse( status, response );
98  delete this;
99  }
100 
101  private:
102 
104  };
105 
106  //----------------------------------------------------------------------------
107  // Helper class for creating null references for particular types
108  //
109  // @arg Response : type for which we need a null reference
110  //----------------------------------------------------------------------------
111  template<typename Response>
112  struct NullRef
113  {
114  static Response value;
115  };
116 
117  //----------------------------------------------------------------------------
118  // Initialize the 'null-reference'
119  //----------------------------------------------------------------------------
120  template<typename Response>
121  Response NullRef<Response>::value;
122 
123  //----------------------------------------------------------------------------
128  //----------------------------------------------------------------------------
129  template<typename Response>
130  inline Response* GetResponse( AnyObject *rsp )
131  {
132  Response *ret = nullptr;
133  rsp->Get( ret );
134  return ret;
135  }
136 
137  //----------------------------------------------------------------------------
143  //----------------------------------------------------------------------------
144  template<typename Response>
145  inline Response* GetResponse( XRootDStatus *status, AnyObject *rsp )
146  {
147  if( !status->IsOK() ) return &NullRef<Response>::value;
148  return GetResponse<Response>( rsp );
149  }
150 
151  //----------------------------------------------------------------------------
155  //----------------------------------------------------------------------------
156  template<typename Response>
158  {
159  public:
160 
161  //------------------------------------------------------------------------
163  //
165  //------------------------------------------------------------------------
167  std::function<void( XRootDStatus&, Response& )> handleFunction ) :
168  fun( handleFunction )
169  {
170  }
171 
172  //------------------------------------------------------------------------
174  //------------------------------------------------------------------------
175  void HandleResponse( XRootDStatus *status, AnyObject *response )
176  {
177  Response *res = GetResponse<Response>( status, response );
178  fun( *status, *res );
179  delete status;
180  delete response;
181  delete this;
182  }
183 
184  private:
185  //------------------------------------------------------------------------
187  //------------------------------------------------------------------------
188  std::function<void( XRootDStatus&, Response& )> fun;
189  };
190 
191  //----------------------------------------------------------------------------
195  //----------------------------------------------------------------------------
196  template<>
197  class FunctionWrapper<void> : public ResponseHandler
198  {
199  public:
200 
201  //------------------------------------------------------------------------
203  //
205  //------------------------------------------------------------------------
207  std::function<void( XRootDStatus& )> handleFunction ) :
208  fun( handleFunction )
209  {
210  }
211 
212  //------------------------------------------------------------------------
214  //------------------------------------------------------------------------
215  void HandleResponse( XRootDStatus *status, AnyObject *response )
216  {
217  fun( *status );
218  delete status;
219  delete response;
220  delete this;
221  }
222 
223  private:
224  //------------------------------------------------------------------------
226  //------------------------------------------------------------------------
227  std::function<void( XRootDStatus& )> fun;
228  };
229 
230  //----------------------------------------------------------------------------
235  //----------------------------------------------------------------------------
236  template<typename Response, typename Return>
238  {
239  public:
240 
241  //------------------------------------------------------------------------
243  //
245  //------------------------------------------------------------------------
246  TaskWrapper( std::packaged_task<Return( XRootDStatus&, Response& )> && task ) :
247  task( std::move( task ) )
248  {
249  }
250 
251  //------------------------------------------------------------------------
253  //------------------------------------------------------------------------
254  void HandleResponse( XRootDStatus *status, AnyObject *response )
255  {
256  Response *resp = GetResponse<Response>( status, response );
257  task( *status, *resp );
258  delete status;
259  delete response;
260  delete this;
261  }
262 
263  private:
264 
265  //------------------------------------------------------------------------
267  //------------------------------------------------------------------------
268  std::packaged_task<Return( XRootDStatus&, Response& )> task;
269  };
270 
271  //----------------------------------------------------------------------------
277  //----------------------------------------------------------------------------
278  template<typename Return>
279  class TaskWrapper<void, Return>: public ResponseHandler
280  {
281  public:
282 
283  //------------------------------------------------------------------------
285  //
287  //------------------------------------------------------------------------
288  TaskWrapper( std::packaged_task<Return( XRootDStatus& )> && task ) :
289  task( std::move( task ) )
290  {
291  }
292 
293  //------------------------------------------------------------------------
295  //------------------------------------------------------------------------
296  void HandleResponse( XRootDStatus *status, AnyObject *response )
297  {
298  task( *status );
299  delete status;
300  delete response;
301  delete this;
302  }
303 
304  private:
305 
306  //------------------------------------------------------------------------
308  //------------------------------------------------------------------------
309  std::packaged_task<Return( XRootDStatus& )> task;
310  };
311 
312 
313  //----------------------------------------------------------------------------
315  //----------------------------------------------------------------------------
317  {
318  public:
319 
320  //------------------------------------------------------------------------
322  //
324  //------------------------------------------------------------------------
326  std::function<void( XRootDStatus&, StatInfo& )> handleFunction ) :
327  f( f ), fun( handleFunction )
328  {
329  }
330 
331  //------------------------------------------------------------------------
333  //------------------------------------------------------------------------
334  void HandleResponse( XRootDStatus *status, AnyObject *response )
335  {
336  StatInfo *info = nullptr;
337  if( status->IsOK() )
338  XRootDStatus st = f.Stat( false, info );
339  else
340  info = &NullRef<StatInfo>::value;
341  fun( *status, *info );
342  if( info != &NullRef<StatInfo>::value ) delete info;
343  delete status;
344  delete response;
345  delete this;
346  }
347 
348  private:
349  File &f;
350  //------------------------------------------------------------------------
352  //------------------------------------------------------------------------
353  std::function<void( XRootDStatus&, StatInfo& )> fun;
354  };
355 
356  //----------------------------------------------------------------------------
358  //----------------------------------------------------------------------------
359  class PipelineException : public std::exception
360  {
361  public:
362 
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
367  {
368 
369  }
370 
371  //------------------------------------------------------------------------
373  //------------------------------------------------------------------------
375  {
376 
377  }
378 
379  //------------------------------------------------------------------------
381  //------------------------------------------------------------------------
383  {
384  error = ex.error;
385  return *this;
386  }
387 
388  //------------------------------------------------------------------------
390  //------------------------------------------------------------------------
391  const char* what() const noexcept
392  {
393  return error.ToString().c_str();
394  }
395 
396  //------------------------------------------------------------------------
398  //------------------------------------------------------------------------
399  const XRootDStatus& GetError() const
400  {
401  return error;
402  }
403 
404  private:
405 
406  //------------------------------------------------------------------------
408  //------------------------------------------------------------------------
410  };
411 
412  //----------------------------------------------------------------------------
416  //----------------------------------------------------------------------------
417  template<typename Response>
419  {
420  public:
421 
422  //------------------------------------------------------------------------
427  //------------------------------------------------------------------------
428  FutureWrapperBase( std::future<Response> &ftr )
429  {
430  ftr = prms.get_future();
431  }
432 
433  protected:
434 
435  //------------------------------------------------------------------------
439  //------------------------------------------------------------------------
440  void SetException( const XRootDStatus &err )
441  {
442  std::exception_ptr ex = std::make_exception_ptr( PipelineException( err ) );
443  prms.set_exception( ex );
444  }
445 
446  //------------------------------------------------------------------------
448  //------------------------------------------------------------------------
449  std::promise<Response> prms;
450  };
451 
452  //----------------------------------------------------------------------------
456  //----------------------------------------------------------------------------
457  template<typename Response>
458  class FutureWrapper : public FutureWrapperBase<Response>
459  {
460  public:
461 
462  //------------------------------------------------------------------------
466  //------------------------------------------------------------------------
467  FutureWrapper( std::future<Response> &ftr ) : FutureWrapperBase<Response>( ftr )
468  {
469 
470  }
471 
472  //------------------------------------------------------------------------
474  //------------------------------------------------------------------------
475  void HandleResponse( XRootDStatus *status, AnyObject *response )
476  {
477  if( status->IsOK() )
478  {
479  Response *resp = GetResponse<Response>( response );
480  if( resp == &NullRef<Response>::value )
482  else
483  this->prms.set_value( std::move( *resp ) );
484  }
485  else
486  this->SetException( *status );
487 
488  delete status;
489  delete response;
490  delete this;
491  }
492  };
493 
494  //----------------------------------------------------------------------------
496  //----------------------------------------------------------------------------
497  template<>
498  class FutureWrapper<void> : public FutureWrapperBase<void>
499  {
500  public:
501 
502  //------------------------------------------------------------------------
506  //------------------------------------------------------------------------
507  FutureWrapper( std::future<void> &ftr ) : FutureWrapperBase<void>( ftr )
508  {
509 
510  }
511 
512  //------------------------------------------------------------------------
514  //------------------------------------------------------------------------
515  void HandleResponse( XRootDStatus *status, AnyObject *response )
516  {
517  if( status->IsOK() )
518  {
519  prms.set_value();
520  }
521  else
522  SetException( *status );
523 
524  delete status;
525  delete response;
526  delete this;
527  }
528  };
529 
530 
531  //----------------------------------------------------------------------------
536  //----------------------------------------------------------------------------
538  {
539  public:
540 
541  //------------------------------------------------------------------------
545  //------------------------------------------------------------------------
547  {
548 
549  }
550 
551  //------------------------------------------------------------------------
553  //------------------------------------------------------------------------
555  {
556  ResponseHandler* hdlr = handler.exchange( nullptr );
557  if( hdlr )
558  hdlr->HandleResponseWithHosts( new XRootDStatus( stError, errPipelineFailed), nullptr, nullptr );
559  }
560 
561  //------------------------------------------------------------------------
566  //------------------------------------------------------------------------
567  virtual void HandleResponseWithHosts( XRootDStatus *status,
568  AnyObject *response,
569  HostList *hostList )
570  {
571  ResponseHandler* hdlr = handler.exchange( nullptr );
572  if( hdlr )
573  hdlr->HandleResponseWithHosts( status, response, hostList );
574  }
575 
576  private:
577 
578  //------------------------------------------------------------------------
580  //------------------------------------------------------------------------
581  std::atomic<ResponseHandler*> handler;
582  };
583 
584  //----------------------------------------------------------------------------
590  //----------------------------------------------------------------------------
592  {
593  return new FinalizeHandler( handler );
594  }
595 
596 
597  //----------------------------------------------------------------------------
602  //----------------------------------------------------------------------------
603  template<typename Response>
604  struct RespBase
605  {
606  //------------------------------------------------------------------------
611  //------------------------------------------------------------------------
612  inline static ResponseHandler* Create( ResponseHandler *hdlr )
613  {
614  return make_finalized( hdlr );
615  }
616 
617  //------------------------------------------------------------------------
622  //------------------------------------------------------------------------
623  inline static ResponseHandler* Create( ResponseHandler &hdlr )
624  {
625  return make_finalized( &hdlr );
626  }
627 
628  //------------------------------------------------------------------------
633  //------------------------------------------------------------------------
634  inline static ResponseHandler* Create( std::future<Response> &ftr )
635  {
636  return make_finalized( new FutureWrapper<Response>( ftr ) );
637  }
638  };
639 
640  //----------------------------------------------------------------------------
645  //----------------------------------------------------------------------------
646  template<typename Response>
647  struct Resp: RespBase<Response>
648  {
649  //------------------------------------------------------------------------
654  //------------------------------------------------------------------------
655  inline static ResponseHandler* Create( std::function<void( XRootDStatus&,
656  Response& )> func )
657  {
658  return make_finalized( new FunctionWrapper<Response>( func ) );
659  }
660 
661  //------------------------------------------------------------------------
666  //------------------------------------------------------------------------
667  template<typename Return>
668  inline static ResponseHandler* Create( std::packaged_task<Return( XRootDStatus&,
669  Response& )> &task )
670  {
671  return make_finalized( new TaskWrapper<Response, Return>( std::move( task ) ) );
672  }
673 
674  //------------------------------------------------------------------------
676  //------------------------------------------------------------------------
678  };
679 
680  //----------------------------------------------------------------------------
684  //----------------------------------------------------------------------------
685  template<>
686  struct Resp<void>: RespBase<void>
687  {
688  //------------------------------------------------------------------------
693  //------------------------------------------------------------------------
694  inline static ResponseHandler* Create( std::function<void( XRootDStatus& )> func )
695  {
696  return make_finalized( new FunctionWrapper<void>( func ) );
697  }
698 
699  //------------------------------------------------------------------------
704  //------------------------------------------------------------------------
705  template<typename Return>
706  inline static ResponseHandler* Create( std::packaged_task<Return( XRootDStatus& )> &task )
707  {
708  return make_finalized( new TaskWrapper<void, Return>( std::move( task ) ) );
709  }
710 
711  //------------------------------------------------------------------------
713  //------------------------------------------------------------------------
715  };
716 }
717 
718 #endif // __XRD_CL_OPERATIONS_HANDLERS_HH__
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:296
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:215
TaskWrapper(std::packaged_task< Return(XRootDStatus &, Response &)> &&task)
Constructor.
Definition: XrdClOperationHandlers.hh:246
Definition: XrdClAnyObject.hh:32
static ResponseHandler * Create(ResponseHandler &hdlr)
Definition: XrdClOperationHandlers.hh:623
File & f
Definition: XrdClOperationHandlers.hh:349
const char * what() const noexcept
inherited from std::exception
Definition: XrdClOperationHandlers.hh:391
void Get(Type &object)
Retrieve the object being held.
Definition: XrdClAnyObject.hh:78
std::string ToString() const
Create a string representation.
Object stat info.
Definition: XrdClXRootDResponses.hh:395
Definition: XrdClOperationHandlers.hh:418
static ResponseHandler * Create(std::future< Response > &ftr)
Definition: XrdClOperationHandlers.hh:634
std::promise< Response > prms
promise that corresponds to the future
Definition: XrdClOperationHandlers.hh:449
Definition: XrdClOperationHandlers.hh:537
Definition: XrdClOperationHandlers.hh:112
std::function< void(XRootDStatus &, StatInfo &)> fun
user defined function, functor or lambda
Definition: XrdClOperationHandlers.hh:353
static ResponseHandler * Create(std::packaged_task< Return(XRootDStatus &, Response &)> &task)
Definition: XrdClOperationHandlers.hh:668
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:120
static Response value
Definition: XrdClOperationHandlers.hh:114
Definition: XrdClOperationHandlers.hh:197
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:51
PipelineException(const PipelineException &ex)
Copy constructor.
Definition: XrdClOperationHandlers.hh:374
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Definition: XrdClOperationHandlers.hh:567
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:137
FinalizeHandler(ResponseHandler *handler)
Definition: XrdClOperationHandlers.hh:546
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:254
Definition: XrdClOperationHandlers.hh:237
FunctionWrapper(std::function< void(XRootDStatus &)> handleFunction)
Constructor.
Definition: XrdClOperationHandlers.hh:206
Definition: XrdClOperationHandlers.hh:647
std::function< void(XRootDStatus &, Response &)> fun
user defined function, functor or lambda
Definition: XrdClOperationHandlers.hh:188
Definition: XrdClOperationHandlers.hh:279
TaskWrapper(std::packaged_task< Return(XRootDStatus &)> &&task)
Constructor.
Definition: XrdClOperationHandlers.hh:288
FutureWrapper(std::future< Response > &ftr)
Definition: XrdClOperationHandlers.hh:467
void SetException(const XRootDStatus &err)
Definition: XrdClOperationHandlers.hh:440
Pipeline exception, wrapps an XRootDStatus.
Definition: XrdClOperationHandlers.hh:359
A file.
Definition: XrdClFile.hh:44
std::vector< HostInfo > HostList
Definition: XrdClXRootDResponses.hh:969
Helper class for unpacking single XAttr from bulk response.
Definition: XrdClOperationHandlers.hh:76
Definition: XrdClOperationHandlers.hh:458
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
PipelineException & operator=(const PipelineException &ex)
Assigment operator.
Definition: XrdClOperationHandlers.hh:382
Request status.
Definition: XrdClXRootDResponses.hh:214
Definition: XrdClAnyObject.hh:25
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:175
static ResponseHandler * Create(std::function< void(XRootDStatus &, Response &)> func)
Definition: XrdClOperationHandlers.hh:655
std::packaged_task< Return(XRootDStatus &, Response &)> task
user defined task
Definition: XrdClOperationHandlers.hh:268
const uint16_t errPipelineFailed
Pipeline failed and operation couldn&#39;t be executed.
Definition: XrdClStatus.hh:65
std::packaged_task< Return(XRootDStatus &)> task
user defined task
Definition: XrdClOperationHandlers.hh:309
const XRootDStatus & GetError() const
Definition: XrdClOperationHandlers.hh:399
Definition: XrdClOperationHandlers.hh:604
virtual ~FinalizeHandler()
Destructor.
Definition: XrdClOperationHandlers.hh:554
Lambda wrapper.
Definition: XrdClOperationHandlers.hh:316
static ResponseHandler * Create(std::packaged_task< Return(XRootDStatus &)> &task)
Definition: XrdClOperationHandlers.hh:706
ExOpenFuncWrapper(File &f, std::function< void(XRootDStatus &, StatInfo &)> handleFunction)
Constructor.
Definition: XrdClOperationHandlers.hh:325
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:87
UnpackXAttrStatus(ResponseHandler *handler)
Definition: XrdClOperationHandlers.hh:44
Handle an async response.
Definition: XrdClXRootDResponses.hh:974
Helper class for unpacking single XAttrStatus from bulk response.
Definition: XrdClOperationHandlers.hh:40
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1004
ResponseHandler * handler
Definition: XrdClOperationHandlers.hh:103
std::atomic< ResponseHandler * > handler
The actual operation handler.
Definition: XrdClOperationHandlers.hh:581
FinalizeHandler * make_finalized(ResponseHandler *handler)
Definition: XrdClOperationHandlers.hh:591
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Definition: XrdClXRootDResponses.hh:988
Response * GetResponse(AnyObject *rsp)
Definition: XrdClOperationHandlers.hh:130
static ResponseHandler * Create(ResponseHandler *hdlr)
Definition: XrdClOperationHandlers.hh:612
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:55
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:515
XRootDStatus error
the XRootDStatus associated with this exception
Definition: XrdClOperationHandlers.hh:409
ResponseHandler * handler
Definition: XrdClOperationHandlers.hh:70
std::function< void(XRootDStatus &)> fun
user defined function, functor or lambda
Definition: XrdClOperationHandlers.hh:227
FunctionWrapper(std::function< void(XRootDStatus &, Response &)> handleFunction)
Constructor.
Definition: XrdClOperationHandlers.hh:166
FutureWrapperBase(std::future< Response > &ftr)
Definition: XrdClOperationHandlers.hh:428
Definition: XrdClOperationHandlers.hh:157
PipelineException(const XRootDStatus &error)
Constructor from XRootDStatus.
Definition: XrdClOperationHandlers.hh:366
UnpackXAttr(ResponseHandler *handler)
Definition: XrdClOperationHandlers.hh:80
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:334
FutureWrapper(std::future< void > &ftr)
Definition: XrdClOperationHandlers.hh:507
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback method.
Definition: XrdClOperationHandlers.hh:475
static ResponseHandler * Create(std::function< void(XRootDStatus &)> func)
Definition: XrdClOperationHandlers.hh:694