xrootd
XrdClXCpSrc.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDCL_XRDCLXCPSRC_HH_
26 #define SRC_XRDCL_XRDCLXCPSRC_HH_
27 
28 #include "XrdCl/XrdClFile.hh"
29 #include "XrdCl/XrdClSyncQueue.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 
32 namespace XrdCl
33 {
34 
35 class XCpCtx;
36 
37 class XCpSrc
38 {
39  friend class ChunkHandler;
40 
41  public:
42 
53  XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx );
54 
58  void Start();
59 
63  void Stop()
64  {
65  pRunning = false;
66  }
67 
71  void Delete()
72  {
73  XrdSysMutexHelper lck( pMtx );
74  --pRefCount;
75  if( !pRefCount )
76  {
77  lck.UnLock();
78  delete this;
79  }
80  }
81 
88  {
89  XrdSysMutexHelper lck( pMtx );
90  ++pRefCount;
91  return this;
92  }
93 
97  bool IsRunning()
98  {
99  return pRunning;
100  }
101 
106  bool HasData()
107  {
108  XrdSysMutexHelper lck( pMtx );
109  return pCurrentOffset < pBlkEnd || !pRecovered.empty() || !pOngoing.empty();
110  }
111 
112 
113 
119  uint64_t TransferRate();
120 
126  static void DeleteChunk( ChunkInfo *&chunk )
127  {
128  if( chunk )
129  {
130  delete[] static_cast<char*>( chunk->buffer );
131  delete chunk;
132  chunk = 0;
133  }
134  }
135 
136  private:
137 
143  virtual ~XCpSrc();
144 
148  static void* Run( void* arg );
149 
154  void StartDownloading();
155 
167 
176 
190 
205  void Steal( XCpSrc *src );
206 
216 
225  void ReportResponse( XRootDStatus *status, ChunkInfo *chunk, File *handle );
226 
230  template<typename T>
231  static void DeletePtr( T *&obj )
232  {
233  delete obj;
234  obj = 0;
235  }
236 
243  static bool FilesEqual( File *f1, File *f2 )
244  {
245  if( !f1 || !f2 ) return false;
246 
247  const std::string lastURL = "LastURL";
248  std::string url1, url2;
249 
250  f1->GetProperty( lastURL, url1 );
251  f2->GetProperty( lastURL, url2 );
252 
253  // remove cgi information
254  size_t pos = url1.find( '?' );
255  if( pos != std::string::npos )
256  url1 = url1.substr( 0 , pos );
257  pos = url2.find( '?' );
258  if( pos != std::string::npos )
259  url2 = url2.substr( 0 , pos );
260 
261  return url1 == url2;
262  }
263 
267  uint32_t pChunkSize;
268 
272  uint8_t pParallel;
273 
277  int64_t pFileSize;
278 
282  pthread_t pThread;
283 
288 
292  std::string pUrl;
293 
298 
299  std::map<File*, uint8_t> pFailed;
300 
304  uint64_t pCurrentOffset;
305 
309  uint64_t pBlkEnd;
310 
314  uint64_t pDataTransfered;
315 
320  std::map<uint64_t, uint64_t> pOngoing;
321 
326  std::map<uint64_t, uint64_t> pRecovered;
327 
335 
340 
344  size_t pRefCount;
345 
351  bool pRunning;
352 
356  time_t pStartTime;
357 
363 };
364 
365 } /* namespace XrdCl */
366 
367 #endif /* SRC_XRDCL_XRDCLXCPSRC_HH_ */
bool HasData()
Definition: XrdClXCpSrc.hh:106
XrdSysRecMutex pMtx
Definition: XrdClXCpSrc.hh:339
Definition: XrdSysPthread.hh:241
virtual ~XCpSrc()
XRootDStatus Initialize()
uint8_t pParallel
Definition: XrdClXCpSrc.hh:272
void Steal(XCpSrc *src)
bool GetProperty(const std::string &name, std::string &value) const
static void * Run(void *arg)
uint64_t TransferRate()
Definition: XrdClXCpSrc.hh:37
time_t pTransferTime
Definition: XrdClXCpSrc.hh:362
std::string pUrl
Definition: XrdClXCpSrc.hh:292
time_t pStartTime
Definition: XrdClXCpSrc.hh:356
void UnLock()
Definition: XrdSysPthread.hh:274
static void DeleteChunk(ChunkInfo *&chunk)
Definition: XrdClXCpSrc.hh:126
A synchronized queue.
Definition: XrdClSyncQueue.hh:32
uint64_t pDataTransfered
Definition: XrdClXCpSrc.hh:314
static void DeletePtr(T *&obj)
Definition: XrdClXCpSrc.hh:231
SyncQueue< XRootDStatus * > pReports
Definition: XrdClXCpSrc.hh:334
friend class ChunkHandler
Definition: XrdClXCpSrc.hh:39
void * buffer
length of the chunk
Definition: XrdClXRootDResponses.hh:917
std::map< uint64_t, uint64_t > pOngoing
Definition: XrdClXCpSrc.hh:320
A file.
Definition: XrdClFile.hh:45
uint64_t pBlkEnd
Definition: XrdClXCpSrc.hh:309
File * pFile
Definition: XrdClXCpSrc.hh:297
Definition: XrdClXCpCtx.hh:40
XCpSrc * Self()
Definition: XrdClXCpSrc.hh:87
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
std::map< File *, uint8_t > pFailed
Definition: XrdClXCpSrc.hh:299
void Stop()
Definition: XrdClXCpSrc.hh:63
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
XRootDStatus Recover()
void ReportResponse(XRootDStatus *status, ChunkInfo *chunk, File *handle)
bool IsRunning()
Definition: XrdClXCpSrc.hh:97
std::map< uint64_t, uint64_t > pRecovered
Definition: XrdClXCpSrc.hh:326
int64_t pFileSize
Definition: XrdClXCpSrc.hh:277
uint32_t pChunkSize
Definition: XrdClXCpSrc.hh:267
size_t pRefCount
Definition: XrdClXCpSrc.hh:344
void StartDownloading()
uint64_t pCurrentOffset
Definition: XrdClXCpSrc.hh:304
bool pRunning
Definition: XrdClXCpSrc.hh:351
XCpCtx * pCtx
Definition: XrdClXCpSrc.hh:287
pthread_t pThread
Definition: XrdClXCpSrc.hh:282
XRootDStatus GetWork()
Definition: XrdSysPthread.hh:262
void Delete()
Definition: XrdClXCpSrc.hh:71
XRootDStatus ReadChunks()
static bool FilesEqual(File *f1, File *f2)
Definition: XrdClXCpSrc.hh:243