xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdEcReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECREADER_HH_
26 #define SRC_XRDEC_XRDECREADER_HH_
27 
28 #include "XrdEc/XrdEcObjCfg.hh"
29 
30 #include "XrdCl/XrdClZipArchive.hh"
31 #include "XrdCl/XrdClOperations.hh"
32 
33 #include <string>
34 #include <unordered_map>
35 #include <unordered_set>
36 
37 class MicroTest;
38 
39 namespace XrdEc
40 {
41  //---------------------------------------------------------------------------
42  // Forward declaration for the internal cache
43  //---------------------------------------------------------------------------
44  struct block_t;
45  //---------------------------------------------------------------------------
46  // Buffer for a single chunk of data
47  //---------------------------------------------------------------------------
48  typedef std::vector<char> buffer_t;
49  //---------------------------------------------------------------------------
50  // Read callback, to be called with status and number of bytes read
51  //---------------------------------------------------------------------------
52  typedef std::function<void( const XrdCl::XRootDStatus&, uint32_t )> callback_t;
53 
54  //---------------------------------------------------------------------------
55  // Reader object for reading erasure coded and striped data
56  //---------------------------------------------------------------------------
57  class Reader
58  {
59  friend class ::MicroTest;
60  friend struct block_t;
61 
62  public:
63  //-----------------------------------------------------------------------
68  //-----------------------------------------------------------------------
69  Reader( ObjCfg &objcfg ) : objcfg( objcfg ), lstblk( 0 ), filesize( 0 )
70  {
71  }
72 
73  //-----------------------------------------------------------------------
74  // Destructor
75  //-----------------------------------------------------------------------
76  virtual ~Reader();
77 
78  //-----------------------------------------------------------------------
82  //-----------------------------------------------------------------------
83  void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
84 
85  //-----------------------------------------------------------------------
92  //-----------------------------------------------------------------------
93  void Read( uint64_t offset,
94  uint32_t length,
95  void *buffer,
96  XrdCl::ResponseHandler *handler,
97  uint16_t timeout );
98 
99  /*
100  * Read multiple locations and lengths of data
101  * internally remapped to stripes
102  *
103  * @param chunks : list of offsets, lengths and separate buffers
104  * @param buffer : optional full buffer for all data
105  */
106  void VectorRead( const XrdCl::ChunkList &chunks,
107  void *buffer,
108  XrdCl::ResponseHandler *handler,
109  uint16_t timeout);
110 
111  //-----------------------------------------------------------------------
113  //-----------------------------------------------------------------------
114  void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
115 
116  //-----------------------------------------------------------------------
118  //-----------------------------------------------------------------------
119  inline uint64_t GetSize()
120  {
121  return filesize;
122  }
123 
124  private:
125 
126  //-----------------------------------------------------------------------
134  //-----------------------------------------------------------------------
135  void Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb, uint16_t timeout = 0 );
136 
137  //-----------------------------------------------------------------------
141  //-----------------------------------------------------------------------
142  XrdCl::Pipeline ReadMetadata( size_t index );
143 
144  //-----------------------------------------------------------------------
148  //-----------------------------------------------------------------------
149  XrdCl::Pipeline ReadSize( size_t index );
150 
151  //-----------------------------------------------------------------------
155  //-----------------------------------------------------------------------
156  bool ParseMetadata( XrdCl::ChunkInfo &ch );
157 
158  //-----------------------------------------------------------------------
162  //-----------------------------------------------------------------------
163  void AddMissing( const buffer_t &cdbuff );
164 
165  //-----------------------------------------------------------------------
167  //-----------------------------------------------------------------------
168  bool IsMissing( const std::string &fn );
169 
170  inline static callback_t ErrorCorrected(Reader *reader, std::shared_ptr<block_t> &self, size_t blkid, size_t strpid);
171 
172  void MissingVectorRead(std::shared_ptr<block_t> &block, size_t blkid, size_t strpid, uint16_t timeout = 0);
173 
174  typedef std::unordered_map<std::string, std::shared_ptr<XrdCl::ZipArchive>> dataarchs_t;
175  typedef std::unordered_map<std::string, buffer_t> metadata_t;
176  typedef std::unordered_map<std::string, std::string> urlmap_t;
177  typedef std::unordered_set<std::string> missing_t;
178 
180  dataarchs_t dataarchs; //> map URL to ZipArchive object
181  metadata_t metadata; //> map URL to CD metadata
182  urlmap_t urlmap; //> map blknb/strpnb (data chunk) to URL
183  missing_t missing; //> set of missing stripes
184  std::shared_ptr<block_t> block; //> cache for the block we are reading from
185  std::mutex blkmtx; //> mutex guarding the block from parallel access
186  size_t lstblk; //> last block number
187  uint64_t filesize; //> file size (obtained from xattr)
188  std::map<std::string, size_t> archiveIndices;
189 
190  std::mutex missingChunksMutex;
191  std::vector<std::tuple<size_t, size_t>> missingChunksVectorRead;
192  std::condition_variable waitMissing;
193  };
194 
195 } /* namespace XrdEc */
196 
197 #endif /* SRC_XRDEC_XRDECREADER_HH_ */
std::shared_ptr< block_t > block
Definition: XrdEcReader.hh:184
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:1055
std::condition_variable waitMissing
Definition: XrdEcReader.hh:192
bool IsMissing(const std::string &fn)
Check if chunk file name is missing.
ObjCfg & objcfg
Definition: XrdEcReader.hh:179
std::vector< std::tuple< size_t, size_t > > missingChunksVectorRead
Definition: XrdEcReader.hh:191
std::mutex missingChunksMutex
Definition: XrdEcReader.hh:190
metadata_t metadata
Definition: XrdEcReader.hh:181
urlmap_t urlmap
Definition: XrdEcReader.hh:182
std::mutex blkmtx
Definition: XrdEcReader.hh:185
std::map< std::string, size_t > archiveIndices
Definition: XrdEcReader.hh:188
std::unordered_map< std::string, std::string > urlmap_t
Definition: XrdEcReader.hh:176
uint64_t filesize
Definition: XrdEcReader.hh:187
XrdCl::Pipeline ReadSize(size_t index)
void Read(uint64_t offset, uint32_t length, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
std::unordered_map< std::string, buffer_t > metadata_t
Definition: XrdEcReader.hh:175
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:916
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
Close the data object.
XrdCl::Pipeline ReadMetadata(size_t index)
void VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
static callback_t ErrorCorrected(Reader *reader, std::shared_ptr< block_t > &self, size_t blkid, size_t strpid)
bool ParseMetadata(XrdCl::ChunkInfo &ch)
friend struct block_t
Definition: XrdEcReader.hh:60
std::vector< char > buffer_t
a buffer type
Definition: XrdEcReader.hh:44
uint64_t GetSize()
Definition: XrdEcReader.hh:119
size_t lstblk
Definition: XrdEcReader.hh:186
Handle an async response.
Definition: XrdClXRootDResponses.hh:1125
std::unordered_set< std::string > missing_t
Definition: XrdEcReader.hh:177
missing_t missing
Definition: XrdEcReader.hh:183
Definition: XrdEcReader.hh:57
void AddMissing(const buffer_t &cdbuff)
void MissingVectorRead(std::shared_ptr< block_t > &block, size_t blkid, size_t strpid, uint16_t timeout=0)
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
Definition: XrdEcReader.hh:52
dataarchs_t dataarchs
Definition: XrdEcReader.hh:180
Definition: XrdEcObjCfg.hh:33
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClOperations.hh:319
Reader(ObjCfg &objcfg)
Definition: XrdEcReader.hh:69
virtual ~Reader()
std::unordered_map< std::string, std::shared_ptr< XrdCl::ZipArchive > > dataarchs_t
Definition: XrdEcReader.hh:174