xrootd
XrdOssCsiFileAio.hh
Go to the documentation of this file.
1 #ifndef _XRDOSSCSIFILEAIO_H
2 #define _XRDOSSCSIFILEAIO_H
3 /******************************************************************************/
4 /* */
5 /* X r d O s s C s i F i l e A i o . h h */
6 /* */
7 /* (C) Copyright 2021 CERN. */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* In applying this licence, CERN does not waive the privileges and */
17 /* immunities granted to it by virtue of its status as an Intergovernmental */
18 /* Organization or submit itself to any jurisdiction. */
19 /* */
20 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
21 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
22 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
23 /* License for more details. */
24 /* */
25 /* You should have received a copy of the GNU Lesser General Public License */
26 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
27 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
28 /* */
29 /* The copyright holder's institutional names and contributor's names may not */
30 /* be used to endorse or promote products derived from this software without */
31 /* specific prior written permission of the institution or contributor. */
32 /******************************************************************************/
33 
34 #include "Xrd/XrdScheduler.hh"
35 #include "XrdSfs/XrdSfsAio.hh"
36 #include "XrdOssCsi.hh"
37 #include "XrdSys/XrdSysPageSize.hh"
38 
39 #include <mutex>
40 #include <thread>
41 
// Body of class XrdOssCsiFileAioJob. The class declaration line, the
// constructor, the PrepareWrite2() signature and some members/statements are
// not present in this listing.
//
// One object of this class carries out the individual steps of a checksummed
// asynchronous read or write. DoIt() looks at jobtype_ and runs the matching
// DoItRead1/DoItRead2/DoItWrite1/DoItWrite2 step.
43 {
44 public:
45 
47  virtual ~XrdOssCsiFileAioJob() { }
48 
// Remember everything the later steps need:
//   fp    - file the operation is performed on
//   nio   - the XrdOssCsiFileAio object that is passed to the successor layer
//   aiop  - the caller's aio request, which receives the final Result
//   isPg  - true for a pg operation (selects FetchRange/StoreRange in the steps)
//   read  - stored in read_
// NOTE(review): listing line 56 is missing here; it presumably selects the
// initial jobtype_ - confirm against the real header.
49  void Init(XrdOssCsiFile *fp, XrdOssCsiFileAio *nio, XrdSfsAio *aiop, bool isPg, bool read)
50  {
51  fp_ = fp;
52  nio_ = nio;
53  aiop_ = aiop;
54  pg_ = isPg;
55  read_ = read;
57  }
58 
// Body of PrepareWrite2(); its signature and its single statement are not
// present in this listing.
60  {
62  }
63 
// Called before the job is scheduled for the second read step (see
// SchedReadJob2). The statement in the body is not present in this listing.
64  void PrepareRead2()
65  {
67  }
68 
// Run the step selected by jobtype_. A jobtype_ value that matches no case
// does nothing.
69  void DoIt() /* override */
70  {
71  switch(jobtype_)
72  {
73  case JobReadStep1:
74  // take rangelock, then submit aio read
75  DoItRead1();
76  break;
77 
78  case JobReadStep2:
79  // fetch any extra bytes then verify/fetch csvec
80  DoItRead2();
81  break;
82 
83  case JobWriteStep1:
84  // lock byte range, update/store csvec and queue aio write
85  DoItWrite1();
86  break;
87 
88  case JobWriteStep2:
89  // check return from aio write, write any extra
90  DoItWrite2();
91  break;
92  }
93  }
94 
// The four step functions are defined after the XrdOssCsiFileAio class.
95  void DoItRead1();
96  void DoItRead2();
97  void DoItWrite1();
98  void DoItWrite2();
99 
100 private:
// pg_   - true when the request is a pg operation
// read_ - value of the 'read' argument given to Init()
// (fp_, nio_, aiop_ and jobtype_ are declared on lines missing from this listing)
104  bool pg_;
105  bool read_;
107 };
108 
// Body of class XrdOssCsiFileAio. The class declaration line, the signatures
// of Alloc() and the Sched*() helpers, the constructor/destructor and several
// members are not present in this listing.
//
// An object of this class stands in for the caller's aio request (parentaio_)
// when the request is passed to the successor layer; it owns the job (job_)
// that performs the checksum steps before and after the underlying I/O.
110 {
112 public:
113 
// Options passed through to FetchRange()/StoreRange() for pg operations.
115  uint64_t pgOpts_;
116 
// Completion callback for the underlying read: copy the result to the
// caller's request and schedule the second read step.
117  virtual void doneRead() /* override */
118  {
119  parentaio_->Result = this->Result;
120  // schedule the result check and verify/fetchrange
121  SchedReadJob2();
122  }
123 
// Completion callback for the underlying write: copy the result to the
// caller's request and schedule the second write step.
124  virtual void doneWrite() /* override */
125  {
126  parentaio_->Result = this->Result;
127  // schedule the result check and write any extra
128  SchedWriteJob2();
129  }
130 
// Finish with this object: release the range guard, drop the references to
// the parent request and the file, then either put the object back on the
// store's list (under the store mutex) or delete it when there is no store.
// The file pointer is copied to the local 'f' first so that aioDec() can be
// called after the object has been given back or deleted without touching
// any member of this object.
131  virtual void Recycle()
132  {
133  rg_.ReleaseAll();
134  parentaio_ = NULL;
135  XrdOssCsiFile *f = file_;
136  file_ = NULL;
137  if (store_)
138  {
139  std::lock_guard<std::mutex> guard(store_->mtx_);
140  next_ = store_->list_;
141  store_->list_ = this;
142  }
143  else
144  {
145  delete this;
146  }
147  if (f)
148  {
149  f->aioDec();
150  }
151  }
152 
// Prepare this object for one operation:
//   aiop   - the caller's request; its aiocb fields, cksVec and TIdent are copied
//   file   - file the operation is performed on; its aio count is incremented
//   isPgOp - true for a pg operation
//   opts   - stored in pgOpts_
//   isread - forwarded to the job's Init()
// NOTE(review): listing line 166 is missing between the pgOpts_ assignment
// and job_.Init(); confirm its content against the real header.
153  void Init(XrdSfsAio *aiop, XrdOssCsiFile *file, bool isPgOp, uint64_t opts, bool isread)
154  {
155  parentaio_ = aiop;
156  this->sfsAio.aio_fildes = aiop->sfsAio.aio_fildes;
157  this->sfsAio.aio_buf = aiop->sfsAio.aio_buf;
158  this->sfsAio.aio_nbytes = aiop->sfsAio.aio_nbytes;
159  this->sfsAio.aio_offset = aiop->sfsAio.aio_offset;
160  this->sfsAio.aio_reqprio = aiop->sfsAio.aio_reqprio;
161  this->cksVec = aiop->cksVec;
162  this->TIdent = aiop->TIdent;
163  file_ = file;
164  isPgOp_ = isPgOp;
165  pgOpts_ = opts;
167  job_.Init(file, this, aiop, isPgOp, isread);
168  file_->aioInc();
169  }
170 
// Body of Alloc(store) (signature not present in this listing): take an
// object from the head of the store's list while holding the store mutex,
// or create a new one when the store is null or its list is empty.
172  {
173  XrdOssCsiFileAio *p=NULL;
174  if (store)
175  {
176  std::lock_guard<std::mutex> guard(store->mtx_);
177  if ((p = store->list_)) store->list_ = p->next_;
178  }
179  if (!p) p = new XrdOssCsiFileAio(store);
180  return p;
181  }
182 
// Body of SchedWriteJob2() (signature and listing line 185 not present):
// hand the job to the scheduler.
184  {
186  Sched_->Schedule((XrdJob *)&job_);
187  }
188 
// Body of SchedWriteJob() (signature not present): hand the job to the scheduler.
190  {
191  Sched_->Schedule((XrdJob *)&job_);
192  }
193 
// Body of SchedReadJob2() (signature not present): prepare the job for the
// second read step, then hand it to the scheduler.
195  {
196  job_.PrepareRead2();
197  Sched_->Schedule((XrdJob *)&job_);
198  }
199 
// Body of SchedReadJob() (signature not present): hand the job to the scheduler.
201  {
202  Sched_->Schedule((XrdJob *)&job_);
203  }
204 
207 
208 private:
// True when Init() was called for a pg operation.
// (store_, parentaio_, file_, job_, Sched_ and next_ are declared on lines
// missing from this listing.)
212  bool isPgOp_;
216 };
217 
// Body of XrdOssCsiFileAioJob::DoItRead2() (signature line not present in
// this listing).
//
// Second read step, run after the asynchronous read has completed:
//  1. If the read failed, or zero bytes were requested, report completion to
//     the caller as-is.
//  2. For a pg operation, complete a short read synchronously so that the
//     returned checksums cover the whole returned range.
//  3. Fetch (pg) or verify (non-pg) the checksums for the bytes actually read.
//  4. Report the byte count, or a negative error, through aiop_->Result and
//     doneRead(), then recycle the helper aio object.
//
// Fix: the checksum calls used to be given nio_->Result, i.e. the byte count
// of the asynchronous read only. When a short pg read was completed by the
// loop below, the caller was told 'nread' bytes were read but checksums were
// fetched only for the first nio_->Result bytes. The calls now use 'nread'.
// For non-pg reads nread always equals nio_->Result, so they are unaffected.
219 {
220  // this job runs after async Read
221  // range was already locked read-only before the read
222 
223  if (aiop_->Result<0 || nio_->sfsAio.aio_nbytes==0)
224  {
225  aiop_->doneRead();
226  nio_->Recycle();
227  return;
228  }
229 
230  // if this is a pg operation and this was a short read, try to complete,
231  // otherwise caller will have to deal with joining csvec values from repeated reads
232 
233  ssize_t toread = nio_->sfsAio.aio_nbytes - nio_->Result;
234  ssize_t nread = nio_->Result;
235 
236  if (!pg_)
237  {
238  // not a pg operation, no need to read more
239  toread = 0;
240  }
241  char *p = (char*)nio_->sfsAio.aio_buf;
242  while(toread>0)
243  {
244  const ssize_t rret = fp_->successor_->Read(&p[nread], nio_->sfsAio.aio_offset+nread, toread);
// a zero return means nothing more can be read; keep what we have
245  if (rret == 0) break;
246  if (rret<0)
247  {
248  aiop_->Result = rret;
249  aiop_->doneRead();
250  nio_->Recycle();
251  return;
252  }
253  toread -= rret;
254  nread += rret;
255  }
256  aiop_->Result = nread;
257 
// checksum handling must cover every byte reported to the caller (nread),
// including bytes obtained by the completion loop above
258  ssize_t puret;
259  if (pg_)
260  {
261  puret = fp_->Pages()->FetchRange(fp_->successor_,
262  (void *)nio_->sfsAio.aio_buf,
263  (off_t)nio_->sfsAio.aio_offset,
264  (size_t)nread,
265  (uint32_t*)nio_->cksVec,
266  nio_->pgOpts_,
267  nio_->rg_);
268  }
269  else
270  {
271  puret = fp_->Pages()->VerifyRange(fp_->successor_,
272  (void *)nio_->sfsAio.aio_buf,
273  (off_t)nio_->sfsAio.aio_offset,
274  (size_t)nread,
275  nio_->rg_);
276  }
// a checksum failure replaces the byte count with the negative error code
277  if (puret<0)
278  {
279  aiop_->Result = puret;
280  }
281  aiop_->doneRead();
282  nio_->Recycle();
283 }
284 
// Body of XrdOssCsiFileAioJob::DoItRead1() (signature line not present in
// this listing).
//
// First read step: take the range lock for the requested byte range, then
// pass the read to the successor layer using the helper aio object (nio_).
// On a successful submit nothing more is done here; the second step is
// scheduled from nio_'s doneRead(). If the submit fails, the error is
// reported to the caller immediately and nio_ is recycled.
286 {
287  // this job takes rangelock and then queues aio read
288 
289  // lock range
// NOTE(review): listing line 290 (start of the lock call) is missing; the
// line below is the tail of that call: end offset of the range and a flag
// that is 'true' for reads - presumably "read-only", confirm.
291  (off_t)(aiop_->sfsAio.aio_offset+aiop_->sfsAio.aio_nbytes), true);
292 
293  const int ret = fp_->successor_->Read(nio_);
294  if (ret<0)
295  {
296  aiop_->Result = ret;
297  aiop_->doneRead();
298  nio_->Recycle();
299  return;
300  }
301 }
302 
// Body of XrdOssCsiFileAioJob::DoItWrite1() (signature line not present in
// this listing).
//
// First write step: take the range lock, record the checksums for the data
// about to be written (StoreRange with the caller-supplied checksums for a
// pg operation, UpdateRange otherwise), then pass the write to the successor
// layer using the helper aio object (nio_). If either the checksum update or
// the submit fails, the lock is released, resyncSizes() is called on the
// file, the error is reported to the caller and nio_ is recycled.
304 {
305  // this job runs before async Write
306 
307  // lock range
// NOTE(review): listing line 308 (start of the lock call) is missing; the
// line below is the tail of that call: end offset of the range and a flag
// that is 'false' for writes - presumably "not read-only", confirm.
309  (off_t)(aiop_->sfsAio.aio_offset+aiop_->sfsAio.aio_nbytes), false);
310  int puret;
311  if (pg_) {
312  puret = fp_->Pages()->StoreRange(fp_->successor_,
313  (const void *)aiop_->sfsAio.aio_buf, (off_t)aiop_->sfsAio.aio_offset,
314  (size_t)aiop_->sfsAio.aio_nbytes, (uint32_t*)aiop_->cksVec, nio_->pgOpts_, nio_->rg_);
315 
316  }
317  else
318  {
319  puret = fp_->Pages()->UpdateRange(fp_->successor_,
320  (const void *)aiop_->sfsAio.aio_buf, (off_t)aiop_->sfsAio.aio_offset,
321  (size_t)aiop_->sfsAio.aio_nbytes, nio_->rg_);
322  }
// checksum update failed: undo the lock and report the error without writing data
323  if (puret<0)
324  {
325  nio_->rg_.ReleaseAll();
326  fp_->resyncSizes();
327  aiop_->Result = puret;
328  aiop_->doneWrite();
329  nio_->Recycle();
330  return;
331  }
332 
// on success the second write step is scheduled from nio_'s doneWrite()
333  const int ret = fp_->successor_->Write(nio_);
334  if (ret<0)
335  {
336  nio_->rg_.ReleaseAll();
337  fp_->resyncSizes();
338  aiop_->Result = ret;
339  aiop_->doneWrite();
340  nio_->Recycle();
341  return;
342  }
343 }
344 
// Body of XrdOssCsiFileAioJob::DoItWrite2() (signature line not present in
// this listing).
//
// Second write step, run after the asynchronous write has completed. A
// failed write releases the range lock, calls resyncSizes() on the file and
// reports the error. Otherwise any bytes the asynchronous write left
// unwritten are written synchronously, the total is stored in aiop_->Result,
// the caller is notified through doneWrite() and nio_ is recycled.
346 {
347  // this job runs after the async Write
348 
349  if (aiop_->Result<0)
350  {
351  nio_->rg_.ReleaseAll();
352  fp_->resyncSizes();
353  aiop_->doneWrite();
354  nio_->Recycle();
355  return;
356  }
357 
358  // in case there was a short write during the async write, finish
359  // writing the data now, otherwise the crc values will be inconsistent
360  ssize_t towrite = nio_->sfsAio.aio_nbytes - nio_->Result;
361  ssize_t nwritten = nio_->Result;
362  const char *p = (const char*)nio_->sfsAio.aio_buf;
// NOTE(review): only a negative return ends this loop early; a Write() that
// returned 0 for a non-zero size would make no progress here - confirm the
// successor never does that.
363  while(towrite>0)
364  {
365  const ssize_t wret = fp_->successor_->Write(&p[nwritten], nio_->sfsAio.aio_offset+nwritten, towrite);
366  if (wret<0)
367  {
368  aiop_->Result = wret;
369  nio_->rg_.ReleaseAll();
370  fp_->resyncSizes();
371  aiop_->doneWrite();
372  nio_->Recycle();
373  return;
374  }
375  towrite -= wret;
376  nwritten += wret;
377  }
378  aiop_->Result = nwritten;
379  aiop_->doneWrite();
380  nio_->Recycle();
381 }
382 
383 #endif
bool pg_
Definition: XrdOssCsiFileAio.hh:104
void SchedReadJob2()
Definition: XrdOssCsiFileAio.hh:194
XrdOssCsiRangeGuard rg_
Definition: XrdOssCsiFileAio.hh:114
XrdOssCsiFileAio * list_
Definition: XrdOssCsi.hh:53
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void aioDec()
Definition: XrdOssCsi.hh:120
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
const char * TIdent
Definition: XrdSfsAio.hh:67
bool read_
Definition: XrdOssCsiFileAio.hh:105
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
virtual void doneRead()=0
Definition: XrdOssCsi.hh:46
virtual void doneWrite()=0
XrdSfsAio * parentaio_
Definition: XrdOssCsiFileAio.hh:210
void SchedWriteJob2()
Definition: XrdOssCsiFileAio.hh:183
Definition: XrdOssCsiFileAio.hh:109
void DoItRead2()
Definition: XrdOssCsiFileAio.hh:218
Definition: XrdScheduler.hh:44
XrdOssDF * successor_
Definition: XrdOssHandler.hh:81
virtual ~XrdOssCsiFileAioJob()
Definition: XrdOssCsiFileAio.hh:47
XrdOssCsiFileAioJob job_
Definition: XrdOssCsiFileAio.hh:213
uint64_t pgOpts_
Definition: XrdOssCsiFileAio.hh:115
virtual void doneWrite()
Definition: XrdOssCsiFileAio.hh:124
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
std::mutex mtx_
Definition: XrdOssCsi.hh:52
void PrepareRead2()
Definition: XrdOssCsiFileAio.hh:64
void aioInc()
Definition: XrdOssCsi.hh:111
XrdOssCsiFileAioJob()
Definition: XrdOssCsiFileAio.hh:46
XrdOssCsiPages * Pages()
Definition: XrdOssCsi.hh:140
XrdSfsAio * aiop_
Definition: XrdOssCsiFileAio.hh:103
XrdScheduler * Sched_
Definition: XrdOssCsiFileAio.hh:214
Definition: XrdOssCsi.hh:73
virtual void Recycle()
Definition: XrdOssCsiFileAio.hh:131
Definition: XrdOssCsiRanges.hh:57
Definition: XrdOssCsiFileAio.hh:106
int aio_reqprio
Definition: XrdSfsAio.hh:50
void Init(XrdSfsAio *aiop, XrdOssCsiFile *file, bool isPgOp, uint64_t opts, bool isread)
Definition: XrdOssCsiFileAio.hh:153
Definition: XrdOssCsiFileAio.hh:42
static XrdScheduler * Sched_
Definition: XrdOssCsi.hh:216
XrdOssCsiFile * fp_
Definition: XrdOssCsiFileAio.hh:101
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:285
void DoItWrite1()
Definition: XrdOssCsiFileAio.hh:303
Definition: XrdOssCsiFileAio.hh:106
Definition: XrdOssCsiFileAio.hh:106
XrdOssCsiFile * file_
Definition: XrdOssCsiFileAio.hh:211
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
void SchedReadJob()
Definition: XrdOssCsiFileAio.hh:200
~XrdOssCsiFileAio()
Definition: XrdOssCsiFileAio.hh:206
Definition: XrdOssCsiFileAio.hh:106
void SchedWriteJob()
Definition: XrdOssCsiFileAio.hh:189
off_t aio_offset
Definition: XrdSfsAio.hh:49
void PrepareWrite2()
Definition: XrdOssCsiFileAio.hh:59
void DoItRead1()
Definition: XrdOssCsiFileAio.hh:285
void Init(XrdOssCsiFile *fp, XrdOssCsiFileAio *nio, XrdSfsAio *aiop, bool isPg, bool read)
Definition: XrdOssCsiFileAio.hh:49
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
#define read(a, b, c)
Definition: XrdPosix.hh:77
Definition: XrdSfsAio.hh:58
void DoIt()
Definition: XrdOssCsiFileAio.hh:69
static XrdOssCsiFileAio * Alloc(XrdOssCsiFileAioStore *store)
Definition: XrdOssCsiFileAio.hh:171
enum XrdOssCsiFileAioJob::@96 jobtype_
ssize_t Result
Definition: XrdSfsAio.hh:65
bool isPgOp_
Definition: XrdOssCsiFileAio.hh:212
void DoItWrite2()
Definition: XrdOssCsiFileAio.hh:345
XrdOssCsiFileAio(XrdOssCsiFileAioStore *store)
Definition: XrdOssCsiFileAio.hh:205
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:349
void * aio_buf
Definition: XrdSfsAio.hh:47
XrdOssCsiFileAio * next_
Definition: XrdOssCsiFileAio.hh:215
int aio_fildes
Definition: XrdSfsAio.hh:46
virtual void doneRead()
Definition: XrdOssCsiFileAio.hh:117
void Schedule(XrdJob *jp)
Definition: XrdJob.hh:42
XrdOssCsiFileAio * nio_
Definition: XrdOssCsiFileAio.hh:102
XrdOssCsiFileAioStore * store_
Definition: XrdOssCsiFileAio.hh:209