xrootd
XrdEcUtilities.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECUTILITIES_HH_
26 #define SRC_XRDEC_XRDECUTILITIES_HH_
27 
28 #include "XrdEc/XrdEcObjCfg.hh"
29 
31 #include "XrdCl/XrdClFileSystem.hh"
32 
33 #include "XrdCl/XrdClUtils.hh"
34 
35 #include <exception>
36 #include <memory>
37 #include <random>
38 #include <queue>
39 #include <mutex>
40 #include <condition_variable>
41 
42 namespace XrdEc
43 {
44  //---------------------------------------------------------------------------
46  //---------------------------------------------------------------------------
47  struct stripe_t
48  {
49  //-------------------------------------------------------------------------
54  //-------------------------------------------------------------------------
55  stripe_t( char *buffer, bool valid ) : buffer( buffer ), valid( valid )
56  {
57  }
58 
59  char *buffer; //< buffer with stripe data
60  bool valid; //< true if data are valid, otherwise false
61  };
62 
63  //---------------------------------------------------------------------------
65  //---------------------------------------------------------------------------
66  typedef std::vector<stripe_t> stripes_t;
67 
68  //----------------------------------------------------------------------------
70  //----------------------------------------------------------------------------
71  typedef std::vector<char> buffer_t;
72 
73  //----------------------------------------------------------------------------
75  //----------------------------------------------------------------------------
76  class IOError : public std::exception
77  {
78  public:
79 
80  //------------------------------------------------------------------------
84  //------------------------------------------------------------------------
85  IOError( const XrdCl::XRootDStatus &st ) noexcept : st( st ), msg( st.ToString() )
86  {
87  }
88 
89  //------------------------------------------------------------------------
91  //------------------------------------------------------------------------
92  IOError( const IOError &err ) noexcept : st( err.st ), msg( err.st.ToString() )
93  {
94  }
95 
96  //------------------------------------------------------------------------
98  //------------------------------------------------------------------------
99  IOError& operator=( const IOError &err ) noexcept
100  {
101  st = err.st;
102  msg = err.st.ToString();
103  return *this;
104  }
105 
106  //------------------------------------------------------------------------
108  //------------------------------------------------------------------------
109  virtual ~IOError()
110  {
111  }
112 
113  //------------------------------------------------------------------------
115  //------------------------------------------------------------------------
116  virtual const char* what() const noexcept
117  {
118  return msg.c_str();
119  }
120 
121  //------------------------------------------------------------------------
123  //------------------------------------------------------------------------
125  {
126  return st;
127  }
128 
129  enum
130  {
132  };
133 
134  private:
135 
136  //------------------------------------------------------------------------
138  //------------------------------------------------------------------------
140 
141  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
144  std::string msg;
145  };
146 
147  //---------------------------------------------------------------------------
154  //---------------------------------------------------------------------------
155  void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );
156 
157  //---------------------------------------------------------------------------
162  //---------------------------------------------------------------------------
164 
165 
166  //---------------------------------------------------------------------------
167  // A class implementing synchronous queue
168  //---------------------------------------------------------------------------
169  template<typename Element>
170  struct sync_queue
171  {
172  //-------------------------------------------------------------------------
173  // An internal exception used for interrupting the `dequeue` method
174  //-------------------------------------------------------------------------
175  struct wait_interrupted{ };
176 
177  //-------------------------------------------------------------------------
178  // Default constructor
179  //-------------------------------------------------------------------------
180  sync_queue() : interrupted( false )
181  {
182  }
183 
184  //-------------------------------------------------------------------------
185  // Enqueue new element into the queue
186  //-------------------------------------------------------------------------
187  inline void enqueue( Element && element )
188  {
189  std::unique_lock<std::mutex> lck( mtx );
190  elements.push( std::move( element ) );
191  cv.notify_all();
192  }
193 
194  //-------------------------------------------------------------------------
195  // Dequeue an element from the front of the queue
196  // Note: if the queue is empty blocks until a new element is enqueued
197  //-------------------------------------------------------------------------
198  inline Element dequeue()
199  {
200  std::unique_lock<std::mutex> lck( mtx );
201  while( elements.empty() )
202  {
203  cv.wait( lck );
204  if( interrupted ) throw wait_interrupted();
205  }
206  Element element = std::move( elements.front() );
207  elements.pop();
208  return std::move( element );
209  }
210 
211  //-------------------------------------------------------------------------
212  // Dequeue an element from the front of the queue
213  // Note: if the queue is empty returns false, true otherwise
214  //-------------------------------------------------------------------------
215  inline bool dequeue( Element &e )
216  {
217  std::unique_lock<std::mutex> lck( mtx );
218  if( elements.empty() ) return false;
219  e = std::move( elements.front() );
220  elements.pop();
221  return true;
222  }
223 
224  //-------------------------------------------------------------------------
225  // Checks if the queue is empty
226  //-------------------------------------------------------------------------
227  bool empty()
228  {
229  std::unique_lock<std::mutex> lck( mtx );
230  return elements.empty();
231  }
232 
233  //-------------------------------------------------------------------------
234  // Interrupt all waiting `dequeue` routines
235  //-------------------------------------------------------------------------
236  inline void interrupt()
237  {
238  interrupted = true;
239  cv.notify_all();
240  }
241 
242  private:
243  std::queue<Element> elements; //< the queue itself
244  std::mutex mtx; //< mutex guarding the queue
245  std::condition_variable cv;
246  std::atomic<bool> interrupted; //< a flag, true if all `dequeue` routines
247  //< should be interrupted
248  };
249 }
250 
251 #endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */
std::mutex mtx
Definition: XrdEcUtilities.hh:244
virtual ~IOError()
Destructor.
Definition: XrdEcUtilities.hh:109
stripe_t(char *buffer, bool valid)
Definition: XrdEcUtilities.hh:55
XrdCl::XRootDStatus st
The status object.
Definition: XrdEcUtilities.hh:139
A buffer with stripe data and info on validity.
Definition: XrdEcUtilities.hh:47
Generic I/O exception, wraps up XrdCl::XRootDStatus (.
Definition: XrdEcUtilities.hh:76
std::string ToString() const
Create a string representation.
const XrdCl::XRootDStatus & Status() const
Definition: XrdEcUtilities.hh:124
std::atomic< bool > interrupted
Definition: XrdEcUtilities.hh:246
bool dequeue(Element &e)
Definition: XrdEcUtilities.hh:215
virtual const char * what() const noexcept
overloaded
Definition: XrdEcUtilities.hh:116
char * buffer
Definition: XrdEcUtilities.hh:59
bool empty()
Definition: XrdEcUtilities.hh:227
std::condition_variable cv
Definition: XrdEcUtilities.hh:245
std::queue< Element > elements
Definition: XrdEcUtilities.hh:243
Definition: XrdEcUtilities.hh:175
IOError(const XrdCl::XRootDStatus &st) noexcept
Definition: XrdEcUtilities.hh:85
IOError(const IOError &err) noexcept
Copy constructor.
Definition: XrdEcUtilities.hh:92
void enqueue(Element &&element)
Definition: XrdEcUtilities.hh:187
Element dequeue()
Definition: XrdEcUtilities.hh:198
sync_queue()
Definition: XrdEcUtilities.hh:180
bool valid
Definition: XrdEcUtilities.hh:60
Request status.
Definition: XrdClXRootDResponses.hh:218
IOError & operator=(const IOError &err) noexcept
Assigment operator.
Definition: XrdEcUtilities.hh:99
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
Definition: XrdEcUtilities.hh:170
std::vector< char > buffer_t
a buffer type
Definition: XrdEcReader.hh:44
Handle an async response.
Definition: XrdClXRootDResponses.hh:1040
Definition: XrdEcUtilities.hh:131
std::string msg
The error message.
Definition: XrdEcUtilities.hh:144
std::vector< stripe_t > stripes_t
All stripes in a block.
Definition: XrdEcUtilities.hh:66
void interrupt()
Definition: XrdEcUtilities.hh:236
Definition: XrdClZipArchive.hh:45