xrootd
XrdClAsyncWriter.hh
Go to the documentation of this file.
1 /*
2  * XrdClAsyncWriter.hh
3  *
4  * Created on: 3 May 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLASYNCWRITER_HH_
9 #define SRC_XRDCL_XRDCLASYNCWRITER_HH_
10 
11 #include "XrdCl/XrdClStatus.hh"
12 #include "XrdCl/XrdClSocket.hh"
13 #include "XrdCl/XrdClMessage.hh"
14 #include "XrdCl/XrdClLog.hh"
15 #include "XrdCl/XrdClDefaultEnv.hh"
16 #include "XrdCl/XrdClConstants.hh"
17 
18 namespace XrdCl
19 {
20 
// Body of the AsyncWriter base class as rendered in this listing. The listing
// omits the class header and several declaration lines; the generated index
// after the listing gives them as
//   AsyncWriter( Socket &socket, const std::string &streamName ),
//   Status Write(), Socket &socket and Status status.
22  {
23  public:
24 
// Constructor: binds the socket reference. The rest of the initialiser list
// (original lines 27-28) is not shown in this listing.
26  const std::string &streamName ) : socket( socket ),
29  {
30  }
31 
// Virtual destructor with an empty body.
32  virtual ~AsyncWriter()
33  {
34  }
35 
// Write(): dispatches on the stored status.
//  - If the stored status is not OK it is returned unchanged.
//  - For suNotStarted, suRetry and suContinue, WriteImpl() is called, its
//    result is stored in `status`, and that result is returned.
//  - For suDone, suPartial, suAlreadyDone and any other code, the stored
//    status is returned without calling WriteImpl().
37  {
38  if( !status.IsOK() ) return status;
39  switch( status.code )
40  {
41  case suDone : return status;
42  case suPartial : return status;
43  case suAlreadyDone : return status;
44  case suNotStarted : return ( status = WriteImpl() );
45  case suRetry : return ( status = WriteImpl() );
46  case suContinue : return ( status = WriteImpl() );
47  default: return status;
48  }
49  }
50 
51  protected:
52 
// Pure virtual hook invoked by Write(); subclasses supply the write logic.
53  virtual Status WriteImpl() = 0;
54 
// Stream name; MsgWriter uses it as the "[%s]" prefix of its log message.
56  std::string streamName;
58  };
59 
// AsyncWriter specialisation that writes a single Message held through a
// std::unique_ptr.
// NOTE(review): std::unique_ptr is used here but <memory> is not among the
// includes visible in this header - presumably it arrives transitively; confirm.
60  class MsgWriter : public AsyncWriter
61  {
62  public:
// Constructor (signature line 63 is omitted from this listing; the generated
// index gives it as MsgWriter( Socket &socket, const std::string &streamName )).
// Forwards both arguments to AsyncWriter and starts with no message.
64  const std::string &streamName ) : AsyncWriter( socket, streamName ),
65  msg( nullptr )
66  {
67  }
68 
// Returns true when a message is currently held.
69  inline bool HasMsg()
70  {
71  return bool( msg );
72  }
73 
// Replaces the held message with `msg` (default: none). unique_ptr::reset
// takes ownership of the new pointer and destroys the previously held message.
// Original line 77 is omitted from this listing.
74  void Reset( Message *msg = nullptr )
75  {
76  this->msg.reset( msg );
78  }
79 
// WriteImpl() (signature line 80 is omitted from this listing; the generated
// index gives it as `Status WriteImpl()`).
// Returns the stored status untouched when no message is held. Otherwise it
// sends the bytes between the message cursor and the message end, then logs
// and returns a default-constructed XRootDStatus.
81  {
82  if( !msg ) return status;
83  Log *log = DefaultEnv::GetLog();
84 
85  //--------------------------------------------------------------------------
86  // Try to write down the current message
87  //--------------------------------------------------------------------------
88  size_t leftToBeWritten = msg->GetSize()-msg->GetCursor();
89  while( leftToBeWritten )
90  {
91  int bytesWritten = 0;
92  status = socket.Send( msg->GetBufferAtCursor(), leftToBeWritten, bytesWritten );
// On failure the cursor is rewound to 0 before the error is returned.
93  if( !status.IsOK() )
94  {
95  msg->SetCursor( 0 );
96  return status;
97  }
// suRetry returns immediately; the cursor is not advanced for this Send call.
98  if( status.code == suRetry ) return status;
99  msg->AdvanceCursor( bytesWritten );
100  leftToBeWritten -= bytesWritten;
101  }
102 
103  //--------------------------------------------------------------------------
104  // We have written the message successfully
105  //--------------------------------------------------------------------------
// NOTE(review): msg.get() is a pointer but is passed for the "0x%x"
// conversion; if Log::Dump follows printf conventions, "%p" would be the
// matching conversion - confirm against Log::Dump.
106  log->Dump( AsyncSockMsg, "[%s] Wrote a message: %s (0x%x), %d bytes",
107  streamName.c_str(), msg->GetDescription().c_str(),
108  msg.get(), msg->GetSize() );
109  return XRootDStatus();
110  }
111 
112  private:
// The message being written; owned by this writer.
113  std::unique_ptr<Message> msg;
114  };
115 
// Placeholder subclass of AsyncWriter with an empty body (marked TODO); it
// declares no constructor and does not override WriteImpl().
116  class ChunkWriter : public AsyncWriter // TODO
117  {
118 
119  };
120 
// Placeholder subclass of AsyncWriter with an empty body (marked TODO); it
// declares no constructor and does not override WriteImpl().
121  class KBuffWriter : public AsyncWriter // TODO
122  {
123 
124  };
125 
126 } /* namespace XrdCl */
127 
128 #endif /* SRC_XRDCL_XRDCLASYNCWRITER_HH_ */
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
Definition: XrdClAsyncWriter.hh:60
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
virtual Status WriteImpl()=0
Status WriteImpl()
Definition: XrdClAsyncWriter.hh:80
Definition: XrdClAsyncWriter.hh:116
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:40
const uint16_t suDone
Definition: XrdClStatus.hh:38
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:140
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
const uint16_t suPartial
Definition: XrdClStatus.hh:41
Socket & socket
Definition: XrdClAsyncWriter.hh:55
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Procedure execution status.
Definition: XrdClStatus.hh:112
Definition: XrdClAsyncWriter.hh:21
static Log * GetLog()
Get default log.
Status Write()
Definition: XrdClAsyncWriter.hh:36
Definition: XrdClAsyncWriter.hh:121
AsyncWriter(Socket &socket, const std::string &streamName)
Definition: XrdClAsyncWriter.hh:25
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
const uint16_t suNotStarted
Definition: XrdClStatus.hh:43
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
bool HasMsg()
Definition: XrdClAsyncWriter.hh:69
std::unique_ptr< Message > msg
Definition: XrdClAsyncWriter.hh:113
std::string streamName
Definition: XrdClAsyncWriter.hh:56
const uint16_t suContinue
Definition: XrdClStatus.hh:39
Status status
Definition: XrdClAsyncWriter.hh:57
virtual ~AsyncWriter()
Definition: XrdClAsyncWriter.hh:32
MsgWriter(Socket &socket, const std::string &streamName)
Definition: XrdClAsyncWriter.hh:63
void Reset(Message *msg=nullptr)
Definition: XrdClAsyncWriter.hh:74
const uint16_t suRetry
Definition: XrdClStatus.hh:40
A network socket.
Definition: XrdClSocket.hh:41
Handle diagnostics.
Definition: XrdClLog.hh:102
XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)