xrootd
XrdSysKernelBuffer.hh
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------
2 // Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //-----------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //-----------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDCL_XRDCLKERNELBUFFER_HH_
26 #define SRC_XRDCL_XRDCLKERNELBUFFER_HH_
27 
28 #include <fcntl.h>
29 #include <sys/uio.h>
30 #include <stdint.h>
31 #include <unistd.h>
32 #include <string.h>
33 
34 #include <vector>
35 #include <array>
36 #include <tuple>
37 
38 namespace XrdSys
39 {
40  //---------------------------------------------------------------------------
44  //---------------------------------------------------------------------------
46  {
47  friend ssize_t Read( int, KernelBuffer&, uint32_t, int64_t );
48 
49  friend ssize_t Read( int, KernelBuffer&, uint32_t );
50 
51  friend ssize_t Write( int, KernelBuffer&, int64_t );
52 
53  friend ssize_t Send( int, KernelBuffer& );
54 
55  friend ssize_t Move( KernelBuffer&, char*& );
56 
57  friend ssize_t Move( char*&, KernelBuffer&, size_t );
58 
59  public:
60 
61  //-----------------------------------------------------------------------
63  //-----------------------------------------------------------------------
64  KernelBuffer() : capacity( 0 ), size( 0 )
65  {
66  }
67 
68  //-----------------------------------------------------------------------
70  //-----------------------------------------------------------------------
71  KernelBuffer( const KernelBuffer& ) = delete;
72 
73  //-----------------------------------------------------------------------
74  // Move constructor
75  //-----------------------------------------------------------------------
76  KernelBuffer( KernelBuffer &&kbuff ) : capacity( kbuff.capacity ),
77  size( kbuff.size ),
78  pipes( std::move( kbuff.pipes ) )
79  {
80  capacity = 0;
81  size = 0;
82  }
83 
84  //-----------------------------------------------------------------------
86  //-----------------------------------------------------------------------
87  KernelBuffer& operator=( const KernelBuffer& ) = delete;
88 
89  //-----------------------------------------------------------------------
91  //-----------------------------------------------------------------------
93  {
94  capacity = kbuff.capacity;
95  size = kbuff.size;
96  pipes = std::move( kbuff.pipes );
97  return *this;
98  }
99 
100  //-----------------------------------------------------------------------
102  //-----------------------------------------------------------------------
104  {
105  if( capacity > 0 ) Free();
106  }
107 
108  //------------------------------------------------------------------------
110  //------------------------------------------------------------------------
111  inline bool Empty() const
112  {
113  return size == 0;
114  }
115 
116  //-----------------------------------------------------------------------
122  //-----------------------------------------------------------------------
123  inline static bool IsPageAligned( const void *ptr )
124  {
125  return ( ( uintptr_t ( ptr ) ) % PAGE_SZ ) == 0 ;
126  }
127 
128  private:
129 
130  //-----------------------------------------------------------------------
132  //-----------------------------------------------------------------------
133  inline void Free()
134  {
135  auto itr = pipes.begin();
136  for( ; itr != pipes.end() ; ++itr )
137  {
138  std::array<int, 2> &p = std::get<0>( *itr );
139  close( p[1] );
140  close( p[0] );
141  }
142 
143  pipes.clear();
144  capacity = 0;
145  size = 0;
146  }
147 
148  //-----------------------------------------------------------------------
150  //-----------------------------------------------------------------------
151  inline ssize_t Alloc( size_t size )
152  {
153 #ifndef F_SETPIPE_SZ
154  return -ENOTSUP;
155 #else
156  ssize_t ret = 0;
157 
158  std::array<int, 2> pipe_fd;
159  ret = pipe( pipe_fd.data() );
160  if( ret < 0 ) return ret;
161 
163  ret = fcntl( pipe_fd[0], F_SETPIPE_SZ, size );
164  if( ret < 0 ) return ret;
165 
166  capacity += ret;
167  pipes.emplace_back( pipe_fd, 0 );
168 
169  return ret;
170 #endif
171  }
172 
173  //-----------------------------------------------------------------------
182  //-----------------------------------------------------------------------
183 #ifndef SPLICE_F_MOVE
184  inline ssize_t ReadFromFD( int fd, uint32_t length, int64_t *offset )
185  {
186  return -ENOTSUP;
187  }
188 #else
189  inline ssize_t ReadFromFD( int fd, uint32_t length, loff_t *offset )
190  {
191  if( capacity > 0 ) Free();
192 
193  while( length > 0 )
194  {
195  ssize_t ret = Alloc( length );
196  if( ret < 0 ) return ret;
197  if( size_t( ret ) > length ) ret = length;
198  std::array<int, 2> &pipe_fd = std::get<0>( pipes.back() );
199  size_t &pipedata = std::get<1>( pipes.back() );
200  ret = splice( fd, offset, pipe_fd[1], NULL, ret, SPLICE_F_MOVE | SPLICE_F_MORE );
201  if( ret == 0 ) break; // we reached the end of the file
202  if( ret < 0 ) return -1;
203  pipedata += ret;
204 
205  length -= ret;
206  size += ret;
207  }
208  pipes_cursor = pipes.begin();
209 
210  return size;
211  }
212 #endif
213 
214  //-----------------------------------------------------------------------
222  //-----------------------------------------------------------------------
223 #ifndef SPLICE_F_MOVE
224  inline ssize_t WriteToFD( int fd, int64_t *offset )
225  {
226  return -ENOTSUP;
227  }
228 #else
229  inline ssize_t WriteToFD( int fd, loff_t *offset )
230  {
231  if( size == 0 ) return 0;
232 
233  ssize_t result = 0;
234 
235  auto itr = pipes_cursor;
236  while( itr != pipes.end() )
237  {
238  std::array<int, 2> &pipe_fd = std::get<0>( *itr );
239  size_t &pipedata = std::get<1>( *itr );
240 
241  int ret = splice( pipe_fd[0], NULL, fd, offset, size, SPLICE_F_MOVE | SPLICE_F_MORE );
242  if( ret == 0 ) break; // we reached the end of the file
243  if( ret < 0 ) return -1;
244 
245  size -= ret;
246  result += ret;
247  pipedata -= ret;
248 
249 
250  if( pipedata > 0 ) continue;
251 
252  ++pipes_cursor;
253  ++itr;
254  }
255 
256  Free();
257 
258  return result;
259  }
260 #endif
261 
262  //-----------------------------------------------------------------------
277  //-----------------------------------------------------------------------
278  inline ssize_t ToUser( char *&buffer )
279  {
280 #ifndef SPLICE_F_MOVE
281  return -ENOTSUP;
282 #else
283  if( size == 0 ) return 0;
284 
285  ssize_t result = 0;
286 
287  void *void_ptr = 0;
288  int ret = posix_memalign( &void_ptr, PAGE_SZ, size );
289  if( ret )
290  {
291  errno = ret;
292  return -1;
293  }
294  char *ptr = reinterpret_cast<char*>( void_ptr );
295 
296  auto itr = pipes_cursor;
297  while( itr != pipes.end() )
298  {
299  iovec iov[1];
300  size_t len = size > MAX_PIPE_SIZE ? MAX_PIPE_SIZE : size;
301  iov->iov_len = len;
302  iov->iov_base = ptr;
303 
304  std::array<int, 2> &pipe_fd = std::get<0>( *itr );
305  size_t &pipedata = std::get<1>( *itr );
306  int ret = vmsplice( pipe_fd[0], iov, 1, 0 ); // vmsplice man NOTE:
307  // vmsplice() really supports true splicing only from user memory to a
308  // pipe. In the opposite direction, it actually just copies the data to
309  // userspace. But this makes the interface nice and symmetric and
310  // enables people to build on vmsplice() with room for future
311  // improvement in performance.
312  if( ret < 0 ) // an error
313  {
314  delete[] buffer;
315  buffer = 0;
316  return ret;
317  }
318 
319  size -= ret;
320  ptr += ret;
321  result += ret;
322  pipedata -= ret;
323 
324  if( pipedata > 0 ) continue;
325 
326  ++itr;
327  ++pipes_cursor;
328  }
329 
330  Free();
331 
332  buffer = reinterpret_cast<char*>( void_ptr );
333  return result;
334 #endif
335  }
336 
337  //-----------------------------------------------------------------------
354  //-----------------------------------------------------------------------
355  inline ssize_t FromUser( char *&buffer, size_t length )
356  {
357 #ifndef SPLICE_F_MOVE
358  return -ENOTSUP;
359 #else
360  if( !IsPageAligned( buffer ) )
361  {
362  errno = EINVAL;
363  return -1;
364  }
365 
366  if( capacity > 0 ) Free();
367 
368  char *buff = buffer;
369  while( length > 0 )
370  {
371  ssize_t ret = Alloc( length );
372  if( ret < 0 ) return ret;
373  std::array<int, 2> &pipe_fd = std::get<0>( pipes.back() );
374  size_t &pipedata = std::get<1>( pipes.back() );
375 
376  iovec iov[1];
377  iov->iov_len = size_t( ret ) < length ? ret : length;
378  iov->iov_base = buff;
379  ret = vmsplice( pipe_fd[1], iov, 1, SPLICE_F_GIFT );
380 
381  if( ret < 0 ) return -1;
382  length -= ret;
383  size += ret;
384  buff += ret;
385  pipedata += ret;
386  }
387 
388  pipes_cursor = pipes.begin();
389  free( buffer );
390  buffer = 0;
391  return size;
392 #endif
393  }
394 
// Alignment used by IsPageAligned() and for the buffer allocated in ToUser().
395  static const size_t PAGE_SZ = 4 * 1024; //< page size (4 KiB)
// Upper bound on the amount requested from a single pipe in ToUser().
396  static const size_t MAX_PIPE_SIZE = 1024 * 1024; //< maximum pipe size (1 MiB)
397 
398  size_t capacity; //< the total capacity of all underlying pipes
399  size_t size; //< size of the data stored in this kernel buffer
// Each entry pairs the two descriptors of a pipe ([0] is spliced from,
// [1] is spliced into) with the number of bytes currently held in that pipe.
400  std::vector<std::tuple<std::array<int,2>, size_t>> pipes; //< the underlying pipes
// The pipe the next WriteToFD() / ToUser() call starts consuming from; it is
// advanced past a pipe once that pipe has been fully drained.
401  std::vector<std::tuple<std::array<int,2>, size_t>>::iterator pipes_cursor;
402  };
403 
404  //---------------------------------------------------------------------------
409  //---------------------------------------------------------------------------
410  inline ssize_t Read( int fd, KernelBuffer &buffer, uint32_t length, int64_t offset )
411  {
412  return buffer.ReadFromFD( fd, length, &offset );
413  }
414 
415  //---------------------------------------------------------------------------
420  //---------------------------------------------------------------------------
421  inline ssize_t Read( int fd, KernelBuffer &buffer, uint32_t length )
422  {
423  return buffer.ReadFromFD( fd, length, NULL );
424  }
425 
426  //---------------------------------------------------------------------------
431  //---------------------------------------------------------------------------
432  inline ssize_t Write( int fd, KernelBuffer &buffer, int64_t offset )
433  {
434  return buffer.WriteToFD( fd, &offset );
435  }
436 
437  //---------------------------------------------------------------------------
441  //---------------------------------------------------------------------------
442  inline ssize_t Send( int fd, KernelBuffer &buffer )
443  {
444  return buffer.WriteToFD( fd, NULL );
445  }
446 
447  //---------------------------------------------------------------------------
451  //---------------------------------------------------------------------------
452  inline ssize_t Move( KernelBuffer &kbuff, char *&ubuff )
453  {
454  return kbuff.ToUser( ubuff );
455  }
456 
457  //---------------------------------------------------------------------------
461  //---------------------------------------------------------------------------
462  inline ssize_t Move( char *&ubuff, KernelBuffer &kbuff, size_t length )
463  {
464  return kbuff.FromUser( ubuff, length );
465  }
466 
467 }
468 
469 
470 #endif /* SRC_XRDCL_XRDCLKERNELBUFFER_HH_ */
ssize_t Send(int fd, KernelBuffer &buffer)
Definition: XrdSysKernelBuffer.hh:442
friend ssize_t Write(int, KernelBuffer &, int64_t)
Definition: XrdSysKernelBuffer.hh:432
size_t size
Definition: XrdSysKernelBuffer.hh:399
std::vector< std::tuple< std::array< int, 2 >, size_t > >::iterator pipes_cursor
Definition: XrdSysKernelBuffer.hh:401
void Free()
Closes the underlying pipes (kernel buffers)
Definition: XrdSysKernelBuffer.hh:133
size_t capacity
Definition: XrdSysKernelBuffer.hh:398
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Definition: XrdSysKernelBuffer.hh:452
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
Definition: XrdSysKernelBuffer.hh:410
KernelBuffer(KernelBuffer &&kbuff)
Definition: XrdSysKernelBuffer.hh:76
friend ssize_t Send(int, KernelBuffer &)
Definition: XrdSysKernelBuffer.hh:442
friend ssize_t Read(int, KernelBuffer &, uint32_t, int64_t)
Definition: XrdSysKernelBuffer.hh:410
KernelBuffer()
Default constructor.
Definition: XrdSysKernelBuffer.hh:64
ssize_t ReadFromFD(int fd, uint32_t length, int64_t *offset)
Definition: XrdSysKernelBuffer.hh:184
Definition: XrdOucIOVec.hh:65
friend ssize_t Move(KernelBuffer &, char *&)
Definition: XrdSysKernelBuffer.hh:452
ssize_t Alloc(size_t size)
Allocates another pipe (kernel buffer) of size up to 1MB.
Definition: XrdSysKernelBuffer.hh:151
~KernelBuffer()
Destructor.
Definition: XrdSysKernelBuffer.hh:103
bool Empty() const
Definition: XrdSysKernelBuffer.hh:111
static bool IsPageAligned(const void *ptr)
Definition: XrdSysKernelBuffer.hh:123
Definition: XrdSysKernelBuffer.hh:45
#define close(a)
Definition: XrdPosix.hh:43
KernelBuffer & operator=(KernelBuffer &&kbuff)
Move assignment operator.
Definition: XrdSysKernelBuffer.hh:92
ssize_t Write(int fd, KernelBuffer &buffer, int64_t offset)
Definition: XrdSysKernelBuffer.hh:432
static const size_t PAGE_SZ
Definition: XrdSysKernelBuffer.hh:395
Definition: XrdClPollerBuiltIn.hh:28
ssize_t ToUser(char *&buffer)
Definition: XrdSysKernelBuffer.hh:278
static const size_t MAX_PIPE_SIZE
Definition: XrdSysKernelBuffer.hh:396
KernelBuffer & operator=(const KernelBuffer &)=delete
Copy assignment operator - deleted.
ssize_t FromUser(char *&buffer, size_t length)
Definition: XrdSysKernelBuffer.hh:355
std::vector< std::tuple< std::array< int, 2 >, size_t > > pipes
Definition: XrdSysKernelBuffer.hh:400
ssize_t WriteToFD(int fd, int64_t *offset)
Definition: XrdSysKernelBuffer.hh:224