xrootd
XrdEcStrmWriter.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
26 #define SRC_XRDEC_XRDECSTRMWRITER_HH_
27 
28 #include "XrdEc/XrdEcWrtBuff.hh"
29 #include "XrdEc/XrdEcThreadPool.hh"
30 
34 
35 #include <random>
36 #include <chrono>
37 #include <future>
38 #include <atomic>
39 #include <memory>
40 #include <vector>
41 #include <thread>
42 #include <iterator>
43 
44 #include <sys/stat.h>
45 
46 namespace XrdEc
47 {
48  //---------------------------------------------------------------------------
51  //---------------------------------------------------------------------------
52  class StrmWriter
53  {
54  //-------------------------------------------------------------------------
55  // Type for queue of buffers to be written
56  //-------------------------------------------------------------------------
58 
59  public:
60 
61  //-----------------------------------------------------------------------
63  //-----------------------------------------------------------------------
65  writer_thread_stop( false ),
67  next_blknb( 0 ),
68  global_status( this )
69  {
70  }
71 
72  //-----------------------------------------------------------------------
74  //-----------------------------------------------------------------------
75  virtual ~StrmWriter()
76  {
77  writer_thread_stop = true;
79  writer_thread.join();
80  }
81 
82  //-----------------------------------------------------------------------
86  //-----------------------------------------------------------------------
87  void Open( XrdCl::ResponseHandler *handler );
88 
89  //-----------------------------------------------------------------------
95  //-----------------------------------------------------------------------
96  void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
97 
98  //-----------------------------------------------------------------------
102  //-----------------------------------------------------------------------
103  void Close( XrdCl::ResponseHandler *handler );
104 
105  private:
106 
107  //-----------------------------------------------------------------------
108  // Global status of the StrmWriter
109  //-----------------------------------------------------------------------
111  {
112  //---------------------------------------------------------------------
113  // Constructor
114  //---------------------------------------------------------------------
116  bytesleft( 0 ),
117  stopped_writing( false ),
118  closeHandler( 0 )
119  {
120  }
121 
122  //---------------------------------------------------------------------
123  // Report status of write operation
124  //---------------------------------------------------------------------
125  void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize )
126  {
127  std::unique_lock<std::recursive_mutex> lck( mtx );
128  //-------------------------------------------------------------------
129  // Update the global status
130  //-------------------------------------------------------------------
131  bytesleft -= wrtsize;
132  if( !st.IsOK() ) status = st;
133 
134  //-------------------------------------------------------------------
135  // check if we are done, and if yes call the close implementation
136  //-------------------------------------------------------------------
137  if( bytesleft == 0 && stopped_writing )
138  {
139  lck.unlock();
141  }
142  }
143 
144  //---------------------------------------------------------------------
145  // Report status of open operation
146  //---------------------------------------------------------------------
147  inline void report_open( const XrdCl::XRootDStatus &st )
148  {
149  report_wrt( st, 0 );
150  }
151 
152  //---------------------------------------------------------------------
153  // Indicate that the user issued close
154  //---------------------------------------------------------------------
156  {
157  std::unique_lock<std::recursive_mutex> lck( mtx );
158  //-------------------------------------------------------------------
159  // There will be no more new write requests
160  //-------------------------------------------------------------------
161  stopped_writing = true;
162  //-------------------------------------------------------------------
163  // If there are no outstanding writes, we can simply call the close
164  // routine
165  //-------------------------------------------------------------------
166  if( bytesleft == 0 ) return writer->CloseImpl( handler );
167  //-------------------------------------------------------------------
168  // Otherwise we save the handler for later
169  //-------------------------------------------------------------------
170  closeHandler = handler;
171  }
172 
173  //---------------------------------------------------------------------
174  // get the global status value
175  //---------------------------------------------------------------------
176  inline const XrdCl::XRootDStatus& get() const
177  {
178  std::unique_lock<std::recursive_mutex> lck( mtx );
179  return status;
180  }
181 
182  inline void issue_write( uint64_t wrtsize )
183  {
184  std::unique_lock<std::recursive_mutex> lck( mtx );
185  bytesleft += wrtsize;
186  }
187 
188  private:
189  mutable std::recursive_mutex mtx;
190  StrmWriter *writer; //> pointer to the StrmWriter
191  uint64_t bytesleft; //> bytes left to be written
192  bool stopped_writing; //> true, if user called close
193  XrdCl::XRootDStatus status; //> the global status
194  XrdCl::ResponseHandler *closeHandler; //> user close handler
195  };
196 
197  //-----------------------------------------------------------------------
201  //-----------------------------------------------------------------------
202  inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
203  {
204  // the routine to be called in the thread-pool
205  // - does erasure coding
206  // - calculates crc32cs
207  static auto prepare_buff = []( WrtBuff *wrtbuff )
208  {
209  std::unique_ptr<WrtBuff> ptr( wrtbuff );
210  ptr->Encode();
211  return ptr.release();
212  };
213  buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
214  }
215 
216  //-----------------------------------------------------------------------
220  //-----------------------------------------------------------------------
221  inline std::unique_ptr<WrtBuff> DequeueBuff()
222  {
223  std::future<WrtBuff*> ftr = buffers.dequeue();
224  std::unique_ptr<WrtBuff> result( ftr.get() );
225  return std::move( result );
226  }
227 
228  //-----------------------------------------------------------------------
232  //-----------------------------------------------------------------------
233  static void writer_routine( StrmWriter *me )
234  {
235  try
236  {
237  while( !me->writer_thread_stop )
238  {
239  std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
240  if( !wrtbuff ) continue;
241  me->WriteBuff( std::move( wrtbuff ) );
242  }
243  }
244  catch( const buff_queue::wait_interrupted& ){ }
245  }
246 
247  //-----------------------------------------------------------------------
251  //-----------------------------------------------------------------------
252  void WriteBuff( std::unique_ptr<WrtBuff> buff );
253 
254  //-----------------------------------------------------------------------
258  //-----------------------------------------------------------------------
259  std::vector<char> GetMetadataBuffer();
260 
261  //-----------------------------------------------------------------------
265  //-----------------------------------------------------------------------
266  void CloseImpl( XrdCl::ResponseHandler *handler );
267 
268  const ObjCfg &objcfg;
269  std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer
270  std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data
271  std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< ZIP archives with metadata
272  std::vector<std::vector<char>> cdbuffs; //< buffers with CDs
273  buff_queue buffers; //< queue of buffer for writing
274  //< (waiting to be erasure coded)
275  std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped,
276 //< false otherwise
277  std::thread writer_thread; //< handle to the writer thread
278  size_t next_blknb; //< number of the next block to be created
279  global_status_t global_status; //< global status of the writer
280  };
281 
282 }
283 
284 #endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */
std::vector< std::shared_ptr< XrdCl::File > > metadataarchs
Definition: XrdEcStrmWriter.hh:271
global_status_t global_status
Definition: XrdEcStrmWriter.hh:279
void CloseImpl(XrdCl::ResponseHandler *handler)
void report_wrt(const XrdCl::XRootDStatus &st, uint64_t wrtsize)
Definition: XrdEcStrmWriter.hh:125
sync_queue< std::future< WrtBuff * > > buff_queue
Definition: XrdEcStrmWriter.hh:57
void EnqueueBuff(std::unique_ptr< WrtBuff > wrtbuff)
Definition: XrdEcStrmWriter.hh:202
bool stopped_writing
Definition: XrdEcStrmWriter.hh:192
buff_queue buffers
Definition: XrdEcStrmWriter.hh:273
StrmWriter * writer
Definition: XrdEcStrmWriter.hh:190
void WriteBuff(std::unique_ptr< WrtBuff > buff)
void report_open(const XrdCl::XRootDStatus &st)
Definition: XrdEcStrmWriter.hh:147
virtual ~StrmWriter()
Destructor.
Definition: XrdEcStrmWriter.hh:75
static ThreadPool & Instance()
Singleton access.
Definition: XrdEcThreadPool.hh:150
std::atomic< bool > writer_thread_stop
Definition: XrdEcStrmWriter.hh:275
void Open(XrdCl::ResponseHandler *handler)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
Definition: XrdEcStrmWriter.hh:52
void Close(XrdCl::ResponseHandler *handler)
Definition: XrdEcUtilities.hh:175
std::unique_ptr< WrtBuff > DequeueBuff()
Definition: XrdEcStrmWriter.hh:221
std::thread writer_thread
Definition: XrdEcStrmWriter.hh:277
uint64_t bytesleft
Definition: XrdEcStrmWriter.hh:191
void enqueue(Element &&element)
Definition: XrdEcUtilities.hh:187
std::recursive_mutex mtx
Definition: XrdEcStrmWriter.hh:189
Element dequeue()
Definition: XrdEcUtilities.hh:198
std::vector< std::vector< char > > cdbuffs
Definition: XrdEcStrmWriter.hh:272
Request status.
Definition: XrdClXRootDResponses.hh:218
size_t next_blknb
Definition: XrdEcStrmWriter.hh:278
Definition: XrdEcUtilities.hh:170
Definition: XrdEcStrmWriter.hh:110
XrdCl::XRootDStatus status
Definition: XrdEcStrmWriter.hh:193
static void writer_routine(StrmWriter *me)
Definition: XrdEcStrmWriter.hh:233
Handle an async response.
Definition: XrdClXRootDResponses.hh:1040
const ObjCfg & objcfg
Definition: XrdEcStrmWriter.hh:268
void issue_write(uint64_t wrtsize)
Definition: XrdEcStrmWriter.hh:182
global_status_t(StrmWriter *writer)
Definition: XrdEcStrmWriter.hh:115
Definition: XrdEcWrtBuff.hh:132
StrmWriter(const ObjCfg &objcfg)
Constructor.
Definition: XrdEcStrmWriter.hh:64
Definition: XrdEcObjCfg.hh:19
void interrupt()
Definition: XrdEcUtilities.hh:236
XrdCl::ResponseHandler * closeHandler
Definition: XrdEcStrmWriter.hh:194
void issue_close(XrdCl::ResponseHandler *handler)
Definition: XrdEcStrmWriter.hh:155
Definition: XrdClZipArchive.hh:45
std::vector< char > GetMetadataBuffer()
std::unique_ptr< WrtBuff > wrtbuff
Definition: XrdEcStrmWriter.hh:269
std::vector< std::shared_ptr< XrdCl::ZipArchive > > dataarchs
Definition: XrdEcStrmWriter.hh:270