XRootD
XrdLinkXeq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d L i n k X e q . c c */
4 /* */
5 /* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <limits.h>
31 #include <poll.h>
32 #include <signal.h>
33 #include <cstdio>
34 #include <cstring>
35 #include <unistd.h>
36 #include <sys/types.h>
37 #include <sys/uio.h>
38 
39 #if defined(__linux__) || defined(__GNU__)
40 #include <netinet/tcp.h>
41 #if !defined(TCP_CORK)
42 #undef HAVE_SENDFILE
43 #endif
44 #endif
45 
46 #ifdef HAVE_SENDFILE
47 
48 #if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49 #include <sys/sendfile.h>
50 #endif
51 
52 #endif
53 
54 #include "XrdSys/XrdSysAtomics.hh"
55 #include "XrdSys/XrdSysError.hh"
56 #include "XrdSys/XrdSysFD.hh"
57 #include "XrdSys/XrdSysPlatform.hh"
58 
59 #include "Xrd/XrdBuffer.hh"
60 #include "Xrd/XrdLink.hh"
61 #include "Xrd/XrdLinkCtl.hh"
62 #include "Xrd/XrdLinkXeq.hh"
63 #include "Xrd/XrdPoll.hh"
64 #include "Xrd/XrdScheduler.hh"
65 #include "Xrd/XrdSendQ.hh"
66 #include "Xrd/XrdTcpMonPin.hh"
67 
68 #define TRACE_IDENT ID
69 #include "Xrd/XrdTrace.hh"
70 
71 /******************************************************************************/
72 /* G l o b a l s */
73 /******************************************************************************/
74 
75 namespace
76 {
77 int getIovMax()
78 {
79 int maxiov;
80 #ifdef _SC_IOV_MAX
81  if ((maxiov = sysconf(_SC_IOV_MAX)) > 0) return maxiov;
82 #endif
83 #ifdef IOV_MAX
84  return IOV_MAX;
85 #else
86  return 1024;
87 #endif
88 }
89 };
90 
91 namespace XrdGlobal
92 {
93 extern XrdSysError Log;
94 extern XrdScheduler Sched;
95 extern XrdTlsContext *tlsCtx;
97 extern int devNull;
98  int maxIOV = getIovMax();
99 };
100 
101 using namespace XrdGlobal;
102 
103 /******************************************************************************/
104 /* S t a t i c s */
105 /******************************************************************************/
106 
107  const char *XrdLinkXeq::TraceID = "LinkXeq";
108 
109  long long XrdLinkXeq::LinkBytesIn = 0;
110  long long XrdLinkXeq::LinkBytesOut = 0;
111  long long XrdLinkXeq::LinkConTime = 0;
112  long long XrdLinkXeq::LinkCountTot = 0;
113  int XrdLinkXeq::LinkCount = 0;
114  int XrdLinkXeq::LinkCountMax = 0;
115  int XrdLinkXeq::LinkTimeOuts = 0;
116  int XrdLinkXeq::LinkStalls = 0;
117  int XrdLinkXeq::LinkSfIntr = 0;
119 
120 /******************************************************************************/
121 /* C o n s t r u c t o r */
122 /******************************************************************************/
123 
124 XrdLinkXeq::XrdLinkXeq() : XrdLink(*this), PollInfo((XrdLink &)*this)
125 {
127 }
128 
130 {
131  memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
132  strcpy(Lname, "somewhere");
133  ID = &Uname[sizeof(Uname)-5];
134  Comment = ID;
135  sendQ = 0;
136  stallCnt = stallCntTot = 0;
137  tardyCnt = tardyCntTot = 0;
138  SfIntr = 0;
139  isIdle = 0;
141  LockReads= false;
142  KeepFD = false;
143  Protocol = 0;
144  ProtoAlt = 0;
145 
146  LinkInfo.Reset();
147  PollInfo.Zorch();
148  ResetLink();
149 }
150 
151 /******************************************************************************/
152 /* B a c k l o g */
153 /******************************************************************************/
154 
156 {
158 
159 // Return backlog information
160 //
161  return (sendQ ? sendQ->Backlog() : 0);
162 }
163 
164 /******************************************************************************/
165 /* C l i e n t */
166 /******************************************************************************/
167 
168 int XrdLinkXeq::Client(char *nbuf, int nbsz)
169 {
170  int ulen;
171 
172 // Generate full client name
173 //
174  if (nbsz <= 0) return 0;
175  ulen = (Lname - ID);
176  if ((ulen + HNlen) >= nbsz) ulen = 0;
177  else {strncpy(nbuf, ID, ulen);
178  strcpy(nbuf+ulen, HostName);
179  ulen += HNlen;
180  }
181  return ulen;
182 }
183 
184 /******************************************************************************/
185 /* C l o s e */
186 /******************************************************************************/
187 
188 int XrdLinkXeq::Close(bool defer)
190  int csec, fd, rc = 0;
191 
192 // If a defer close is requested, we can close the descriptor but we must
193 // keep the slot number to prevent a new client getting the same fd number.
194 // Linux is peculiar in that any in-progress operations will remain in that
195 // state even after the FD is closed unless there is some activity either on
196 // the connection or an event occurs that causes an operation restart. We
197 // portably solve this problem by issuing a shutdown() on the socket prior
198 // closing it. On most platforms, this informs readers that the connection is
199 // gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
200 // nonblocking mode is enabled, we need to do this in a separate thread as
201 // a shutdown may block for a pretty long time if lots\ of messages are queued.
202 // We will ask the SendQ object to schedule the shutdown for us before it
203 // commits suicide.
204 // Note that we can hold the opMutex while we also get the wrMutex.
205 //
206  if (defer)
207  {if (!sendQ) Shutdown(false);
208  else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
209  LinkInfo.InUse++;
210  LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
211  wrMutex.Lock();
212  sendQ->Terminate(this);
213  sendQ = 0;
214  wrMutex.UnLock();
215  }
216  return 0;
217  }
218 
219 // If we got here then this is not a deferred close so we just need to check
220 // if there is a sendq appendage we need to get rid of.
221 //
222  if (sendQ)
223  {wrMutex.Lock();
224  sendQ->Terminate();
225  sendQ = 0;
226  wrMutex.UnLock();
227  }
228 
229 // Multiple protocols may be bound to this link. If it is in use, defer the
230 // actual close until the use count drops to one.
231 //
232  while(LinkInfo.InUse > 1)
233  {opHelper.UnLock();
234  TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
235  <<LinkInfo.InUse);
236  Serialize();
237  opHelper.Lock(&LinkInfo.opMutex);
238  }
239  LinkInfo.InUse--;
240  Instance = 0;
241 
242 // Add up the statistic for this link
243 //
244  syncStats(&csec);
245 
246 // Cleanup TLS if it is active
247 //
248  if (isTLS) tlsIO.Shutdown();
249 
250 // Clean this link up
251 //
252  if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
253  if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
254  if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
255  LinkInfo.InUse = 0;
256 
257 // At this point we can have no lock conflicts, so if someone is waiting for
258 // us to terminate let them know about it. Note that we will get the condvar
259 // mutex while we hold the opMutex. This is the required order! We will also
260 // zero out the pointer to the condvar while holding the opmutex.
261 //
262  if (LinkInfo.KillcvP)
263  {LinkInfo.KillcvP->Lock();
266  LinkInfo.KillcvP = 0;
267  }
268 
269 // Remove ourselves from the poll table and then from the Link table. We may
270 // not hold on to the opMutex when we acquire the LTMutex. However, the link
271 // table needs to be cleaned up prior to actually closing the socket. So, we
272 // do some fancy footwork to prevent multiple closes of this link.
273 //
274  fd = abs(LinkInfo.FD);
275  if (PollInfo.FD > 0)
277  PollInfo.FD = -1;
278  opHelper.UnLock();
279  XrdLinkCtl::Unhook(fd);
280  } else opHelper.UnLock();
281 
282 // Invoke the TCP monitor if it was loaded.
283 //
284  if (TcpMonPin && fd > 2)
285  {XrdTcpMonPin::LinkInfo lnkInfo;
286  lnkInfo.tident = ID;
287  lnkInfo.fd = fd;
288  lnkInfo.consec = csec;
289  lnkInfo.bytesIn = BytesInTot;
290  lnkInfo.bytesOut = BytesOutTot;
291  TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
292  }
293 
294 // Close the file descriptor if it isn't being shared. Do it as the last
295 // thing because closes and accepts and not interlocked.
296 //
297  if (fd >= 2) {if (KeepFD) rc = 0;
298  else rc = (close(fd) < 0 ? errno : 0);
299  }
300  if (rc) Log.Emsg("Link", rc, "close", ID);
301  return rc;
302 }
303 
304 /******************************************************************************/
305 /* D o I t */
306 /******************************************************************************/
307 
309 {
310  int rc;
311 
312 // The Process() return code tells us what to do:
313 // < 0 -> Stop getting requests,
314 // -EINPROGRESS leave link disabled but otherwise all is well
315 // -n Error, disable and close the link
316 // = 0 -> OK, get next request, if allowed, o/w enable the link
317 // > 0 -> Slow link, stop getting requests and enable the link
318 //
319  if (Protocol)
320  do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
321  else {Log.Emsg("Link", "Dispatch on closed link", ID);
322  return;
323  }
324 
325 // Either re-enable the link and cycle back waiting for a new request, leave
326 // disabled, or terminate the connection.
327 //
328  if (rc >= 0)
330  else if (rc != -EINPROGRESS) Close();
331 }
332 
333 /******************************************************************************/
334 /* g e t P e e r C e r t s */
335 /******************************************************************************/
336 
338 {
339  return (isTLS ? tlsIO.getCerts(true) : 0);
340 }
341 
342 /******************************************************************************/
343 /* P e e k */
344 /******************************************************************************/
345 
346 int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
347 {
348  XrdSysMutexHelper theMutex;
349  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
350  ssize_t mlen;
351  int retc;
352 
353 // Lock the read mutex if we need to, the helper will unlock it upon exit
354 //
355  if (LockReads) theMutex.Lock(&rdMutex);
356 
357 // Wait until we can actually read something
358 //
359  isIdle = 0;
360  do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
361  if (retc != 1)
362  {if (retc == 0) return 0;
363  return Log.Emsg("Link", -errno, "poll", ID);
364  }
365 
366 // Verify it is safe to read now
367 //
368  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
369  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
370  return -1;
371  }
372 
373 // Do the peek.
374 //
375  do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
376  while(mlen < 0 && errno == EINTR);
377 
378 // Return the result
379 //
380  if (mlen >= 0) return int(mlen);
381  Log.Emsg("Link", errno, "peek on", ID);
382  return -1;
383 }
384 
385 /******************************************************************************/
386 /* R e c v */
387 /******************************************************************************/
388 
389 int XrdLinkXeq::Recv(char *Buff, int Blen)
390 {
391  ssize_t rlen;
392 
393 // Note that we will read only as much as is queued. Use Recv() with a
394 // timeout to receive as much data as possible.
395 //
396  if (LockReads) rdMutex.Lock();
397  isIdle = 0;
398  do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
399  if (rlen > 0) AtomicAdd(BytesIn, rlen);
400  if (LockReads) rdMutex.UnLock();
401 
402  if (rlen >= 0) return int(rlen);
403  if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
404  return -1;
405 }
406 
407 /******************************************************************************/
408 
409 int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
410 {
411  XrdSysMutexHelper theMutex;
412  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
413  ssize_t rlen, totlen = 0;
414  int retc;
415 
416 // Lock the read mutex if we need to, the helper will unlock it upon exit
417 //
418  if (LockReads) theMutex.Lock(&rdMutex);
419 
420 // Wait up to timeout milliseconds for data to arrive
421 //
422  isIdle = 0;
423  while(Blen > 0)
424  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
425  if (retc != 1)
426  {if (retc == 0)
427  {tardyCnt++;
428  if (totlen)
429  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
430  AtomicAdd(BytesIn, totlen);
431  }
432  return int(totlen);
433  }
434  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
435  }
436 
437  // Verify it is safe to read now
438  //
439  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
440  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
441  "polling", ID);
442  return -1;
443  }
444 
445  // Read as much data as you can. Note that we will force an error
446  // if we get a zero-length read after poll said it was OK.
447  //
448  do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
449  while(rlen < 0 && errno == EINTR);
450  if (rlen <= 0)
451  {if (!rlen) return -ENOMSG;
452  if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
453  return -1;
454  }
455  totlen += rlen; Blen -= rlen; Buff += rlen;
456  }
457 
458  AtomicAdd(BytesIn, totlen);
459  return int(totlen);
460 }
461 
462 /******************************************************************************/
463 
464 int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
465 {
466  XrdSysMutexHelper theMutex;
467  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
468  int retc, rlen;
469 
470 // Lock the read mutex if we need to, the helper will unlock it upon exit
471 //
472  if (LockReads) theMutex.Lock(&rdMutex);
473 
474 // Wait up to timeout milliseconds for data to arrive
475 //
476  isIdle = 0;
477  do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
478  if (retc != 1)
479  {if (retc == 0)
480  {tardyCnt++;
481  return 0;
482  }
483  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
484  }
485 
486 // Verify it is safe to read now
487 //
488  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
489  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
490  return -1;
491  }
492 
493 // If the iocnt is within limits then just go ahead and read once.
494 //
495  if (iocnt <= maxIOV)
496  {rlen = RecvIOV(iov, iocnt);
497  if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
498  return rlen;
499  }
500 
501 // We will have to break this up into allowable segments and we need to add up
502 // the bytes in each segment so that we know when to stop reading.
503 //
504  int seglen, segcnt = maxIOV, totlen = 0;
505  do {seglen = 0;
506  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
507  if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
508  totlen += rlen;
509  if (rlen < seglen) break;
510  iov += segcnt;
511  iocnt -= segcnt;
512  if (iocnt <= maxIOV) segcnt = iocnt;
513  } while(iocnt > 0);
514 
515 // All done
516 //
517  AtomicAdd(BytesIn, totlen);
518  return totlen;
519 }
520 
521 /******************************************************************************/
522 /* R e c v A l l */
523 /******************************************************************************/
524 
525 int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
526 {
527  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
528  ssize_t rlen;
529  int retc;
530 
531 // Check if timeout specified. Notice that the timeout is the max we will
532 // for some data. We will wait forever for all the data. Yeah, it's weird.
533 //
534  if (timeout >= 0)
535  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
536  if (retc != 1)
537  {if (!retc) return -ETIMEDOUT;
538  Log.Emsg("Link",errno,"poll",ID);
539  return -1;
540  }
541  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
542  {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
543  return -1;
544  }
545  }
546 
547 // Note that we will block until we receive all he bytes.
548 //
549  if (LockReads) rdMutex.Lock();
550  isIdle = 0;
551  do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
552  while(rlen < 0 && errno == EINTR);
553  if (rlen > 0) AtomicAdd(BytesIn, rlen);
554  if (LockReads) rdMutex.UnLock();
555 
556  if (int(rlen) == Blen) return Blen;
557  if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
558  else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
559  else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
560  return -1;
561 }
562 
563 /******************************************************************************/
564 /* Protected: R e c v I O V */
565 /******************************************************************************/
566 
567 int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
568 {
569  ssize_t retc = 0;
570 
571 // Read the data in. On some version of Unix (e.g., Linux) a readv() may
572 // end at any time without reading all the bytes when directed to a socket.
573 // We always return the number bytes read (or an error). The caller needs to
574 // restart the read at the appropriate place in the iovec when more data arrives.
575 //
576  do {retc = readv(LinkInfo.FD, iov, iocnt);}
577  while(retc < 0 && errno == EINTR);
578 
579 // Check how we completed
580 //
581  if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
582  return retc;
583 }
584 
585 /******************************************************************************/
586 /* R e g i s t e r */
587 /******************************************************************************/
588 
589 bool XrdLinkXeq::Register(const char *hName)
590 {
591 
592 // First see if we can register this name with the address object
593 //
594  if (!Addr.Register(hName)) return false;
595 
596 // Make appropriate changes here
597 //
598  if (HostName) free(HostName);
599  HostName = strdup(hName);
600  strlcpy(Lname, hName, sizeof(Lname));
601  return true;
602 }
603 
604 /******************************************************************************/
605 /* S e n d */
606 /******************************************************************************/
607 
608 int XrdLinkXeq::Send(const char *Buff, int Blen)
609 {
610  ssize_t retc = 0, bytesleft = Blen;
611 
612 // Get a lock
613 //
614  wrMutex.Lock();
615  isIdle = 0;
616  AtomicAdd(BytesOut, Blen);
617 
618 // Do non-blocking writes if we are setup to do so.
619 //
620  if (sendQ)
621  {retc = sendQ->Send(Buff, Blen);
622  wrMutex.UnLock();
623  return retc;
624  }
625 
626 // Write the data out
627 //
628  while(bytesleft)
629  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
630  {if (errno == EINTR) continue;
631  else break;
632  }
633  bytesleft -= retc; Buff += retc;
634  }
635 
636 // All done
637 //
638  wrMutex.UnLock();
639  if (retc >= 0) return Blen;
640  Log.Emsg("Link", errno, "send to", ID);
641  return -1;
642 }
643 
644 /******************************************************************************/
645 
646 int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
647 {
648  int retc;
649  static int maxIOV = -1;
650  if (maxIOV == -1) {
651 #ifdef _SC_IOV_MAX
652  maxIOV = sysconf(_SC_IOV_MAX);
653  if (maxIOV == -1)
654 #endif
655 #ifdef IOV_MAX
656  maxIOV = IOV_MAX;
657 #else
658  maxIOV = 1024;
659 #endif
660  }
661 
662 // Get a lock and assume we will be successful (statistically we are)
663 //
664  wrMutex.Lock();
665  isIdle = 0;
666  AtomicAdd(BytesOut, bytes);
667 
668 // Do non-blocking writes if we are setup to do so.
669 //
670  if (sendQ)
671  {retc = sendQ->Send(iov, iocnt, bytes);
672  wrMutex.UnLock();
673  return retc;
674  }
675 
676 // If the iocnt is within limits then just go ahead and write this out
677 //
678  if (iocnt <= maxIOV)
679  {retc = SendIOV(iov, iocnt, bytes);
680  wrMutex.UnLock();
681  return retc;
682  }
683 
684 // We will have to break this up into allowable segments
685 //
686  int seglen, segcnt = maxIOV, iolen = 0;
687  do {seglen = 0;
688  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
689  if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
690  {wrMutex.UnLock();
691  return retc;
692  }
693  iolen += retc;
694  iov += segcnt;
695  iocnt -= segcnt;
696  if (iocnt <= maxIOV) segcnt = iocnt;
697  } while(iocnt > 0);
698 
699 // All done
700 //
701  wrMutex.UnLock();
702  return iolen;
703 }
704 
705 /******************************************************************************/
706 
707 int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
708 {
709 #if !defined(HAVE_SENDFILE)
710 
711  return -1;
712 
713 #elif defined(__solaris__)
714 
715  sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
716  size_t xframt, totamt, bytes = 0;
717  ssize_t retc;
718  int i = 0;
719 
720 // Construct the sendfilev() vector
721 //
722  for (i = 0; i < sfN; sfP++, i++)
723  {if (sfP->fdnum < 0)
724  {vecSF[i].sfv_fd = SFV_FD_SELF;
725  vecSF[i].sfv_off = (off_t)sfP->buffer;
726  } else {
727  vecSF[i].sfv_fd = sfP->fdnum;
728  vecSF[i].sfv_off = sfP->offset;
729  }
730  vecSF[i].sfv_flag = 0;
731  vecSF[i].sfv_len = sfP->sendsz;
732  bytes += sfP->sendsz;
733  }
734  totamt = bytes;
735 
736 // Lock the link, issue sendfilev(), and unlock the link. The documentation
737 // is very spotty and inconsistent. We can only retry this operation under
738 // very limited conditions.
739 //
740  wrMutex.Lock();
741  isIdle = 0;
742 do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
743 
744 // Check if all went well and return if so (usual case)
745 //
746  if (xframt == bytes)
747  {AtomicAdd(BytesOut, bytes);
748  wrMutex.UnLock();
749  return totamt;
750  }
751 
752 // The only one we will recover from is EINTR. We cannot legally get EAGAIN.
753 //
754  if (retc < 0 && errno != EINTR) break;
755 
756 // Try to resume the transfer
757 //
758  if (xframt > 0)
759  {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
760  while(xframt > 0 && sfN)
761  {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
762  {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
763  xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
764  }
765  }
766  } while(sfN > 0);
767 
768 // See if we can recover without destroying the connection
769 //
770  retc = (retc < 0 ? errno : ECANCELED);
771  wrMutex.UnLock();
772  Log.Emsg("Link", retc, "send file to", ID);
773  return -1;
774 
775 #elif defined(__linux__) || defined(__GNU__)
776 
777  static const int setON = 1, setOFF = 0;
778  ssize_t retc = 0, bytesleft;
779  off_t myOffset;
780  int i, xfrbytes = 0, uncork = 1, xIntr = 0;
781 
782 // lock the link
783 //
784  wrMutex.Lock();
785  isIdle = 0;
786 
787 // In linux we need to cork the socket. On permanent errors we do not uncork
788 // the socket because it will be closed in short order.
789 //
790  if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
791  {Log.Emsg("Link", errno, "cork socket for", ID);
792  uncork = 0; sfOK = 0;
793  }
794 
795 // Send the header first
796 //
797  for (i = 0; i < sfN; sfP++, i++)
798  {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
799  else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
800  while(bytesleft
801  && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
802  {bytesleft -= retc; xIntr++;}
803  }
804  if (retc < 0 && errno == EINTR) continue;
805  if (retc <= 0) break;
806  xfrbytes += sfP->sendsz;
807  }
808 
809 // Diagnose any sendfile errors
810 //
811  if (retc <= 0)
812  {if (retc == 0) errno = ECANCELED;
813  wrMutex.UnLock();
814  Log.Emsg("Link", errno, "send file to", ID);
815  return -1;
816  }
817 
818 // Now uncork the socket
819 //
820  if (uncork
821  && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
822  Log.Emsg("Link", errno, "uncork socket for", ID);
823 
824 // All done
825 //
826  if (xIntr > sfN) SfIntr += (xIntr - sfN);
827  AtomicAdd(BytesOut, xfrbytes);
828  wrMutex.UnLock();
829  return xfrbytes;
830 
831 #else
832 
833  return -1;
834 
835 #endif
836 }
837 
838 /******************************************************************************/
839 /* Protected: s e n d D a t a */
840 /******************************************************************************/
841 
842 int XrdLinkXeq::sendData(const char *Buff, int Blen)
843 {
844  ssize_t retc = 0, bytesleft = Blen;
845 
846 // Write the data out
847 //
848  while(bytesleft)
849  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
850  {if (errno == EINTR) continue;
851  else break;
852  }
853  bytesleft -= retc; Buff += retc;
854  }
855 
856 // All done
857 //
858  return retc;
859 }
860 
861 /******************************************************************************/
862 /* Protected: S e n d I O V */
863 /******************************************************************************/
864 
865 int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
866 {
867  ssize_t bytesleft, n, retc = 0;
868  const char *Buff;
869 
870 // Write the data out. On some version of Unix (e.g., Linux) a writev() may
871 // end at any time without writing all the bytes when directed to a socket.
872 // So, we attempt to resume the writev() using a combination of write() and
873 // a writev() continuation. This approach slowly converts a writev() to a
874 // series of writes if need be. We must do this inline because we must hold
875 // the lock until all the bytes are written or an error occurs.
876 //
877  bytesleft = static_cast<ssize_t>(bytes);
878  while(bytesleft)
879  {do {retc = writev(LinkInfo.FD, iov, iocnt);}
880  while(retc < 0 && errno == EINTR);
881  if (retc >= bytesleft || retc < 0) break;
882  bytesleft -= retc;
883  while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
884  {retc -= n; iov++; iocnt--;}
885  Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
886  while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
887  {if (errno == EINTR) continue;
888  else break;
889  }
890  n -= retc; Buff += retc; bytesleft -= retc;
891  }
892  if (retc < 0 || iocnt < 1) break;
893  }
894 
895 // All done
896 //
897  if (retc >= 0) return bytes;
898  Log.Emsg("Link", errno, "send to", ID);
899  return -1;
900 }
901 
902 /******************************************************************************/
903 /* s e t I D */
904 /******************************************************************************/
905 
906 void XrdLinkXeq::setID(const char *userid, int procid)
907 {
908  char buff[sizeof(Uname)], *bp, *sp;
909  int ulen;
910 
911  snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
912  ulen = strlen(buff);
913  sp = buff + ulen - 1;
914  bp = &Uname[sizeof(Uname)-1];
915  if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
916  *bp = '@'; bp--;
917  while(ulen--) {*bp = *sp; bp--; sp--;}
918  ID = bp+1;
919  Comment = (const char *)ID;
920 
921 // Update the ID in the TLS socket if enabled
922 //
923  if (isTLS) tlsIO.SetTraceID(ID);
924 }
925 
926 /******************************************************************************/
927 /* s e t N B */
928 /******************************************************************************/
929 
931 {
932 // We don't support non-blocking output except for Linux at the moment
933 //
934 #if !defined(__linux__)
935  return false;
936 #else
937 // Trace this request
938 //
939  TRACEI(DEBUG,"enabling non-blocking output");
940 
941 // If we don't already have a sendQ object get one. This is a one-time call
942 // so to optimize checking if this object exists we also get the opMutex.'
943 //
945  if (!sendQ)
946  {wrMutex.Lock();
947  sendQ = new XrdSendQ(*this, wrMutex);
948  wrMutex.UnLock();
949  }
951  return true;
952 #endif
953 }
954 
955 /******************************************************************************/
956 /* s e t P r o t o c o l */
957 /******************************************************************************/
958 
960 {
961 
962 // Set new protocol.
963 //
965  XrdProtocol *op = Protocol;
966  if (push) ProtoAlt = Protocol;
967  Protocol = pp;
969  return op;
970 }
971 
972 /******************************************************************************/
973 /* s e t P r o t N a m e */
974 /******************************************************************************/
975 
976 void XrdLinkXeq::setProtName(const char *name)
977 {
978 
979 // Set the protocol name.
980 //
982  Addr.SetDialect(name);
984 }
985 
986 /******************************************************************************/
987 /* s e t T L S */
988 /******************************************************************************/
989 
990 bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
991 { //???
992 // static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
995  const char *eNote;
996  XrdTls::RC rc;
997 
998 // If we are already in a compatible mode, we are done
999 //
1000 
1001  if (isTLS == enable) return true;
1002 
1003 // If this is a shutdown, then do it now.
1004 //
1005  if (!enable)
1006  {tlsIO.Shutdown();
1007  isTLS = enable;
1008  Addr.SetTLS(enable);
1009  return true;
1010  }
1011 // We want to initialize TLS, do so now.
1012 //
1013  if (!ctx) ctx = tlsCtx;
1014  eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
1015 
1016 // Check for errors
1017 //
1018  if (eNote)
1019  {char buff[1024];
1020  snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
1021  Log.Emsg("LinkXeq", buff, eNote);
1022  return false;
1023  }
1024 
1025 // Now we need to accept this TLS connection
1026 //
1027  std::string eMsg;
1028  rc = tlsIO.Accept(&eMsg);
1029 
1030 // Diagnose return state
1031 //
1032  if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1033  else {isTLS = enable;
1034  Addr.SetTLS(enable);
1035  Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1036  }
1037  return rc == XrdTls::TLS_AOK;
1038 }
1039 
1040 /******************************************************************************/
1041 /* S F E r r o r */
1042 /******************************************************************************/
1043 
1045 {
1046  Log.Emsg("TLS", rc, "send file to", ID);
1047  return -1;
1048 }
1049 
1050 /******************************************************************************/
1051 /* S h u t d o w n */
1052 /******************************************************************************/
1053 
1054 void XrdLinkXeq::Shutdown(bool getLock)
1055 {
1056  int temp;
1057 
1058 // Trace the entry
1059 //
1060  TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1061 
1062 // Get the lock if we need too (external entry via another thread)
1063 //
1064  if (getLock) LinkInfo.opMutex.Lock();
1065 
1066 // If there is something to do, do it now
1067 //
1068  temp = Instance; Instance = 0;
1069  if (!KeepFD)
1070  {shutdown(PollInfo.FD, SHUT_RDWR);
1071  if (dup2(devNull, PollInfo.FD) < 0)
1072  {Instance = temp;
1073  Log.Emsg("Link", errno, "shutdown FD for", ID);
1074  }
1075  }
1076 
1077 // All done
1078 //
1079  if (getLock) LinkInfo.opMutex.UnLock();
1080 }
1081 
1082 /******************************************************************************/
1083 /* S t a t s */
1084 /******************************************************************************/
1085 
1086 int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1087 {
1088  static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1089  "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1090  "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1091  "<sfps>%d</sfps></stats>";
1092  int i;
1093 
1094 // Check if actual length wanted
1095 //
1096  if (!buff) return sizeof(statfmt)+17*6;
1097 
1098 // We must synchronize the statistical counters
1099 //
1100  if (do_sync) XrdLinkCtl::SyncAll();
1101 
1102 // Obtain lock on the stats area and format it
1103 //
1105  i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1115  return i;
1116 }
1117 
1118 /******************************************************************************/
1119 /* s y n c S t a t s */
1120 /******************************************************************************/
1121 
1122 void XrdLinkXeq::syncStats(int *ctime)
1123 {
1124  long long tmpLL;
1125  int tmpI4;
1126 
1127 // If this is dynamic, get the opMutex lock
1128 //
1129  if (!ctime) LinkInfo.opMutex.Lock();
1130 
1131 // Either the caller has the opMutex or this is called out of close. In either
1132 // case, we need to get the read and write mutexes; each followed by the stats
1133 // mutex. This order is important because we should not hold the stats mutex
1134 // for very long and the r/w mutexes may take a long time to acquire. If we
1135 // must maintain the link count we need to actually acquire the stats mutex as
1136 // we will be doing compound operations. Atomics are still used to keep other
1137 // threads from seeing partial results.
1138 //
1139  AtomicBeg(rdMutex);
1140 
1141  if (ctime)
1142  {*ctime = time(0) - LinkInfo.conTime;
1143  AtomicAdd(LinkConTime, *ctime);
1144  statsMutex.Lock();
1145  if (LinkCount > 0) AtomicDec(LinkCount);
1146  statsMutex.UnLock();
1147  }
1148 
1150 
1151  tmpLL = AtomicFAZ(BytesIn);
1152  AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1153  tmpI4 = AtomicFAZ(tardyCnt);
1154  AtomicAdd(LinkTimeOuts, tmpI4); AtomicAdd(tardyCntTot, tmpI4);
1155  tmpI4 = AtomicFAZ(stallCnt);
1156  AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1158 
1160  tmpLL = AtomicFAZ(BytesOut);
1161  AtomicAdd(LinkBytesOut, tmpLL); AtomicAdd(BytesOutTot, tmpLL);
1162  tmpI4 = AtomicFAZ(SfIntr);
1163  AtomicAdd(LinkSfIntr, tmpI4);
1165 
1166 // Make sure the protocol updates it's statistics as well
1167 //
1168  if (Protocol) Protocol->Stats(0, 0, 1);
1169 
1170 // All done
1171 //
1172  if (!ctime) LinkInfo.opMutex.UnLock();
1173 }
1174 
1175 /******************************************************************************/
1176 /* Protected: T L S _ E r r o r */
1177 /******************************************************************************/
1178 
1179 int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1180 {
1181  std::string reason = XrdTls::RC2Text(rc);
1182  char msg[512];
1183 
1184  snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1185  Log.Emsg("TLS", msg, reason.c_str());
1186  return -1;
1187 }
1188 
1189 /******************************************************************************/
1190 /* T L S _ P e e k */
1191 /******************************************************************************/
1192 
1193 int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1194 {
1195  XrdSysMutexHelper theMutex;
1196  XrdTls::RC retc;
1197  int rc, rlen;
1198 
1199 // Lock the read mutex if we need to, the helper will unlock it upon exit
1200 //
1201  if (LockReads) theMutex.Lock(&rdMutex);
1202 
1203 // Wait until we can actually read something
1204 //
1205  isIdle = 0;
1206  if (timeout)
1207  {rc = Wait4Data(timeout);
1208  if (rc < 1) return rc;
1209  }
1210 
1211 // Do the peek and if sucessful, the number of bytes available.
1212 //
1213  retc = tlsIO.Peek(Buff, Blen, rlen);
1214  if (retc == XrdTls::TLS_AOK) return rlen;
1215 
1216 // Dianose the TLS error and return failure
1217 //
1218  return TLS_Error("peek on", retc);
1219 }
1220 
1221 /******************************************************************************/
1222 /* T L S _ R e c v */
1223 /******************************************************************************/
1224 
1225 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1226 {
1227  XrdSysMutexHelper theMutex;
1228  XrdTls::RC retc;
1229  int rlen;
1230 
1231 // Lock the read mutex if we need to, the helper will unlock it upon exit
1232 //
1233  if (LockReads) theMutex.Lock(&rdMutex);
1234 
1235 // Note that we will read only as much as is queued. Use Recv() with a
1236 // timeout to receive as much data as possible.
1237 //
1238  isIdle = 0;
1239  retc = tlsIO.Read(Buff, Blen, rlen);
1240  if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1241  if (rlen > 0) AtomicAdd(BytesIn, rlen);
1242  return rlen;
1243 }
1244 
1245 /******************************************************************************/
1246 
1247 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1248 {
1249  XrdSysMutexHelper theMutex;
1250  XrdTls::RC retc;
1251  int pend, rlen, totlen = 0;
1252 
1253 // Lock the read mutex if we need to, the helper will unlock it upon exit
1254 //
1255  if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1256 
1257 // Wait up to timeout milliseconds for data to arrive
1258 //
1259  isIdle = 0;
1260  while(Blen > 0)
1261  {pend = tlsIO.Pending(true);
1262  if (!pend) pend = Wait4Data(timeout);
1263  if (pend < 1)
1264  {if (pend < 0) return -1;
1265  tardyCnt++;
1266  if (totlen)
1267  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1268  AtomicAdd(BytesIn, totlen);
1269  }
1270  return totlen;
1271  }
1272 
1273  // Read as much data as you can. Note that we will force an error
1274  // if we get a zero-length read after poll said it was OK. However,
1275  // if we never read anything, then we simply return -ENOMSG to avoid
1276  // generating a "read link error" as clearly there was a hangup.
1277  //
1278  retc = tlsIO.Read(Buff, Blen, rlen);
1279  if (retc != XrdTls::TLS_AOK)
1280  {if (!totlen) return -ENOMSG;
1281  AtomicAdd(BytesIn, totlen);
1282  return TLS_Error("receive from", retc);
1283  }
1284  if (rlen <= 0) break;
1285  totlen += rlen; Blen -= rlen; Buff += rlen;
1286  }
1287 
1288  AtomicAdd(BytesIn, totlen);
1289  return totlen;
1290 }
1291 
1292 /******************************************************************************/
1293 
1294 int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1295 {
1296  XrdSysMutexHelper theMutex;
1297  char *Buff;
1298  int Blen, rlen, totlen = 0;
1299 
1300 // Lock the read mutex if we need to, the helper will unlock it upon exit
1301 //
1302  if (LockReads) theMutex.Lock(&rdMutex);
1303 
1304 // Individually process each element until we can't read any more
1305 //
1306  isIdle = 0;
1307  for (int i = 0; i < iocnt; i++)
1308  {Buff = (char *)iov[i].iov_base;
1309  Blen = iov[i].iov_len;
1310  rlen = TLS_Recv(Buff, Blen, timeout, true);
1311  if (rlen <= 0) break;
1312  totlen += rlen;
1313  if (rlen < Blen) break;
1314  }
1315 
1316  if (totlen) {AtomicAdd(BytesIn, totlen);}
1317  return totlen;
1318 }
1319 
1320 /******************************************************************************/
1321 /* T L S _ R e c v A l l */
1322 /******************************************************************************/
1323 
1324 int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1325 {
1326  int retc;
1327 
1328 // Check if timeout specified. Notice that the timeout is the max we will
1329 // wait for some data. We will wait forever for all the data. Yeah, it's weird.
1330 //
1331  if (timeout >= 0)
1332  {retc = tlsIO.Pending(true);
1333  if (!retc) retc = Wait4Data(timeout);
1334  if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1335  }
1336 
1337 // Note that we will block until we receive all the bytes.
1338 //
1339  return TLS_Recv(Buff, Blen, -1);
1340 }
1341 
1342 /******************************************************************************/
1343 /* T L S _ S e n d */
1344 /******************************************************************************/
1345 
1346 int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1347 {
1349  ssize_t bytesleft = Blen;
1350  XrdTls::RC retc;
1351  int byteswritten;
1352 
1353 // Prepare to send
1354 //
1355  isIdle = 0;
1356  AtomicAdd(BytesOut, Blen);
1357 
1358 // Do non-blocking writes if we are setup to do so.
1359 //
1360  if (sendQ) return sendQ->Send(Buff, Blen);
1361 
1362 // Write the data out
1363 //
1364  while(bytesleft)
1365  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1366  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1367  bytesleft -= byteswritten; Buff += byteswritten;
1368  }
1369 
1370 // All done
1371 //
1372  return Blen;
1373 }
1374 
1375 /******************************************************************************/
1376 
1377 int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1378 {
1380  XrdTls::RC retc;
1381  int byteswritten;
1382 
1383 // Get a lock and assume we will be successful (statistically we are). Note
1384 // that the calling interface gauranteed bytes are not zero.
1385 //
1386  isIdle = 0;
1387  AtomicAdd(BytesOut, bytes);
1388 
1389 // Do non-blocking writes if we are setup to do so.
1390 //
1391  if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1392 
1393 // Write the data out.
1394 //
1395  for (int i = 0; i < iocnt; i++)
1396  {ssize_t bytesleft = iov[i].iov_len;
1397  char *Buff = (char *)iov[i].iov_base;
1398  while(bytesleft)
1399  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1400  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1401  bytesleft -= byteswritten; Buff += byteswritten;
1402  }
1403  }
1404 
1405 // All done
1406 //
1407  return bytes;
1408 }
1409 
1410 /******************************************************************************/
1411 
1412 int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1413 {
1415  int bytes, buffsz, fileFD, retc;
1416  off_t offset;
1417  ssize_t totamt = 0;
1418  char myBuff[65536];
1419 
1420 // Convert the sendfile to a regular send. The conversion is not particularly
1421 // fast and caller are advised to avoid using sendfile on TLS connections.
1422 //
1423  isIdle = 0;
1424  for (int i = 0; i < sfN; sfP++, i++)
1425  {if (!(bytes = sfP->sendsz)) continue;
1426  totamt += bytes;
1427  if (sfP->fdnum < 0)
1428  {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1429  continue;
1430  }
1431  offset = sfP->offset;
1432  fileFD = sfP->fdnum;
1433  buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1434  do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1435  while(retc < 0 && errno == EINTR);
1436  if (retc < 0) return SFError(errno);
1437  if (!retc) break;
1438  if (!TLS_Write(myBuff, buffsz)) return -1;
1439  offset += buffsz; bytes -= buffsz; totamt += retc;
1440  } while(bytes > 0);
1441  }
1442 
1443 // We are done
1444 //
1445  AtomicAdd(BytesOut, totamt);
1446  return totamt;
1447 }
1448 
1449 /******************************************************************************/
1450 /* Protected: T L S _ W r i t e */
1451 /******************************************************************************/
1452 
1453 bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1454 {
1455  XrdTls::RC retc;
1456  int byteswritten;
1457 
1458 // Write the data out
1459 //
1460  while(Blen)
1461  {retc = tlsIO.Write(Buff, Blen, byteswritten);
1462  if (retc != XrdTls::TLS_AOK)
1463  {TLS_Error("write to", retc);
1464  return false;
1465  }
1466  Blen -= byteswritten; Buff += byteswritten;
1467  }
1468 
1469 // All done
1470 //
1471  return true;
1472 }
1473 
1474 /******************************************************************************/
1475 /* v e r T L S */
1476 /******************************************************************************/
1477 
1478 const char *XrdLinkXeq::verTLS()
1479 {
1480  return tlsIO.Version();
1481 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
const char * Comment
Definition: XrdJob.hh:47
static void SyncAll()
Synchronize statustics for ll links.
Definition: XrdLinkCtl.cc:374
static void Unhook(int fd)
Unhook a link from the active table of links.
Definition: XrdLinkCtl.cc:392
time_t conTime
Definition: XrdLinkInfo.hh:44
void Reset()
Definition: XrdLinkInfo.hh:52
char * Etext
Definition: XrdLinkInfo.hh:45
XrdSysRecMutex opMutex
Definition: XrdLinkInfo.hh:46
XrdSysCondVar * KillcvP
Definition: XrdLinkInfo.hh:42
static const char * TraceID
Definition: XrdLinkXeq.hh:157
int TLS_Send(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1346
long long BytesOut
Definition: XrdLinkXeq.hh:172
int TLS_Error(const char *act, XrdTls::RC rc)
Definition: XrdLinkXeq.cc:1179
int TLS_Peek(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1193
int stallCntTot
Definition: XrdLinkXeq.hh:175
int Client(char *buff, int blen)
Definition: XrdLinkXeq.cc:168
char Uname[24]
Definition: XrdLinkXeq.hh:200
XrdTlsPeerCerts * getPeerCerts()
Definition: XrdLinkXeq.cc:337
static int LinkCountMax
Definition: XrdLinkXeq.hh:166
XrdLinkInfo LinkInfo
Definition: XrdLinkXeq.hh:144
XrdProtocol * ProtoAlt
Definition: XrdLinkXeq.hh:184
int Close(bool defer=false)
Definition: XrdLinkXeq.cc:188
XrdNetAddr Addr
Definition: XrdLinkXeq.hh:192
int TLS_Recv(char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1225
int sendData(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:842
long long BytesInTot
Definition: XrdLinkXeq.hh:171
bool TLS_Write(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1453
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
Definition: XrdLinkXeq.cc:865
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
Definition: XrdLinkXeq.cc:959
static long long LinkCountTot
Definition: XrdLinkXeq.hh:164
long long BytesOutTot
Definition: XrdLinkXeq.hh:173
void Shutdown(bool getLock)
Definition: XrdLinkXeq.cc:1054
int Peek(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:346
static int LinkCount
Definition: XrdLinkXeq.hh:165
void Reset()
Definition: XrdLinkXeq.cc:129
int Backlog()
Definition: XrdLinkXeq.cc:155
XrdSysMutex wrMutex
Definition: XrdLinkXeq.hh:194
static int Stats(char *buff, int blen, bool do_sync=false)
Definition: XrdLinkXeq.cc:1086
XrdSendQ * sendQ
Definition: XrdLinkXeq.hh:195
XrdPollInfo PollInfo
Definition: XrdLinkXeq.hh:145
void setID(const char *userid, int procid)
Definition: XrdLinkXeq.cc:906
bool LockReads
Definition: XrdLinkXeq.hh:197
int Recv(char *buff, int blen)
Definition: XrdLinkXeq.cc:389
static long long LinkBytesIn
Definition: XrdLinkXeq.hh:161
int TLS_RecvAll(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1324
int SFError(int rc)
Definition: XrdLinkXeq.cc:1044
long long BytesIn
Definition: XrdLinkXeq.hh:170
int tardyCntTot
Definition: XrdLinkXeq.hh:177
int Send(const char *buff, int blen)
Definition: XrdLinkXeq.cc:608
XrdSysMutex rdMutex
Definition: XrdLinkXeq.hh:193
const char * verTLS()
Definition: XrdLinkXeq.cc:1478
bool setNB()
Definition: XrdLinkXeq.cc:930
int RecvIOV(const struct iovec *iov, int iocnt)
Definition: XrdLinkXeq.cc:567
char Lname[256]
Definition: XrdLinkXeq.hh:201
static long long LinkConTime
Definition: XrdLinkXeq.hh:163
static int LinkSfIntr
Definition: XrdLinkXeq.hh:169
XrdTlsSocket tlsIO
Definition: XrdLinkXeq.hh:188
void DoIt()
Definition: XrdLinkXeq.cc:308
int RecvAll(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:525
XrdProtocol * Protocol
Definition: XrdLinkXeq.hh:183
bool Register(const char *hName)
Definition: XrdLinkXeq.cc:589
static XrdSysMutex statsMutex
Definition: XrdLinkXeq.hh:179
void setProtName(const char *name)
Definition: XrdLinkXeq.cc:976
static int LinkStalls
Definition: XrdLinkXeq.hh:168
static long long LinkBytesOut
Definition: XrdLinkXeq.hh:162
void syncStats(int *ctime=0)
Definition: XrdLinkXeq.cc:1122
bool setTLS(bool enable, XrdTlsContext *ctx=0)
Definition: XrdLinkXeq.cc:990
static int LinkTimeOuts
Definition: XrdLinkXeq.hh:167
void SetDialect(const char *dP)
Definition: XrdNetAddr.hh:205
bool Register(const char *hName)
Definition: XrdNetAddr.cc:172
void SetTLS(bool val)
Definition: XrdNetAddr.cc:582
void Zorch()
Definition: XrdPollInfo.hh:49
XrdPoll * Poller
Definition: XrdPollInfo.hh:43
virtual int Enable(XrdPollInfo &pInfo)=0
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition: XrdPoll.cc:177
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Stats(char *buff, int blen, int do_sync=0)=0
virtual int Process(XrdLink *lp)=0
void Terminate(XrdLink *lP=0)
Definition: XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition: XrdSendQ.cc:230
unsigned int Backlog()
Definition: XrdSendQ.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
Definition: XrdTcpMonPin.hh:61
long long bytesOut
Bytes written to the socket.
Definition: XrdTcpMonPin.hh:64
int consec
Seconds connected.
Definition: XrdTcpMonPin.hh:62
virtual void Monitor(XrdNetAddrInfo &netInfo, LinkInfo &lnkInfo, int liLen)=0
long long bytesIn
Bytes read from the socket.
Definition: XrdTcpMonPin.hh:63
const char * tident
Pointer to the client's trace identifier.
Definition: XrdTcpMonPin.hh:60
@ TLS_HS_BLOCK
Always block during handshake.
Definition: XrdTlsSocket.hh:53
XrdTls::RC Accept(std::string *eMsg=0)
void Shutdown(SDType=sdImmed)
@ TLS_RBL_WBL
blocking read blocking write
Definition: XrdTlsSocket.hh:48
XrdTls::RC Write(const char *buffer, size_t size, int &bytesOut)
const char * Version()
XrdTls::RC Read(char *buffer, size_t size, int &bytesRead)
Read from the TLS connection. If necessary, a handshake will be done.
const char * Init(XrdTlsContext &ctx, int sfd, RW_Mode rwm, HS_Mode hsm, bool isClient, bool serial=true, const char *tid="")
void SetTraceID(const char *tid)
int Pending(bool any=true)
XrdTls::RC Peek(char *buffer, size_t size, int &bytesPeek)
XrdTlsPeerCerts * getCerts(bool ver=true)
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition: XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition: XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition: XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition: XrdLinkXeq.cc:96
XrdSysError Log
Definition: XrdConfig.cc:111
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
int maxIOV
Definition: XrdLinkXeq.cc:98
int devNull
Definition: XrdGlobals.cc:55
int fdnum
File descriptor for data.
Definition: XrdOucSFVec.hh:47
int sendsz
Length of data at offset.
Definition: XrdOucSFVec.hh:46