XRootD
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27 
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClMessage.hh"
32 #include "XProtocol/XProtocol.hh"
33 #include "XrdCl/XrdClLog.hh"
34 #include "XrdCl/XrdClConstants.hh"
35 
41 
42 #include "XrdSys/XrdSysPthread.hh"
43 #include "XrdSys/XrdSysPageSize.hh"
45 #include "XrdSys/XrdSysPlatform.hh"
46 
48 
49 #include <sys/uio.h>
50 #include <arpa/inet.h> // for network unmarshaling stuff
51 
52 #include <array>
53 #include <list>
54 #include <memory>
55 #include <atomic>
56 #include <memory>
57 
58 namespace XrdCl
59 {
60  class PostMaster;
61  class SIDManager;
62  class URL;
63  class LocalFileHandler;
64  class Socket;
65 
66  //----------------------------------------------------------------------------
67  // Single entry in the redirect-trace-back
68  //----------------------------------------------------------------------------
70  {
71  enum Type
72  {
76  EntryWait
77  };
78 
// Build a trace-back entry: each constructor argument initialises the
// member of the same name (from, to, type); the body is intentionally empty.
79  RedirectEntry( const URL &from, const URL &to, Type type ) :
80  from( from ), to( to ), type( type )
81  {
82 
83  }
84 
89 
90  std::string ToString( bool prevok = true )
91  {
92  const std::string tostr = to.GetLocation();
93  const std::string fromstr = from.GetLocation();
94 
95  if( prevok )
96  {
97  switch( type )
98  {
99  case EntryRedirect: return "Redirected from: " + fromstr + " to: "
100  + tostr;
101 
102  case EntryRedirectOnWait: return "Server responded with wait. "
103  "Falling back to virtual redirector: " + tostr;
104 
105  case EntryRetry: return "Retrying: " + tostr;
106 
107  case EntryWait: return "Waited at server request. Resending: "
108  + tostr;
109  }
110  }
111  return "Failed at: " + fromstr + ", retrying at: " + tostr;
112  }
113  };
114 
115  //----------------------------------------------------------------------------
116  //! Handle/Process/Forward XRootD messages
117  //----------------------------------------------------------------------------
119  {
120  friend class HandleRspJob;
121 
122  public:
123  //------------------------------------------------------------------------
132  //------------------------------------------------------------------------
134  ResponseHandler *respHandler,
135  const URL *url,
136  std::shared_ptr<SIDManager> sidMgr,
137  LocalFileHandler *lFileHandler):
138  pRequest( msg ),
139  pResponseHandler( respHandler ),
140  pUrl( *url ),
141  pEffectiveDataServerUrl( 0 ),
142  pSidMgr( sidMgr ),
143  pLFileHandler( lFileHandler ),
144  pExpiration( 0 ),
145  pRedirectAsAnswer( false ),
146  pOksofarAsAnswer( false ),
147  pHasLoadBalancer( false ),
148  pHasSessionId( false ),
149  pChunkList( 0 ),
150  pKBuff( 0 ),
151  pRedirectCounter( 0 ),
152  pNotAuthorizedCounter( 0 ),
153 
154  pAsyncOffset( 0 ),
155  pAsyncChunkIndex( 0 ),
156 
157  pPgWrtCksumBuff( 4 ),
158  pPgWrtCurrentPageOffset( 0 ),
159  pPgWrtCurrentPageNb( 0 ),
160 
161  pOtherRawStarted( false ),
162 
163  pFollowMetalink( false ),
164 
165  pStateful( false ),
166 
167  pAggregatedWaitTime( 0 ),
168 
169  pMsgInFly( false ),
170 
171  pTimeoutFence( false ),
172 
173  pDirListStarted( false ),
174  pDirListWithStat( false ),
175 
176  pCV( 0 ),
177 
178  pSslErrCnt( 0 )
179  {
180  pPostMaster = DefaultEnv::GetPostMaster();
181  if( msg->GetSessionId() )
182  pHasSessionId = true;
183 
184  Log *log = DefaultEnv::GetLog();
185  log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
186  pUrl.GetHostId().c_str(), this,
187  pRequest->GetDescription().c_str() );
188 
189  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
190  if( ntohs( hdr->requestid ) == kXR_pgread )
191  {
192  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
193  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
194  ntohl( pgrdreq->rlen ) ) );
195  }
196 
197  if( ntohs( hdr->requestid ) == kXR_readv )
198  pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
199  else if( ntohs( hdr->requestid ) == kXR_read )
200  pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
201  else
202  pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
203  }
204 
205  //------------------------------------------------------------------------
207  //------------------------------------------------------------------------
209  {
210  DumpRedirectTraceBack();
211 
212  if( !pHasSessionId )
213  delete pRequest;
214  delete pEffectiveDataServerUrl;
215 
216  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
217  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
218  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
219  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
220  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
221  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
222 
223  Log *log = DefaultEnv::GetLog();
224  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
225  pUrl.GetHostId().c_str(), this );
226  }
227 
228  //------------------------------------------------------------------------
234  //------------------------------------------------------------------------
235  virtual uint16_t Examine( std::shared_ptr<Message> &msg );
236 
237  //------------------------------------------------------------------------
246  //------------------------------------------------------------------------
247  virtual uint16_t InspectStatusRsp();
248 
249  //------------------------------------------------------------------------
253  //------------------------------------------------------------------------
254  virtual uint16_t GetSid() const;
255 
256  //------------------------------------------------------------------------
257  //! Process the message if it was "taken" by the examine action
260  //------------------------------------------------------------------------
261  virtual void Process();
262 
263  //------------------------------------------------------------------------
273  //------------------------------------------------------------------------
274  virtual XRootDStatus ReadMessageBody( Message *msg,
275  Socket *socket,
276  uint32_t &bytesRead );
277 
278  //------------------------------------------------------------------------
283  //------------------------------------------------------------------------
284  virtual uint8_t OnStreamEvent( StreamEvent event,
285  XRootDStatus status );
286 
287  //------------------------------------------------------------------------
288  //! The requested action has been performed and the status is available
289  //------------------------------------------------------------------------
290  virtual void OnStatusReady( const Message *message,
291  XRootDStatus status );
292 
293  //------------------------------------------------------------------------
294  //! Are we a raw writer or not?
295  //------------------------------------------------------------------------
296  virtual bool IsRaw() const;
297 
298  //------------------------------------------------------------------------
307  //------------------------------------------------------------------------
309  uint32_t &bytesWritten );
310 
311  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
316  void WaitDone( time_t now );
317 
318  //------------------------------------------------------------------------
 //! Set a timestamp after which we give up
 //!
 //! @param expiration value stored in pExpiration
320  //------------------------------------------------------------------------
321  void SetExpiration( time_t expiration )
322  {
323  pExpiration = expiration;
324  }
325 
326  //------------------------------------------------------------------------
 //! Get a timestamp after which we give up
 //!
 //! @return the current value of pExpiration
328  //------------------------------------------------------------------------
329  time_t GetExpiration()
330  {
331  return pExpiration;
332  }
333 
334  //------------------------------------------------------------------------
 //! Store the flag in pRedirectAsAnswer
 //!
 //! NOTE(review): presumably makes a redirect response be handed to the
 //! user's handler instead of being followed - confirm in the .cc file
337  //------------------------------------------------------------------------
338  void SetRedirectAsAnswer( bool redirectAsAnswer )
339  {
340  pRedirectAsAnswer = redirectAsAnswer;
341  }
342 
343  //------------------------------------------------------------------------
 //! Store the flag in pOksofarAsAnswer
 //!
 //! NOTE(review): going by the comment on pCV this configures the handler
 //! to serve kXR_oksofar as a response to the user's handler - confirm
346  //------------------------------------------------------------------------
347  void SetOksofarAsAnswer( bool oksofarAsAnswer )
348  {
349  pOksofarAsAnswer = oksofarAsAnswer;
350  }
351 
352  //------------------------------------------------------------------------
 //! Get the request pointer
 //!
 //! @return pRequest, exposed as a pointer to const
354  //------------------------------------------------------------------------
355  const Message *GetRequest() const
356  {
357  return pRequest;
358  }
359 
360  //------------------------------------------------------------------------
 //! Set the load balancer
 //!
 //! A host whose URL is not valid is ignored: neither pLoadBalancer nor
 //! pHasLoadBalancer is modified in that case.
362  //------------------------------------------------------------------------
363  void SetLoadBalancer( const HostInfo &loadBalancer )
364  {
 // reject hosts without a valid URL
365  if( !loadBalancer.url.IsValid() )
366  return;
367  pLoadBalancer = loadBalancer;
368  pHasLoadBalancer = true;
369  }
370 
371  //------------------------------------------------------------------------
 //! Set host list
 //!
 //! pHosts is a std::unique_ptr, so the handler takes ownership of hostList
 //! and the previously held list (if any) is deleted by reset().
373  //------------------------------------------------------------------------
374  void SetHostList( HostList *hostList )
375  {
376  pHosts.reset( hostList );
377  }
378 
379  //------------------------------------------------------------------------
381  //------------------------------------------------------------------------
382  void SetChunkList( ChunkList *chunkList )
383  {
384  pChunkList = chunkList;
385  if( pBodyReader )
386  pBodyReader->SetChunkList( chunkList );
387  if( chunkList )
388  pChunkStatus.resize( chunkList->size() );
389  else
390  pChunkStatus.clear();
391  }
392 
// Replace the stored crc32c digests: the supplied vector is moved into
// pCrc32cDigests, so the caller's vector is left in a moved-from state.
393  void SetCrc32cDigests( std::vector<uint32_t> && crc32cDigests )
394  {
395  pCrc32cDigests = std::move( crc32cDigests );
396  }
397 
398  //------------------------------------------------------------------------
399  //! Set the kernel buffer
400  //------------------------------------------------------------------------
402  {
403  pKBuff = kbuff;
404  }
405 
406  //------------------------------------------------------------------------
 //! Set the redirect counter
 //!
 //! @param redirectCounter value stored in pRedirectCounter
408  //------------------------------------------------------------------------
409  void SetRedirectCounter( uint16_t redirectCounter )
410  {
411  pRedirectCounter = redirectCounter;
412  }
413 
// Store the flag in pFollowMetalink.
// NOTE(review): presumably controls whether metalink redirections are
// followed - confirm where pFollowMetalink is read.
414  void SetFollowMetalink( bool followMetalink )
415  {
416  pFollowMetalink = followMetalink;
417  }
418 
// Store the flag in pStateful.
// NOTE(review): the meaning of "stateful" is not visible in this header -
// confirm where pStateful is read.
419  void SetStateful( bool stateful )
420  {
421  pStateful = stateful;
422  }
423 
424  //------------------------------------------------------------------------
428  //------------------------------------------------------------------------
429  void PartialReceived();
430 
431  private:
432 
433  //------------------------------------------------------------------------
435  //------------------------------------------------------------------------
436  void HandleError( XRootDStatus status );
437 
438  //------------------------------------------------------------------------
440  //------------------------------------------------------------------------
441  Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
442 
443  //------------------------------------------------------------------------
445  //------------------------------------------------------------------------
446  void HandleResponse();
447 
448  //------------------------------------------------------------------------
450  //------------------------------------------------------------------------
451  XRootDStatus *ProcessStatus();
452 
453  //------------------------------------------------------------------------
456  //------------------------------------------------------------------------
457  Status ParseResponse( AnyObject *&response );
458 
459  //------------------------------------------------------------------------
462  //------------------------------------------------------------------------
463  Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
464 
465  //------------------------------------------------------------------------
468  //------------------------------------------------------------------------
469  Status RewriteRequestRedirect( const URL &newUrl );
470 
471  //------------------------------------------------------------------------
473  //------------------------------------------------------------------------
474  Status RewriteRequestWait();
475 
476  //------------------------------------------------------------------------
478  //------------------------------------------------------------------------
479  void UpdateTriedCGI(uint32_t errNo=0);
480 
481  //------------------------------------------------------------------------
483  //------------------------------------------------------------------------
484  void SwitchOnRefreshFlag();
485 
486  //------------------------------------------------------------------------
489  //------------------------------------------------------------------------
490  void HandleRspOrQueue();
491 
492  //------------------------------------------------------------------------
494  //------------------------------------------------------------------------
495  void HandleLocalRedirect( URL *url );
496 
497  //------------------------------------------------------------------------
502  //------------------------------------------------------------------------
503  bool IsRetriable();
504 
505  //------------------------------------------------------------------------
512  //------------------------------------------------------------------------
513  bool OmitWait( Message &request, const URL &url );
514 
515  //------------------------------------------------------------------------
521  //------------------------------------------------------------------------
522  bool RetriableErrorResponse( const Status &status );
523 
524  //------------------------------------------------------------------------
526  //------------------------------------------------------------------------
527  void DumpRedirectTraceBack();
528 
535  //------------------------------------------------------------------------
536  template<typename T>
537  Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
538 
539  //------------------------------------------------------------------------
546  //------------------------------------------------------------------------
547  Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
548 
549  //------------------------------------------------------------------------
557  //------------------------------------------------------------------------
558  Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
559  std::string &result );
560 
//------------------------------------------------------------------------
// Per-chunk bookkeeping used while chunks are read asynchronously;
// a freshly constructed entry has both flags cleared
//------------------------------------------------------------------------
struct ChunkStatus
{
  ChunkStatus() = default;

  bool sizeError = false;
  bool done      = false;
};
570 
571  typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
572 
573  static const size_t CksumSize = sizeof( uint32_t );
574  static const size_t PageWithCksum = XrdSys::PageSize + CksumSize;
575  static const size_t MaxSslErrRetry = 3;
576 
577  inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
578  {
579  uint32_t pgcnt = 0;
580  uint32_t remainder = offset % XrdSys::PageSize;
581  if( remainder > 0 )
582  {
583  // account for the first unaligned page
584  ++pgcnt;
585  // the size of the 1st unaligned page
586  uint32_t _1stpg = XrdSys::PageSize - remainder;
587  if( _1stpg + CksumSize > dlen )
588  _1stpg = dlen - CksumSize;
589  dlen -= _1stpg + CksumSize;
590  }
591  pgcnt += dlen / PageWithCksum;
592  if( dlen % PageWithCksum )
593  ++ pgcnt;
594  return pgcnt;
595  }
596 
597  Message *pRequest;
598  std::shared_ptr<Message> pResponse; //< the ownership is shared with MsgReader
599  std::vector<std::shared_ptr<Message>> pPartialResps; //< the ownership is shared with MsgReader
600  ResponseHandler *pResponseHandler;
601  URL pUrl;
602  URL *pEffectiveDataServerUrl;
603  PostMaster *pPostMaster;
604  std::shared_ptr<SIDManager> pSidMgr;
605  LocalFileHandler *pLFileHandler;
606  XRootDStatus pStatus;
607  Status pLastError;
608  time_t pExpiration;
609  bool pRedirectAsAnswer;
610  bool pOksofarAsAnswer;
611  std::unique_ptr<HostList> pHosts;
612  bool pHasLoadBalancer;
613  HostInfo pLoadBalancer;
614  bool pHasSessionId;
615  std::string pRedirectUrl;
616  ChunkList *pChunkList;
617  std::vector<uint32_t> pCrc32cDigests;
618  XrdSys::KernelBuffer *pKBuff;
619  std::vector<ChunkStatus> pChunkStatus;
620  uint16_t pRedirectCounter;
621  uint16_t pNotAuthorizedCounter;
622 
623  uint32_t pAsyncOffset;
624  uint32_t pAsyncChunkIndex;
625 
626  std::unique_ptr<AsyncPageReader> pPageReader;
627  std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
628 
629  Buffer pPgWrtCksumBuff;
630  uint32_t pPgWrtCurrentPageOffset;
631  uint32_t pPgWrtCurrentPageNb;
632 
633  bool pOtherRawStarted;
634 
635  bool pFollowMetalink;
636 
637  bool pStateful;
638  int pAggregatedWaitTime;
639 
640  std::unique_ptr<RedirectEntry> pRdirEntry;
641  RedirectTraceBack pRedirectTraceBack;
642 
643  bool pMsgInFly;
644 
645  //------------------------------------------------------------------------
646  // true if MsgHandler is both in inQueue and installed in respective
647  // Stream (this could happen if server gave oksofar response), otherwise
648  // false
649  //------------------------------------------------------------------------
650  std::atomic<bool> pTimeoutFence;
651 
652  //------------------------------------------------------------------------
653  // if we are serving chunked data to the user's handler in case of
654  // kXR_dirlist we need to memorize if the response contains stat info or
655  // not (the information is only encoded in the first chunk)
656  //------------------------------------------------------------------------
657  bool pDirListStarted;
658  bool pDirListWithStat;
659 
660  //------------------------------------------------------------------------
661  // synchronization is needed in case the MsgHandler has been configured
662  // to serve kXR_oksofar as a response to the user's handler
663  //------------------------------------------------------------------------
664  XrdSysCondVar pCV;
665 
666  //------------------------------------------------------------------------
667  // Count of consecutive `errTlsSslError` errors
668  //------------------------------------------------------------------------
669  size_t pSslErrCnt;
670  };
671 }
672 
673 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_pgread
Definition: XProtocol.hh:142
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
A network socket.
Definition: XrdClSocket.hh:43
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:330
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:438
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
void SetFollowMetalink(bool followMetalink)
const Message * GetRequest() const
Get the request pointer.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
void SetStateful(bool stateful)
virtual bool IsRaw() const
Are we a raw writer or not?
time_t GetExpiration()
Get a timestamp after which we give up.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual void Process()
Process the message if it was "taken" by the examine action.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
const uint64_t ExDbgMsg
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
URL url
URL of the host.
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.
Definition: XrdClStatus.hh:115