xrootd
XrdClZipCache.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
26 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
27 
29 #include <zlib.h>
30 #include <exception>
31 #include <string>
32 #include <vector>
33 #include <mutex>
34 #include <queue>
35 #include <tuple>
36 
37 namespace XrdCl
38 {
39  //---------------------------------------------------------------------------
41  //---------------------------------------------------------------------------
42  struct ZipError : public std::exception
43  {
45  {
46  }
47 
49  };
50 
51  //---------------------------------------------------------------------------
53  //---------------------------------------------------------------------------
54  class ZipCache
55  {
56  public:
57 
58  typedef std::vector<char> buffer_t;
59 
60  private:
61 
62  typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
63  typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
64 
66  {
67  inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
68  {
69  return std::get<1>( lhs ) > std::get<1>( rhs );
70  }
71  };
72 
73  typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
74 
75  public:
76 
77  struct ReadHandler : public ResponseHandler // TODO once we drop ZipArchiveReader this class can be removed
78  {
79  ReadHandler( uint64_t offset, uint32_t length, ZipCache &self ) : offset( offset ), buffer( length ), self( self )
80  {
81  }
82 
83  void HandleResponse( XRootDStatus *status, AnyObject *response )
84  {
85  self.QueueRsp( *status, offset, std::move( buffer ) );
86  delete status;
87  delete response;
88  delete this;
89  }
90 
91  uint64_t offset;
93  ZipCache &self;
94  };
95 
96  ZipCache() : inabsoff( 0 )
97  {
98  strm.zalloc = Z_NULL;
99  strm.zfree = Z_NULL;
100  strm.opaque = Z_NULL;
101  strm.avail_in = 0;
102  strm.next_in = Z_NULL;
103  strm.avail_out = 0;
104  strm.next_out = Z_NULL;
105 
106  // make sure zlib doesn't look for gzip headers, in order to do so
107  // pass negative window bits !!!
108  int rc = inflateInit2( &strm, -MAX_WBITS );
109  XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
110  if( !st.IsOK() ) throw ZipError( st );
111  }
112 
114  {
115  inflateEnd( &strm );
116  }
117 
118  inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
119  {
120  std::unique_lock<std::mutex> lck( mtx );
121  rdreqs.emplace( offset, length, buffer, handler );
122  Decompress();
123  }
124 
125  inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
126  {
127  std::unique_lock<std::mutex> lck( mtx );
128  rdrsps.emplace( st, offset, std::move( buffer ) );
129  Decompress();
130  }
131 
132  private:
133 
134  inline bool HasInput() const
135  {
136  return strm.avail_in != 0;
137  }
138 
139  inline bool HasOutput() const
140  {
141  return strm.avail_out != 0;
142  }
143 
144  inline void Input( const read_resp_t &rdrsp )
145  {
146  const buffer_t &buffer = std::get<2>( rdrsp );
147  strm.avail_in = buffer.size();
148  strm.next_in = (Bytef*)buffer.data();
149  }
150 
151  inline void Output( const read_args_t &rdreq )
152  {
153  strm.avail_out = std::get<1>( rdreq );
154  strm.next_out = (Bytef*)std::get<2>( rdreq );
155  }
156 
157  inline bool Consecutive( const read_resp_t &resp ) const
158  {
159  return ( std::get<1>( resp ) == inabsoff );
160  }
161 
162  void Decompress()
163  {
164  while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
165  {
166  if( !HasOutput() && !rdreqs.empty() )
167  Output( rdreqs.front() );
168 
169  if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) ) // the response might come out of order so we need to check the offset
170  Input( rdrsps.top() );
171 
172  if( !HasInput() || !HasOutput() ) return;
173 
174  // check the response status
175  XRootDStatus st = std::get<0>( rdrsps.top() );
176  if( !st.IsOK() ) return CallHandler( st );
177 
178  // the available space in output buffer before inflating
179  uInt avail_before = strm.avail_in;
180  // decompress the data
181  int rc = inflate( &strm, Z_SYNC_FLUSH );
182  st = ToXRootDStatus( rc, "inflate" );
183  if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
184  // update the absolute input offset by the number of bytes we consumed
185  inabsoff += avail_before - strm.avail_in;
186 
187  if( !strm.avail_out ) // the output buffer is empty meaning a request has been fulfilled
189 
190  // the input buffer is empty meaning a response has been consumed
191  // (we need to check if there are any elements in the responses
192  // queue as the input buffer might have been set directly by the user)
193  if( !strm.avail_in && !rdrsps.empty() )
194  rdrsps.pop();
195  }
196  }
197 
198  static inline AnyObject* PkgRsp( ChunkInfo *chunk )
199  {
200  if( !chunk ) return nullptr;
201  AnyObject *rsp = new AnyObject();
202  rsp->Set( chunk );
203  return rsp;
204  }
205 
206  inline void CallHandler( const XRootDStatus &st )
207  {
208  if( rdreqs.empty() ) return;
209  read_args_t args = std::move( rdreqs.front() );
210  rdreqs.pop();
211 
212  ChunkInfo *chunk = nullptr;
213  if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
214  std::get<1>( args ),
215  std::get<2>( args ) );
216 
217  ResponseHandler *handler = std::get<3>( args );
218  handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
219  }
220 
221  XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
222  {
223  std::string msg = "[zlib] " + func + " : ";
224 
225  switch( rc )
226  {
227  case Z_STREAM_END :
228  case Z_OK : return XrdCl::XRootDStatus();
229  case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
230  case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
231  case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
232  case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
233  case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
234  case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
236  }
237  }
238 
239  z_stream strm; // the zlib stream we will use for reading
240 
241  std::mutex mtx;
242  uint64_t inabsoff; //< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
243  std::queue<read_args_t> rdreqs; //< pending read requests (we only allow read requests to be submitted in order)
244  resp_queue_t rdrsps; //< pending read responses (due to multiple-streams the read response may come out of order)
245  };
246 
247 }
248 
249 #endif /* SRC_XRDZIP_XRDZIPINFLCACHE_HH_ */
XrdCl::XRootDStatus ToXRootDStatus(int rc, const std::string &func)
Definition: XrdClZipCache.hh:221
void Output(const read_args_t &rdreq)
Definition: XrdClZipCache.hh:151
Definition: XrdClAnyObject.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::priority_queue< read_resp_t, std::vector< read_resp_t >, greater_read_resp_t > resp_queue_t
Definition: XrdClZipCache.hh:73
std::queue< read_args_t > rdreqs
Definition: XrdClZipCache.hh:243
resp_queue_t rdrsps
Definition: XrdClZipCache.hh:244
const uint16_t errUnknown
Unknown error.
Definition: XrdClStatus.hh:50
bool HasInput() const
Definition: XrdClZipCache.hh:134
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
std::vector< char > buffer_t
Definition: XrdClZipCache.hh:58
z_stream strm
Definition: XrdClZipCache.hh:239
std::tuple< XRootDStatus, uint64_t, buffer_t > read_resp_t
Definition: XrdClZipCache.hh:63
void Decompress()
Definition: XrdClZipCache.hh:162
void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClZipCache.hh:83
uint64_t offset
Definition: XrdClZipCache.hh:91
ReadHandler(uint64_t offset, uint32_t length, ZipCache &self)
Definition: XrdClZipCache.hh:79
XrdCl::XRootDStatus status
Definition: XrdClZipCache.hh:48
ZipCache & self
Definition: XrdClZipCache.hh:93
void CallHandler(const XRootDStatus &st)
Definition: XrdClZipCache.hh:206
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
An exception for carrying the XRootDStatus of ZipCache.
Definition: XrdClZipCache.hh:42
uint64_t inabsoff
Definition: XrdClZipCache.hh:242
Definition: XrdClZipCache.hh:77
const uint16_t suContinue
Definition: XrdClStatus.hh:39
bool HasOutput() const
Definition: XrdClZipCache.hh:139
void QueueRsp(const XRootDStatus &st, uint64_t offset, buffer_t &&buffer)
Definition: XrdClZipCache.hh:125
Handle an async response.
Definition: XrdClXRootDResponses.hh:1040
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1070
ZipCache()
Definition: XrdClZipCache.hh:96
void QueueReq(uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler)
Definition: XrdClZipCache.hh:118
bool Consecutive(const read_resp_t &resp) const
Definition: XrdClZipCache.hh:157
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ZipError(const XrdCl::XRootDStatus &status)
Definition: XrdClZipCache.hh:44
bool operator()(const read_resp_t &lhs, const read_resp_t &rhs) const
Definition: XrdClZipCache.hh:67
void Input(const read_resp_t &rdrsp)
Definition: XrdClZipCache.hh:144
std::mutex mtx
Definition: XrdClZipCache.hh:241
std::tuple< uint64_t, uint32_t, void *, ResponseHandler * > read_args_t
Definition: XrdClZipCache.hh:62
Definition: XrdClZipCache.hh:65
buffer_t buffer
Definition: XrdClZipCache.hh:92
~ZipCache()
Definition: XrdClZipCache.hh:113
Utility class for inflating a compressed buffer.
Definition: XrdClZipCache.hh:54
static AnyObject * PkgRsp(ChunkInfo *chunk)
Definition: XrdClZipCache.hh:198