xrootd
XrdClEcHandler.hh
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.hh
3  *
4  * Created on: 23 Mar 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
10 
12 #include "XrdCl/XrdClUtils.hh"
13 
14 #include "XrdEc/XrdEcReader.hh"
15 #include "XrdEc/XrdEcStrmWriter.hh"
16 
17 #include <memory>
18 
19 namespace XrdCl
20 {
// File plug-in (derives from FilePlugIn) that forwards Open/Close/Read/Write
// to an XrdEc::StrmWriter or an XrdEc::Reader created from the owned ObjCfg,
// and forwards Stat to a FileSystem object.
//
// NOTE(review): this is a rendered source listing; the embedded listing
// numbers skip several lines (24, 45, 55, 62, 66, 68, 101, 105, 107, 111, 116,
// 124, 131, 139-140, 147, 156-157), so some statements are not visible here.
// Comments below describe only what is shown and flag each gap.
21  class EcHandler : public FilePlugIn
22  {
23  public:
    // Constructor initializer list. Listing line 24 (the signature) is absent;
    // the cross-reference index of this listing gives it as
    //   EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, bool cosc)
    // The raw `objcfg` pointer initializes the std::unique_ptr member of the
    // same name, so this object deletes it on destruction. The write offset
    // `curroff` starts at 0.
25  fs( redir ),
26  objcfg( objcfg ),
27  curroff( 0 ),
28  cosc( cosc )
29  {
30  }
31 
    // Nothing to do explicitly; the unique_ptr members release what they hold.
32  virtual ~EcHandler()
33  {
34  }
35 
    // Open the object for writing or for reading, depending on `flags`.
    //  - Write or Update set: creates a StrmWriter from *objcfg and opens it
    //    with `handler`.
    //  - otherwise, Read set: creates a Reader from *objcfg and opens it with
    //    `handler`.
    // `timeout` is not used in the visible lines (see the TODO below).
    // Both visible success paths return a default-constructed XRootDStatus.
36  XrdCl::XRootDStatus Open( uint16_t flags,
37  XrdCl::ResponseHandler *handler,
38  uint16_t timeout )
39  {
40  if( ( flags & XrdCl::OpenFlags::Write ) || ( flags & XrdCl::OpenFlags::Update ) )
41  {
42  if( !( flags & XrdCl::OpenFlags::New ) || // it has to be a new file
43  ( flags & XrdCl::OpenFlags::Delete ) || // truncation is not supported
44  ( flags & XrdCl::OpenFlags::Read ) ) // write + read is not supported
    // NOTE(review): listing line 45 (the statement guarded by the condition
    // above) is absent - presumably an early error return; TODO confirm.
46 
47  writer.reset( new XrdEc::StrmWriter( *objcfg ) );
48  writer->Open( handler ); // TODO impl timeout
49  return XrdCl::XRootDStatus();
50  }
51 
52  if( flags & XrdCl::OpenFlags::Read )
53  {
54  if( flags & XrdCl::OpenFlags::Write )
    // NOTE(review): listing line 55 (the statement guarded by the `if` above)
    // is absent - presumably an early error return; TODO confirm.
56 
57  reader.reset( new XrdEc::Reader( *objcfg ) );
58  reader->Open( handler );
59  return XrdCl::XRootDStatus();
60  }
61 
    // NOTE(review): listing line 62 (the fall-through return for flags that
    // select neither branch) is absent from this listing.
63  }
64 
65  //------------------------------------------------------------------------
    // Close whichever of writer/reader is active and drop it once the close
    // completes. Listing line 68 (the start of the signature) is absent; the
    // cross-reference index gives it as
    //   Close(XrdCl::ResponseHandler *handler, uint16_t timeout)
    // `timeout` is not used in the visible lines.
67  //------------------------------------------------------------------------
69  uint16_t timeout )
70  {
71  if( writer )
72  {
    // The wrapped callback runs when the writer's Close completes. It captures
    // `this`, so this object must still exist at that point.
    // NOTE(review): the cross-reference index shows Wrap taking
    // std::function<void(XRootDStatus&, AnyObject&)> while this lambda takes
    // pointers - presumably another overload exists; confirm.
73  writer->Close( XrdCl::ResponseHandler::Wrap( [this, handler]( XrdCl::XRootDStatus *st, XrdCl::AnyObject *rsp )
74  {
75  writer.reset();
76  if( st->IsOK() && cosc )
77  {
    // Build "<path>?xrdec.objid=<obj>&xrdec.close=true&xrdec.size=<curroff>"
    // and send it as an OpaqueFile query; the caller's handler receives the
    // query's response.
78  std::string commit = redir.GetPath()
79  + "?xrdec.objid=" + objcfg->obj
80  + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
81  Buffer arg; arg.FromString( commit );
    // This local `st` shadows the lambda parameter of the same name.
    // NOTE(review): on this path the outer `st` and `rsp` are neither passed
    // on nor released in the visible code - confirm who owns them (possible
    // leak).
82  auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
    // If the query could not even be issued, report that status to the handler.
83  if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
84  return;
85  }
    // Close failed, or `cosc` is false: pass the close result through as is.
86  handler->HandleResponse( st, rsp );
87  } ) );
88  return XrdCl::XRootDStatus();
89  }
90 
91  if( reader )
92  {
    // Same pattern for the reader, without the extra query.
93  reader->Close( XrdCl::ResponseHandler::Wrap( [this, handler]( XrdCl::XRootDStatus *st, XrdCl::AnyObject *rsp )
94  {
95  reader.reset();
96  handler->HandleResponse( st, rsp );
97  } ) );
98  return XrdCl::XRootDStatus();
99  }
100 
    // NOTE(review): listing line 101 (the return used when neither writer nor
    // reader is set) is absent from this listing.
102  }
103 
104  //------------------------------------------------------------------------
    // Stat the path of `redir` through the FileSystem member `fs`, forwarding
    // `handler` and `timeout`. Listing line 107 (the start of the signature)
    // is absent; the cross-reference index gives it as
    //   Stat(bool force, XrdCl::ResponseHandler *handler, uint16_t timeout)
    // Listing line 111 is absent as well. `force` is not used in the visible
    // lines.
106  //------------------------------------------------------------------------
108  XrdCl::ResponseHandler *handler,
109  uint16_t timeout )
110  {
112  return fs.Stat( redir.GetPath(), handler, timeout );
113  }
114 
115  //------------------------------------------------------------------------
    // Forward a read of `size` bytes at `offset` into `buffer` to the reader.
    // `timeout` is not forwarded. Returns a default-constructed XRootDStatus
    // after handing the request over.
117  //------------------------------------------------------------------------
118  XrdCl::XRootDStatus Read( uint64_t offset,
119  uint32_t size,
120  void *buffer,
121  XrdCl::ResponseHandler *handler,
122  uint16_t timeout )
123  {
    // NOTE(review): listing line 124 is absent - presumably a guard for a
    // null `reader`; as shown, `reader` is dereferenced unconditionally.
125 
126  reader->Read( offset, size, buffer, handler );
127  return XrdCl::XRootDStatus();
128  }
129 
130  //------------------------------------------------------------------------
    // Forward a write of `size` bytes from `buffer` to the writer and advance
    // `curroff` by `size`. In the visible lines `offset` and `timeout` are not
    // forwarded to the writer.
132  //------------------------------------------------------------------------
133  XrdCl::XRootDStatus Write( uint64_t offset,
134  uint32_t size,
135  const void *buffer,
136  XrdCl::ResponseHandler *handler,
137  uint16_t timeout )
138  {
    // NOTE(review): listing lines 139-140 are absent - presumably checks on
    // `writer` and/or `offset`; TODO confirm.
141  writer->Write( size, buffer, handler );
142  curroff += size;
143  return XrdCl::XRootDStatus();
144  }
145 
146  //------------------------------------------------------------------------
    // True while either a writer or a reader is held.
148  //------------------------------------------------------------------------
149  bool IsOpen() const
150  {
151  return writer || reader;
152  }
153 
154  private:
155 
    // NOTE(review): listing lines 156-157 are absent; the cross-reference
    // index lists `URL redir` (156) and `FileSystem fs` (157) there.
    // Object configuration; dereferenced to build the writer/reader.
158  std::unique_ptr<XrdEc::ObjCfg> objcfg;
    // Set while open for writing (see Open/Close).
159  std::unique_ptr<XrdEc::StrmWriter> writer;
    // Set while open for reading (see Open/Close).
160  std::unique_ptr<XrdEc::Reader> reader;
    // Running total of bytes handed to Write(); sent as xrdec.size on close.
161  uint64_t curroff;
    // When true, a successful writer close is followed by the xrdec.close query.
162  bool cosc;
163  };
164 
165  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl )
166  {
167  const URL::ParamsMap &params = redirurl.GetParams();
168  // make sure all the xrdec. tokens are present and the values are sane
169  URL::ParamsMap::const_iterator itr = params.find( "xrdec.nbdta" );
170  if( itr == params.end() ) return nullptr;
171  uint8_t nbdta = std::stoul( itr->second );
172 
173  itr = params.find( "xrdec.nbprt" );
174  if( itr == params.end() ) return nullptr;
175  uint8_t nbprt = std::stoul( itr->second );
176 
177  itr = params.find( "xrdec.blksz" );
178  if( itr == params.end() ) return nullptr;
179  uint64_t blksz = std::stoul( itr->second );
180 
181  itr = params.find( "xrdec.plgr" );
182  if( itr == params.end() ) return nullptr;
183  std::vector<std::string> plgr;
184  Utils::splitString( plgr, itr->second, "," );
185  if( plgr.size() < nbdta + nbprt ) return nullptr;
186 
187  itr = params.find( "xrdec.objid" );
188  if( itr == params.end() ) return nullptr;
189  std::string objid = itr->second;
190 
191  itr = params.find( "xrdec.format" );
192  if( itr == params.end() ) return nullptr;
193  size_t format = std::stoul( itr->second );
194  if( format != 1 ) return nullptr; // TODO use constant
195 
196  std::vector<std::string> dtacgi;
197  itr = params.find( "xrdec.dtacgi" );
198  if( itr != params.end() )
199  {
200  Utils::splitString( dtacgi, itr->second, "," );
201  if( plgr.size() != dtacgi.size() ) return nullptr;
202  }
203 
204  std::vector<std::string> mdtacgi;
205  itr = params.find( "xrdec.mdtacgi" );
206  if( itr != params.end() )
207  {
208  Utils::splitString( mdtacgi, itr->second, "," );
209  if( plgr.size() != mdtacgi.size() ) return nullptr;
210  }
211 
212  itr = params.find( "xrdec.cosc" );
213  if( itr == params.end() ) return nullptr;
214  std::string cosc_str = itr->second;
215  if( cosc_str != "true" && cosc_str != "false" ) return nullptr;
216  bool cosc = cosc_str == "true";
217 
218  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( objid, nbdta, nbprt, blksz / nbdta );
219  objcfg->plgr = std::move( plgr );
220  objcfg->dtacgi = std::move( dtacgi );
221  objcfg->mdtacgi = std::move( mdtacgi );
222 
223  return new EcHandler( headnode, objcfg, cosc );
224  }
225 
226 } /* namespace XrdCl */
227 
228 #endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
229 
bool cosc
Definition: XrdClEcHandler.hh:162
Definition: XrdClAnyObject.hh:32
Implementation dependent.
Definition: XrdClFileSystem.hh:58
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
Definition: XrdClEcHandler.hh:165
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:68
virtual ~EcHandler()
Definition: XrdClEcHandler.hh:32
Definition: XrdClEcHandler.hh:21
std::unique_ptr< XrdEc::ObjCfg > objcfg
Definition: XrdClEcHandler.hh:158
Open only for writing.
Definition: XrdClFileSystem.hh:97
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
std::vector< std::string > mdtacgi
Definition: XrdEcObjCfg.hh:78
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Definition: XrdEcStrmWriter.hh:52
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
uint64_t curroff
Definition: XrdClEcHandler.hh:161
An interface for file plug-ins.
Definition: XrdClPlugInInterface.hh:38
std::vector< std::string > dtacgi
Definition: XrdEcObjCfg.hh:77
std::unique_ptr< XrdEc::Reader > reader
Definition: XrdClEcHandler.hh:160
std::unique_ptr< XrdEc::StrmWriter > writer
Definition: XrdClEcHandler.hh:159
Open for reading and writing.
Definition: XrdClFileSystem.hh:96
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
bool IsOpen() const
Definition: XrdClEcHandler.hh:149
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, bool cosc)
Definition: XrdClEcHandler.hh:24
XrdCl::XRootDStatus Open(uint16_t flags, XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:36
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:53
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
Handle an async response.
Definition: XrdClXRootDResponses.hh:1040
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1070
Definition: XrdEcReader.hh:57
Open only for reading.
Definition: XrdClFileSystem.hh:95
URL representation.
Definition: XrdClURL.hh:30
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
Definition: XrdClFileSystem.hh:86
URL redir
Definition: XrdClEcHandler.hh:156
Send file/filesystem queries to an XRootD cluster.
Definition: XrdClFileSystem.hh:202
XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:118
Definition: XrdClFileSystem.hh:80
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:239
Definition: XrdEcObjCfg.hh:19
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:76
FileSystem fs
Definition: XrdClEcHandler.hh:157
XrdCl::XRootDStatus Stat(bool force, XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:107
XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition: XrdClEcHandler.hh:133
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Binary blob representation.
Definition: XrdClBuffer.hh:33