XRootD
XrdPfcIOFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <cstdio>
20 #include <fcntl.h>
21 
22 #include "XrdSys/XrdSysError.hh"
24 #include "XrdSys/XrdSysPthread.hh"
25 
26 #include "XrdPfcIOFile.hh"
27 #include "XrdPfcStats.hh"
28 #include "XrdPfcTrace.hh"
29 
30 #include "XrdOuc/XrdOucEnv.hh"
32 
33 using namespace XrdPfc;
34 
35 //______________________________________________________________________________
37  IO(io, cache),
38  m_file(0),
39  m_localStat(0)
40 {
41  m_file = Cache::GetInstance().GetFile(GetFilename(), this);
42 }
43 
44 //______________________________________________________________________________
46 {
47  // called from Detach() if no sync is needed or
48  // from Cache's sync thread
49  TRACEIO(Debug, "~IOFile() " << this);
50 
51  delete m_localStat;
52 }
53 
54 //______________________________________________________________________________
55 int IOFile::Fstat(struct stat &sbuff)
56 {
57  int res = 0;
58  if( ! m_localStat)
59  {
60  res = initCachedStat();
61  if (res) return res;
62  }
63 
64  memcpy(&sbuff, m_localStat, sizeof(struct stat));
65  return 0;
66 }
67 
68 //______________________________________________________________________________
69 long long IOFile::FSize()
70 {
71  return m_file->GetFileSize();
72 }
73 
74 //______________________________________________________________________________
75 int IOFile::initCachedStat()
76 {
77  // Called indirectly from the constructor.
78 
79  static const char* trace_pfx = "initCachedStat ";
80 
81  int res = -1;
82  struct stat tmpStat;
83 
84  std::string fname = GetFilename();
85  std::string iname = fname + Info::s_infoExtension;
86  if (m_cache.GetOss()->Stat(fname.c_str(), &tmpStat) == XrdOssOK)
87  {
88  XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str());
89  XrdOucEnv myEnv;
90  int res_open;
91  if ((res_open = infoFile->Open(iname.c_str(), O_RDONLY, 0600, myEnv)) == XrdOssOK)
92  {
93  Info info(m_cache.GetTrace());
94  if (info.Read(infoFile, iname.c_str()))
95  {
96  // The filesize from the file itself may be misleading if its download is incomplete; take it from the cinfo.
97  tmpStat.st_size = info.GetFileSize();
98  // We are arguably abusing the mtime to be the creation time of the file; then ctime becomes the
99  // last time additional data was cached.
100  tmpStat.st_mtime = info.GetCreationTime();
101  TRACEIO(Info, trace_pfx << "successfully read size " << tmpStat.st_size << " and creation time " << tmpStat.st_mtime << " from info file");
102  res = 0;
103  }
104  else
105  {
106  // file exist but can't read it
107  TRACEIO(Info, trace_pfx << "info file is incomplete or corrupt");
108  }
109  }
110  else
111  {
112  TRACEIO(Error, trace_pfx << "can't open info file " << XrdSysE2T(-res_open));
113  }
114  infoFile->Close();
115  delete infoFile;
116  }
117 
118  if (res)
119  {
120  res = GetInput()->Fstat(tmpStat);
121  TRACEIO(Debug, trace_pfx << "got stat from client res = " << res << ", size = " << tmpStat.st_size);
122  // The mtime / atime / ctime for cached responses come from the file on disk in the cache hit case.
123  // To avoid weirdness when two subsequent stat queries can give wildly divergent times (one from the
124  // origin, one from the cache), set the times to "now" so we effectively only report the *time as the
125  // cache service sees it.
126  tmpStat.st_ctime = tmpStat.st_mtime = tmpStat.st_atime = time(NULL);
127  }
128 
129  if (res == 0)
130  {
131  m_localStat = new struct stat;
132  memcpy(m_localStat, &tmpStat, sizeof(struct stat));
133  }
134  return res;
135 }
136 
137 //______________________________________________________________________________
139 {
140  IO::Update(iocp);
141  m_file->ioUpdated(this);
142 }
143 
144 //______________________________________________________________________________
146 {
147  RefreshLocation();
148  return m_file->ioActive(this);
149 }
150 
151 //______________________________________________________________________________
153 {
154  // Effectively a destructor.
155 
156  TRACE(Info, "DetachFinalize() " << this);
157 
158  m_file->RequestSyncOfDetachStats();
159  Cache::GetInstance().ReleaseFile(m_file, this);
160 
161  delete this;
162 }
163 
164 
165 //==============================================================================
166 // Read and pgRead - sync / async and helpers
167 //==============================================================================
168 
169 //______________________________________________________________________________
170 int IOFile::Read(char *buff, long long off, int size)
171 {
173 
174  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
175 
176  TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
177 
178  rh->m_cond.Lock();
179  int retval = ReadBegin(buff, off, size, rh);
180  if (retval == -EWOULDBLOCK)
181  {
182  rh->m_cond.Wait();
183  retval = rh->m_retval;
184  }
185  rh->m_cond.UnLock();
186 
187  return ReadEnd(retval, rh);
188 }
189 
190 //______________________________________________________________________________
191 void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size)
192 {
193  struct ZHandler : ReadReqRH
194  { using ReadReqRH::ReadReqRH;
195  IOFile *m_io = nullptr;
196 
197  void Done(int result) override {
198  m_io->ReadEnd(result, this);
199  }
200  };
201 
203 
204  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
205  rh->m_io = this;
206 
207  TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
208 
209  int retval = ReadBegin(buff, off, size, rh);
210  if (retval != -EWOULDBLOCK)
211  {
212  rh->Done(retval);
213  }
214 }
215 
216 //______________________________________________________________________________
217 void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size,
218  std::vector<uint32_t> &csvec, uint64_t opts, int *csfix)
219 {
220  struct ZHandler : ReadReqRH
221  { using ReadReqRH::ReadReqRH;
222  IOFile *m_io = nullptr;
223  std::function<void (int)> m_lambda {0};
224 
225  void Done(int result) override {
226  if (m_lambda) m_lambda(result);
227  m_io->ReadEnd(result, this);
228  }
229  };
230 
232 
233  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
234  rh->m_io = this;
235 
236  TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
237 
239  rh->m_lambda = [=, &csvec](int result) {
240  if (result > 0)
241  XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec);
242  };
243 
244  int retval = ReadBegin(buff, off, size, rh);
245  if (retval != -EWOULDBLOCK)
246  {
247  rh->Done(retval);
248  }
249 }
250 
251 //______________________________________________________________________________
252 int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
253 {
254  // protect from reads over the file size
255  if (off >= FSize()) {
256  size = 0;
257  return 0;
258  }
259  if (off < 0) {
260  return -EINVAL;
261  }
262  if (off + size > FSize()) {
263  size = FSize() - off;
264  }
265  rh->m_expected_size = size;
266 
267  return m_file->Read(this, buff, off, size, rh);
268 }
269 
270 //______________________________________________________________________________
271 int IOFile::ReadEnd(int retval, ReadReqRH *rh)
272 {
273  TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size);
274 
275  if (retval < 0) {
276  TRACEIO(Warning, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id);
277  } else if (retval < rh->m_expected_size) {
278  TRACEIO(Warning, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id);
279  }
280  if (rh->m_iocb)
281  rh->m_iocb->Done(retval);
282 
283  delete rh;
284 
286 
287  return retval;
288 }
289 
290 
291 //==============================================================================
292 // ReadV
293 //==============================================================================
294 
295 //______________________________________________________________________________
296 int IOFile::ReadV(const XrdOucIOVec *readV, int n)
297 {
299 
300  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
301 
302  TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
303 
304  rh->m_cond.Lock();
305  int retval = ReadVBegin(readV, n, rh);
306  if (retval == -EWOULDBLOCK)
307  {
308  rh->m_cond.Wait();
309  retval = rh->m_retval;
310  }
311  rh->m_cond.UnLock();
312  return ReadVEnd(retval, rh);
313 }
314 
315 //______________________________________________________________________________
316 void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
317 {
318  struct ZHandler : ReadReqRH
319  { using ReadReqRH::ReadReqRH;
320  IOFile *m_io = nullptr;
321 
322  void Done(int result) override { m_io-> ReadVEnd(result, this); }
323  };
324 
326 
327  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
328  rh->m_io = this;
329 
330  TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
331 
332  int retval = ReadVBegin(readV, n, rh);
333  if (retval != -EWOULDBLOCK)
334  {
335  rh->Done(retval);
336  }
337 }
338 
339 //______________________________________________________________________________
340 int IOFile::ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh)
341 {
342  long long file_size = FSize();
343  for (int i = 0; i < n; ++i)
344  {
345  const XrdOucIOVec &vr = readV[i];
346  if (vr.offset < 0 || vr.offset >= file_size ||
347  vr.offset + vr.size > file_size)
348  {
349  return -EINVAL;
350  }
351  rh->m_expected_size += vr.size;
352  }
353  rh->m_n_chunks = n;
354 
355  return m_file->ReadV(this, readV, n, rh);
356 }
357 
358 //______________________________________________________________________________
359 int IOFile::ReadVEnd(int retval, ReadReqRH *rh)
360 {
361  TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id <<
362  " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size);
363 
364  if (retval < 0) {
365  TRACEIO(Warning, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval));
366  } else if (retval < rh->m_expected_size) {
367  TRACEIO(Warning, "ReadVEnd() bytes missed " << rh->m_expected_size - retval);
368  }
369  if (rh->m_iocb)
370  rh->m_iocb->Done(retval);
371 
372  delete rh;
373 
375 
376  return retval;
377 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define TRACEIO(act, x)
Definition: XrdPfcTrace.hh:58
int stat(const char *path, struct stat *buf)
bool Debug
struct myOpts opts
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:99
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static const uint64_t forceCS
Definition: XrdOucCache.hh:188
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:411
XrdOss * GetOss() const
Definition: XrdPfc.hh:385
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:398
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:159
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:492
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:674
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:265
long long GetFileSize()
Definition: XrdPfcFile.hh:279
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:641
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:179
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:188
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:40
void Update(XrdOucCacheIO &iocp) override
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
int Read(char *buff, long long off, int size) override
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
Definition: XrdPfcIOFile.cc:55
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
int ReadV(const XrdOucIOVec *readV, int n) override
Pass ReadV request to the corresponding File object.
long long FSize() override
Definition: XrdPfcIOFile.cc:69
IOFile(XrdOucCacheIO *io, Cache &cache)
Definition: XrdPfcIOFile.cc:36
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
std::string GetFilename()
Definition: XrdPfcIO.hh:56
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:30
Cache & m_cache
reference to Cache object
Definition: XrdPfcIO.hh:52
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:72
const char * RefreshLocation()
Definition: XrdPfcIO.hh:57
void Update(XrdOucCacheIO &iocp) override
Definition: XrdPfcIO.cc:16
unsigned short ObtainReadSid()
Definition: XrdPfcIO.hh:59
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
Definition: XrdPfc.hh:41
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:65
unsigned short m_seq_id
Definition: XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:67