%include global {
    // Upper bound on jobs dispatched concurrently per known server.
    const int MAX_JOBS_PER_SERVER = 2;

    // Subscripts into the argv record stored by JobController:queueJob.
    enum eQueuedWorkSubscripts {
        // NOTE: update these if the prototype for JobController::queueJob changes
        WORK_CLIENT_OBJ,
        WORK_CLIENT_INFO,
        WORK_CLASS_NAME,
        WORK_ACL,
        WORK_ARGS
    };
};

class Standard . ForwardReply {
    /*!
    When developing fault-tolerant services, programmers sometimes need to
    be sensitive to the possibility of a failure occurring in the middle of
    an RPC-style method invocation, because this could cause an application
    to block indefinitely waiting for a response from a failed server.
    The class=ForwardReply class can be used to conveniently intercept and
    redirect reply messages, thus enabling RPC-style method invocations to
    be converted into an event-driven dialogue.
    !*/
    string forwardAsMethodName;     // method name used in place of "reply"
    oid forwardToObj;               // object that receives the forwarded reply
} inherits from Object;

ForwardReply:create(string fwdMethodName, oid fwdToObj)
{
    /*!
    Any reply messages will be forwarded to the object indicated by
    fwdToObj.  The method name indicated by fwdMethodName will be used
    instead of reply.  The usage is demonstrated below:
    replyObj = send "createObject"("ForwardReply", acl, "response",
        thisObject) to ObjectCreator;
    send "request"(args) from replyObj;
    // The reply will be received by the "response" method.
    !*/
    forwardAsMethodName = fwdMethodName;
    forwardToObj = fwdToObj;
}

ForwardReply:delete()
{
}

ForwardReply:reply(any result)
{
    /*!
    Re-sends the received result to forwardToObj under the configured
    method name, passing along fromObject as the sender.
    !*/
    send (forwardAsMethodName)(result) to forwardToObj from fromObject;
}

class Standard . JobController {
    /*!
    Provides a service that distributes tasks across a load-balanced
    pool of servers.
    !*/
    string poolServiceName;     // "ServerPool:" followed by the pool name
    oid peerRegistry;           // local "PeerRegistry" service
    int serverIdCounter;        // last server id assigned by lookupResponse
    assoc serverList;           // makeAsString(server oid) -> server id
    array serverOID;            // server id -> server oid
    array serverLoad;           // server id -> jobs currently assigned
    array serverForJob;         // transaction id -> server id
    array queuedWork;           // transaction id -> argv of the queueJob call
    int firstEntryIndex;        // index of the next job to dequeue
    int lastEntryIndex;         // index at which the next job will be stored
    int maxConcurrentJobs;      // server count * MAX_JOBS_PER_SERVER
    int jobsInProgress;         // jobs dispatched but not yet completed
    oid replyObj;               // ForwardReply redirecting to "lookupResponse"
} inherits from Object;

JobController:create(string poolName)
{
    /*!
    Use the servers in the pool named by poolName.
    !*/
    peerRegistry = lookupLocalService("PeerRegistry");
    if (peerRegistry == nil) {
        display("JobController: no PeerRegistry\n");
        send "deleteYourself" to thisObject;
        // Nothing below can work without a registry; do not send to nil.
        exit;
    }
    poolServiceName = makeAsString("ServerPool:", poolName);

    // Initialise the queue and the reply forwarder before any message that
    // could lead to peerRegistered/lookupResponse being handled.
    firstEntryIndex = 1;
    lastEntryIndex = 1;
    assoc acl = makeDefaultACL();
    replyObj = send "createObject"("ForwardReply", acl, "lookupResponse",
        thisObject) to ObjectCreator;

    oid obj = lookupLocalService(poolServiceName);
    if (obj != nil) {
        // add local system iff member of named server pool
        send "peerRegistered"(ObjectCreator) to thisObject;
    }
    send "addNotifyWhenPeerRegistered"(thisObject, 1) to peerRegistry from nil;
}

JobController:delete()
{
    /*!
    Cancels the registry notification and deletes the reply forwarder.
    !*/
    if (peerRegistry != nil) {
        send "removeNotifyWhenPeerRegistered"(thisObject) to peerRegistry
            from nil;
    }
    if (replyObj != nil) {
        send "deleteYourself" to replyObj;
    }
}

JobController:peerRegistered(oid remotePeer)
{
    /*!
    Asks remotePeer to look up the pool service locally.  The reply is
    routed through replyObj and arrives as a lookupResponse message.
    !*/
    array args;

    if (remotePeer == nil)
        exit;
    args[0] = poolServiceName;
    send "invokeLocally"("lookupLocalService", args, "PeerRegistry")
        to remotePeer from replyObj;
    // expect reply to be forwarded as "lookupResponse"
}

JobController:lookupResponse(oid remotePeer)
{
    /*!
    Records a newly discovered server, recomputes the concurrency limit,
    and starts queued jobs while capacity remains.  A nil argument is
    ignored.
    !*/
    display("lookupResponse, argv=", argv);
    if (remotePeer == nil)
        exit;

    string key = makeAsString(remotePeer);
    serverIdCounter += 1;
    serverList[key] = serverIdCounter;
    serverOID[serverIdCounter] = remotePeer;
    serverLoad[serverIdCounter] = 0;
    maxConcurrentJobs = elementCount(serverList) * MAX_JOBS_PER_SERVER;

    while (firstEntryIndex < lastEntryIndex) {
        // Same capacity rule as queueJob/returnJobResult: only start a job
        // while jobsInProgress is strictly below the limit.
        if (jobsInProgress >= maxConcurrentJobs)
            break;
        int started = call "dequeueJob"();
        // dequeueJob returns 0 when it could not start anything; stop
        // rather than spin.
        if (started == 0)
            break;
    }
}

JobController:peerUnregistered(oid remotePeer)
{
    /*!
    Removes the server keyed by remotePeer from the bookkeeping tables and
    recomputes the concurrency limit.
    !*/
    // NOTE(review): the key here is built from the peer oid, whereas
    // lookupResponse keys on the oid carried in the lookup reply -- confirm
    // these are the same object.
    string key = makeAsString(remotePeer);
    int id = serverList[key];

    serverList = deleteIndex(serverList, key);
    serverOID = deleteIndex(serverOID, id);
    serverLoad = deleteIndex(serverLoad, id);
    maxConcurrentJobs = elementCount(serverList) * MAX_JOBS_PER_SERVER;
}

JobController:queueJob(oid client, any clientInfo, string className,
    assoc acl, set args)
{
    /*!
    Queue a work unit.  Result returned to client by sending it a
    jobComplete message and passing it clientInfo and the result of the
    job.
    !*/
    // NOTE: update eQueuedWorkSubscripts enum if method prototype changes
    queuedWork[lastEntryIndex] = argv;
    lastEntryIndex += 1;
    display("jobs=", jobsInProgress, " max=", maxConcurrentJobs, "\n");
    if (jobsInProgress < maxConcurrentJobs) {
        call "dequeueJob"();
    }
    return (0);
}

JobController:dequeueJob()
{
    /*!
    Dispatches the oldest queued job to the least-loaded server.
    Returns the transaction id of the dispatched job, or 0 when the queue
    is empty or no server is available.
    !*/
    if (lastEntryIndex == firstEntryIndex)
        return (0);

    // Pick the server first so that the job stays queued when there is
    // none (selectServer returns -1 in that case).
    int server = call "selectServer"();
    if (server < 0)
        return (0);

    int transactionId = firstEntryIndex;
    firstEntryIndex += 1;
    jobsInProgress += 1;
    serverForJob[transactionId] = server;
    oid rmtObj = serverOID[server];
    serverLoad[server] += 1;

    array rec = queuedWork[transactionId];
    string className = rec[WORK_CLASS_NAME];
    assoc acl = rec[WORK_ACL];
    set args = rec[WORK_ARGS];

    // create OID that only allows returnJobResult method
    oid ourID = createNewOIDthatOnlyAllowsOthers(thisObject,
        "returnJobResult");
    send "createObject"(className, acl, ourID, transactionId, args)
        to rmtObj from nil;
    return (transactionId);
}

JobController:returnJobResult(int transactionId, any result)
{
    /*!
    Notes the completion of a queued task.
    !*/
    if (indexExists(serverForJob, transactionId) == 0) {
        debugDisplay(debugLogLevel3,
            "JobController:returnJobResult no such transaction ",
            transactionId, "\n");
        exit;
    }
    jobsInProgress -= 1;

    int server = serverForJob[transactionId];
    // The server may have been removed by peerUnregistered while the job
    // was running; do not touch (and thereby re-create) its load entry.
    if (indexExists(serverLoad, server) != 0) {
        serverLoad[server] -= 1;
    }

    array rec = queuedWork[transactionId];
    oid client = rec[WORK_CLIENT_OBJ];
    any clientInfo = rec[WORK_CLIENT_INFO];
    queuedWork = deleteIndex(queuedWork, transactionId);
    serverForJob = deleteIndex(serverForJob, transactionId);

    send "jobComplete"(clientInfo, result) to client;

    if (jobsInProgress < maxConcurrentJobs) {
        call "dequeueJob"();
    }
}

JobController:selectServer()
{
    /*!
    Returns the id of the server with the smallest load in serverLoad,
    or -1 when serverLoad has no entries.
    !*/
    int i;
    int minId = -1;
    int minLoad = 0x7f0000;

    for (i = nextIndex(serverLoad, 0); i != 0; i = nextIndex(serverLoad, i)) {
        int l = serverLoad[i];
        if (l < minLoad) {
            minLoad = l;
            minId = i;
        }
    }
    return (minId);
}

JobController:timerExpired()
{
}

class Local . TestJobController {
    oid controllerObj;      // JobController under test
    int queuedJobs;         // number of jobs submitted by create
    int completedJobs;      // number of jobComplete messages received
} inherits from Object;

TestJobController:create(int count)
{
    /*!
    Creates a JobController for the pool "TestPool" and queues count
    TestJob work units on it.
    !*/
    int i;
    set args;
    assoc acl = makeDefaultACL();

    controllerObj = send "createObject"("JobController", acl, "TestPool")
        to ObjectCreator;
    queuedJobs = count;
    args += "hi";
    for (i = 1; i <= count; i += 1) {
        send "queueJob"(thisObject, i, "TestJob", acl, emptySet + args)
            to controllerObj from nil;
    }
}

TestJobController:delete()
{
    send "deleteYourself" to controllerObj;
}

TestJobController:jobComplete(any jobInfo, any result)
{
    /*!
    Displays one job result and deletes this object once every queued job
    has reported back.
    !*/
    display("result from job ", jobInfo, " is ", result, "\n");
    completedJobs += 1;
    if (completedJobs == queuedJobs) {
        send "deleteYourself" to thisObject;
    }
}

class Local . TestJob {
    int transactionId;      // transaction id handed over by the controller
    oid jobController;      // oid to which the result is returned
} inherits from Object;

TestJob:create(oid jobCntrl, int transId, any arg1)
{
    /*!
    Builds a result string from transId and arg1, returns it to jobCntrl
    via returnJobResult, and then deletes itself.
    !*/
    jobController = jobCntrl;
    transactionId = transId;
    string result = makeAsString("[", transId, ":", arg1, "]");
    send "returnJobResult"(transactionId, result) to jobController;
    send "deleteYourself" to thisObject;
}

TestJob:delete()
{
}

/* vim: set expandtab shiftwidth=4 tabstop=4: */