XRootD
Loading...
Searching...
No Matches
XrdFrmXfrQueue.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m X f r Q u e u e . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <strings.h>
33#include <cstdio>
34#include <fcntl.h>
35#include <unistd.h>
36#include <utime.h>
37#include <sys/param.h>
38#include <sys/types.h>
39#include <sys/stat.h>
40
42#include "XrdFrc/XrdFrcTrace.hh"
46#include "XrdNet/XrdNetMsg.hh"
47#include "XrdOss/XrdOss.hh"
48#include "XrdOuc/XrdOucTList.hh"
49#include "XrdSys/XrdSysError.hh"
50#include "XrdSys/XrdSysFD.hh"
51#include "XrdSys/XrdSysTimer.hh"
53
54using namespace XrdFrc;
55using namespace XrdFrm;
56
57/******************************************************************************/
58/* S t a t i c s */
59/******************************************************************************/
60
61XrdSysMutex XrdFrmXfrQueue::hMutex;
62XrdOucHash<XrdFrmXfrJob> XrdFrmXfrQueue::hTab;
63
64XrdSysMutex XrdFrmXfrQueue::qMutex;
65XrdSysSemaphore XrdFrmXfrQueue::qReady(0);
66
67XrdFrmXfrQueue::theQueue XrdFrmXfrQueue::xfrQ[XrdFrcRequest::numQ];
68
69/******************************************************************************/
70/* Public: A d d */
71/******************************************************************************/
72
74{
75 XrdFrmXfrJob *xP;
76 struct stat buf;
77 const char *xfrType = xfrName(*rP, qNum);
78 char *Lfn, lclpath[MAXPATHLEN];
79 int Outgoing = (qNum & XrdFrcRequest::outQ);
80
81// Validate queue number
82//
83 if (qNum < 0 || qNum >= XrdFrcRequest::numQ-1)
84 {sprintf(lclpath, "%d", qNum);
85 Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
86 if (reqFQ) reqFQ->Del(rP);
87 return 0;
88 }
89
90// First check if this request is active or pending. If it's an inbound request
91// then only the lfn matters regardless of source. For outgoing requests then
92// the lfn plus the target only matters.
93//
94 Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
95 hMutex.Lock();
96 if ((xP = hTab.Find(Lfn)))
98 && strcmp(xP->reqData.Notify, rP->Notify))
99 {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
100 xP->NoteList = tP;
101 }
102 hMutex.UnLock();
103 if (Config.Verbose || Trace.What & TRACE_Debug)
104 {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
105 Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
106 }
107 if (reqFQ) reqFQ->Del(rP);
108 return 0;
109 }
110 hMutex.UnLock();
111
112// Obtain the local name
113//
114 if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
115 {if (reqFQ) reqFQ->Del(rP);
116 return Notify(rP, qNum, 1, "Unable to generate pfn");
117 }
118
119// Check if the file exists or not. For incoming requests, the file must not
120// exist. For outgoing requests the file must exist.
121//
122 if (Config.Stat((rP->LFN)+rP->LFO, lclpath, &buf))
123 {if (Outgoing)
124 {if (Config.Verbose || Trace.What & TRACE_Debug)
125 Say.Say(0, xfrType,"skipped; ",lclpath," not resident.");
126 if (reqFQ) reqFQ->Del(rP);
127 return Notify(rP, qNum, 2, "file not resident");
128 }
129 } else {
130 if (!Outgoing)
131 {if (Config.Verbose || Trace.What & TRACE_Debug)
132 Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
133 if (reqFQ) reqFQ->Del(rP);
134 return Notify(rP, qNum, 0);
135 }
136 }
137
138// Obtain a queue slot, we may block until one is available
139//
140 do {qMutex.Lock();
141 if ((xP = xfrQ[qNum].Free)) break;
142 qMutex.UnLock();
143 xfrQ[qNum].Avail.Wait();
144 } while(!xP);
145 xfrQ[qNum].Free = xP->Next;
146 qMutex.UnLock();
147
148// Initialize the slot
149//
150 xP->Next = 0;
151 xP->NoteList = 0;
152 xP->reqFQ = reqFQ;
153 xP->reqData = *rP;
154 xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
155 strcpy(xP->PFN, lclpath);
156 xP->pfnEnd = strlen(lclpath);
157 xP->RetCode = 0;
158 xP->qNum = qNum;
159 xP->Act =*xfrType;
160 xP->Type = xfrType+1;
161
162// Add this to the table of requests
163//
164 hMutex.Lock();
165 hTab.Add(xP->reqFile, xP, 0, Hash_keep);
166 hMutex.UnLock();
167
168// Place request in the appropriate transfer queue
169//
170 qMutex.Lock();
171 if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
172 else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
173 qMutex.UnLock();
174 qReady.Post();
175
176// All done
177//
178 return 1;
179}
180
181/******************************************************************************/
182/* Public: D o n e */
183/******************************************************************************/
184
185void XrdFrmXfrQueue::Done(XrdFrmXfrJob *xP, const char *Msg)
186{
187 XrdOucTList *tP;
188
189// Send notifications to everyone that wants it that this job is done
190//
191 do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
192 if ((tP = xP->NoteList))
193 {strcpy(xP->reqData.Notify, tP->text);
194 xP->NoteList = tP->next;
195 delete tP;
196 }
197 } while(tP);
198
199// Remove this job from the queue file
200//
201 if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
202
203// Remove this job from the active table
204//
205 hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
206
207// Place job element on the free queue
208//
209 qMutex.Lock();
210 xP->Next = xfrQ[xP->qNum].Free;
211 xfrQ[xP->qNum].Free = xP;
212 xfrQ[xP->qNum].Avail.Post();
213 qMutex.UnLock();
214}
215
216/******************************************************************************/
217/* Public: G e t */
218/******************************************************************************/
219
221{
222 XrdFrmXfrJob *xfrP;
223
224// Wait for an available job and return it
225//
226 do {qReady.Wait();} while(!(xfrP = Pull(ioQType)));
227 return xfrP;
228}
229
230/******************************************************************************/
231/* I n i t */
232/******************************************************************************/
233
234void *InitStop(void *parg)
236 return (void *)0;
237}
238
240{
241 static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
242 static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
243 XrdFrmXfrJob *xP;
244 pthread_t tid;
245 char StopFile[1024], *fnSfx;
246 int n, qNum, retc;
247
248// Prepare to initialize the queues
249//
250 strcpy(StopFile, Config.AdminPath);
251 strcat(StopFile, "STOP");
252 fnSfx = StopFile + strlen(StopFile);
253
254// Initialize each queue
255//
256 for (qNum= 0; qNum < XrdFrcRequest::numQ-1; qNum++)
257 {
258
259 // Initialize the stop file name and set the queue name and number
260 //
261 strcpy(fnSfx, StopFN[qNum]);
262 xfrQ[qNum].File = strdup(StopFile);
263 xfrQ[qNum].Name = StopQN[qNum];
264 xfrQ[qNum].qNum = qNum;
265
266 // Start the stop file monitor thread for this queue
267 //
268 if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
269 XRDSYSTHREAD_BIND, "Stopfile monitor")))
270 {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
271
272 // Create twice as many free queue elements as we have xfr agents for the
273 // queue. This prevents stalls when a particular queue is stopped but keeps
274 // us from exceeding internal resources when we get flooded with requests.
275 //
276 n = Config.xfrMax*2;
277 while(n--)
278 {xP = new XrdFrmXfrJob;
279 xP->Next = xfrQ[qNum].Free;
280 xfrQ[qNum].Free = xP;
281 xfrQ[qNum].Avail.Post();
282 }
283 }
284
285// All done
286//
287 return 1;
288}
289
290/******************************************************************************/
291/* Private: P u l l */
292/******************************************************************************/
293
294XrdFrmXfrJob *XrdFrmXfrQueue::Pull(int ioQType)
295{
296 static bool ioX = false, prevQ[2] = {0,0};
297 XrdFrmXfrJob *xfrP;
298 int pikQ, theQ, Q1, Q2, nSel = 1;
299
300// Setup to pick a request equally multiplexing between all possible queues
301//
302 qMutex.Lock();
303do{if (!ioQType) ioX = !ioX;
304 else {ioX = (ioQType < 0 ? 1 : 0); nSel = 0;}
305 if (ioX) {Q1 = XrdFrcRequest::migQ; Q2 = XrdFrcRequest::putQ; pikQ = 1;}
306 else {Q1 = XrdFrcRequest::stgQ; Q2 = XrdFrcRequest::getQ; pikQ = 0;}
307
308// Check if we should avoid either queue because it is stopped
309//
310 if (xfrQ[Q1].Stop || Stopped(Q1)) Q1 = XrdFrcRequest::nilQ;
311 if (xfrQ[Q2].Stop || Stopped(Q2)) Q2 = XrdFrcRequest::nilQ;
312
313// Pick the oldest possible request
314//
315 if (xfrQ[Q1].First && xfrQ[Q2].First)
316 { if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD)
317 theQ = Q1;
318 else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD)
319 theQ = Q2;
320 else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1);
321 }else theQ = (xfrQ[Q1].First ? Q1 : Q2);
322
323// Dequeue the request (we may have an empty selectoin here)
324//
325 if ((xfrP = xfrQ[theQ].First)
326 && !(xfrQ[theQ].First = xfrP->Next)) xfrQ[theQ].Last = 0;
327 } while(!xfrP && nSel--);
328
329// Return the job, if any
330//
331 prevQ[pikQ] = theQ;
332 qMutex.UnLock();
333 return xfrP;
334}
335
336/******************************************************************************/
337/* Private: N o t i f y */
338/******************************************************************************/
339
340int XrdFrmXfrQueue::Notify(XrdFrcRequest *rP, int qNum, int rc, const char *msg)
341{
342 static const char *isFile = "file:///";
343 static const int lnFile = 8;
344 static const char *isUDP = "udp://";
345 static const int lnUDP = 6;
346 static const char *qOpr[] = {"stage", "migr", "get", "put"};
347 char msgbuff[4096], *nP, *mP = rP->Notify;
348 int n;
349
350// Check if message really needs to be sent
351//
352 if ((!rc && !(rP->Options & XrdFrcRequest::msgSucc))
353 || ( rc && !(rP->Options & XrdFrcRequest::msgFail))) return 0;
354
355// Multiple destinations can be specified, each destination separated by a
356// carriable rturn. We don't screen out duplicates.
357//
358do{if ((nP = index(rP->Notify, '\r'))) *nP++ = '\0';
359
360// Check for file destination
361//
362 if (!strncmp(mP, isFile, lnFile))
363 {if (rc) n = sprintf(msgbuff, "%s %s %s %s\n", qOpr[qNum],
364 (rc > 1 ? "ENOENT":"BAD"), rP->LFN, (msg ? msg:"?"));
365 else n = sprintf(msgbuff, "stage OK %s\n", rP->LFN);
366 Send2File(mP+lnFile, msgbuff, n);
367 }
368
369// Check for udp destination
370//
371 else if (!strncmp(mP, isUDP, lnUDP))
372 {char *txtP, *dstP = mP+lnUDP;
373 if ((txtP = index(dstP, '/'))) *txtP++ = '\0';
374 else txtP = (char *)"";
375 n = sprintf(msgbuff, "%s %s %s %s", (rc ? "unprep" : "ready"),
376 rP->ID, txtP, rP->LFN);
377 Send2UDP(dstP, msgbuff, n);
378 }
379
380// Issue warning as we don't yet support mail or tcp notifications
381//
382 else if (*mP != '-')
383 Say.Emsg("Notify", "Unsupported notification path '", mP, "'.");
384 } while((mP = nP));
385
386// All done
387//
388 return 0;
389}
390
391/******************************************************************************/
392/* Private: S e n d 2 F i l e */
393/******************************************************************************/
394
395void XrdFrmXfrQueue::Send2File(char *Dest, char *Msg, int Mln)
396{
397 EPNAME("Notify");
398 int FD;
399
400// Do some debugging
401//
402 DEBUG("sending '" <<Msg <<"' via " <<Dest);
403
404// Open the file
405//
406 if ((FD = XrdSysFD_Open(Dest, O_WRONLY)) < 0)
407 {Say.Emsg("Notify", errno, "send notification via", Dest); return;}
408
409// Write the message
410//
411 if (write(FD, Msg, Mln) < 0)
412 Say.Emsg("Notify", errno, "send notification via", Dest);
413 close(FD);
414}
415
416/******************************************************************************/
417/* Private: S e n d 2 U D P */
418/******************************************************************************/
419
420void XrdFrmXfrQueue::Send2UDP(char *Dest, char *Msg, int Mln)
421{
422 EPNAME("Notify");
423 static XrdNetMsg Relay(&Say, 0);
424
425// Do some debugging
426//
427 DEBUG("sending '" <<Msg <<"' via " <<Dest);
428
429// Send off the message
430//
431 Relay.Send(Msg, Mln, Dest);
432}
433
434/******************************************************************************/
435/* Public: S t o p M o n */
436/******************************************************************************/
437
439{
440 struct theQueue *monQ = (struct theQueue *)parg;
441 XrdFrmXfrJob *xP;
442 struct stat buf;
443 char theMsg[80];
444 int Cnt;
445
446// Establish which message to produce
447//
448 sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
449
450// Wait until someone needs to tell us to check for a stop file
451//
452 while(1)
453 {monQ->Alert.Wait();
454 Cnt = 0;
455 while(!stat(monQ->File, &buf))
456 {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
458 }
459 qMutex.Lock();
460 monQ->Stop = 0;
461 xP = monQ->First;
462 while(xP) {qReady.Post(); xP = xP->Next;}
463 qMutex.UnLock();
464 }
465}
466
467/******************************************************************************/
468/* Private: S t o p p e d */
469/******************************************************************************/
470
471int XrdFrmXfrQueue::Stopped(int qNum) // Called with qMutex locked!
472{
473 struct stat buf;
474
475// Check for stop file existence. If it exists and the queue has not been
476// stopped; stop it and alert the stop file monitor.
477//
478 if (stat(xfrQ[qNum].File, &buf)) return 0;
479 if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();}
480 return 1;
481}
482
483/******************************************************************************/
484/* Private: x f r N a m e */
485/******************************************************************************/
486
487const char *XrdFrmXfrQueue::xfrName(XrdFrcRequest &reqData, int qNum)
488{
489
490// Return a human name for this transfer:
491// Migrate
492// Migr+rm
493// Staging
494// CopyIn
495// CopyOut
496// Copy+rm
497//
498 switch(qNum)
500 return "1CopyIn ";
501 break;
503 return (reqData.Options & XrdFrcRequest::Purge ?
504 "3Migr+rm ":"2Migrate ");
505 break;
507 return (reqData.Options&XrdFrcRequest::Purge ?
508 "5Copy+rm " : "4CopyOut ");
509 break;
511 return "6Staging ";
512 break;
513 default: break;
514 }
515
516 return "0Unknown ";
517}
#define DEBUG(x)
#define EPNAME(x)
XrdOucPup XrdCmsParser::Pup & Say
#define TRACE_Debug
XrdOucTList * NoteList
char PFN[MAXPATHLEN+16]
const char * Type
XrdFrmXfrJob * Next
XrdFrcRequest reqData
XrdFrcReqFile * reqFQ
void * InitStop(void *parg)
@ Hash_keep
Definition XrdOucHash.hh:55
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
#define XRDSYSTHREAD_BIND
void Del(XrdFrcRequest *rP)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
static const int Purge
static const int msgFail
static const int msgSucc
static const int outQ
static const int numQ
char Notify[512]
static const int nilQ
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
static void StopMon(void *parg)
static XrdFrmXfrJob * Get(int ioQType)
static int Init()
XrdOucTList * next
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
XrdOucTrace Trace
XrdFrmConfig Config