View Javadoc
1   package edu.internet2.middleware.grouper.pspng;
2   
3   /*******************************************************************************
4    * Copyright 2015 Internet2
5    * 
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    * 
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   ******************************************************************************/
18  
19  import java.util.*;
20  import java.util.concurrent.TimeUnit;
21  
22  import edu.internet2.middleware.grouper.app.loader.db.Hib3GrouperLoaderLog;
23  import edu.internet2.middleware.grouper.audit.GrouperEngineBuiltin;
24  import edu.internet2.middleware.grouper.hibernate.GrouperContext;
25  import edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystem;
26  import edu.internet2.middleware.grouper.pspng.lbmq.LinkedBlockingMultiQueue;
27  import edu.internet2.middleware.grouper.util.GrouperUtil;
28  import edu.internet2.middleware.grouperClient.messaging.*;
29  import org.apache.log4j.MDC;
30  import org.joda.time.DateTime;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import edu.internet2.middleware.grouper.GrouperSession;
35  
36  
37  /** 
38   * This class manages a full-sync thread for a provisioner. This is expected to be instantiated
39   * within the Grouper Loader/Daemon jvm and it reads from the grouper-loader settings, but it is not
40   * triggered by any changelog or message harness. 
41   * 
42   * Instead, full-refreshes are triggered via Quartz and the processing interacts directly with the 
43   * Group registry.
44   * 
45   * @author Bert Bee-Lindgren
46   *
47   */
48  public class FullSyncProvisioner  {
49    public static enum FULL_SYNC_COMMAND {FULL_SYNC_GROUP, CLEANUP, FULL_SYNC_ALL_GROUPS};
50  
51    // How often to print the progress of full-sync tasks
52    private static final int FULL_SYNC_PROGRESS_INTERVAL_SECS = 5;
53  
54    // This string is chosen to not match any group name
55    public static final String FULL_SYNC_ALL_GROUPS = "::full-sync-all-groups::";
56    private static final InheritableThreadLocal<FullSyncQueueItem> currentFullSyncItem
57                = new InheritableThreadLocal<FullSyncQueueItem>();
58  
59    final private Logger LOG;
60  
61    final Object grouperMessagingQueueSetupLock = new Object();
62    
63    final protected Provisioner<?,?,?> provisioner;
64  
65    /**
66     * There are several sources for FullSync operations. This LinkedBlockingMultiQueue
67     * is the single data structure the FullSync Engine listens to for work. FullSyncQueueItems
68     * are put into "subqueues" within the MultiQueue from either messaging, changeLog processing
69     * or gsh.
70     */
71    final LinkedBlockingMultiQueue<String, FullSyncQueueItem> queues = new LinkedBlockingMultiQueue<>();
72    // Define the queues with just a little capacity so most queued messages remain in the
73    // real queuing system where other Daemons could process them, for example
74    final static int QUEUE_CAPCITY=2;
75  
76    // These queues are defined in priority order (most important first)
77    // The local ones are unbounded (so injecting threads are never blocked) and
78    //   allow the injecting threads to monitor the progress
79    public static enum QUEUE_TYPE {
80      ASAP_LOCAL(10, false),
81      ASAP(20, true),
82      CHANGELOG(30, true),
83      SCHEDULED_LOCAL(40, false),
84      BACKGROUND_LOCAL(50, false),
85      BACKGROUND(60, true),
86      RETRY_LOCAL(90, false),
87      RETRY(100, true);
88  
89      int priority; // Lower priority is put at the front of the queue
90      String queueName_short;
91      boolean usesGrouperMessagingQueue;
92      int queueCapacity;
93      QUEUE_TYPE(int priority, boolean usesGrouperMessagingQueue)
94      {
95        this.priority = priority;
96        this.queueName_short = this.name().toLowerCase();
97        this.usesGrouperMessagingQueue=usesGrouperMessagingQueue;
98  
99        // GrouperMessaging queues have limited capacity so messages stay in upstream queues
100       if (usesGrouperMessagingQueue) {
101         queueCapacity=QUEUE_CAPCITY;
102       } else {
103         queueCapacity=Integer.MAX_VALUE;
104       }
105     }
106 
107     String getQueueName_grouperMessaging(FullSyncProvisioner fullSyncProvisioner) {
108       return String.format("pspng_full_sync_%s_%s", fullSyncProvisioner.provisioner.getConfigName(), queueName_short);
109     }
110   }
111 
112 
113   final Map<String, DateTime> lastSuccessfulFullSyncDate = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
114 
115   /**
116    * Constructor used by the getfullSyncer() factory method to construct a full-sync wrapper
117    * around a provisioner. In other words, do not call this constructor directly.
118    * 
119    * @param provisioner
120    */
121   protected FullSyncProvisioner(Provisioner provisioner) {
122     LOG = LoggerFactory.getLogger(String.format("%s.%s", getClass().getName(), provisioner.getDisplayName()));
123     LOG.debug("Constructing PspngFullSyncer-{}", provisioner.getDisplayName());
124 
125     this.provisioner = provisioner;
126     GrouperUtil.assertion(provisioner.fullSyncMode, "FullSync provisioners must be constructed with full-sync enabled");
127 
128     // Create the subQueues
129     for ( QUEUE_TYPE queue_type : QUEUE_TYPE.values() ) {
130       queues.addSubQueue(queue_type.queueName_short, queue_type.priority, queue_type.queueCapacity);
131     }
132   }
133 
134   public String getName() {
135     return String.format ("FullSyncer(%s)", getConfigName());
136   }
137 
138     /**
139    * Get the FullSync thread_manageFullSyncProcessing() thread running
140    */
141   protected void start() {
142     Thread t = new Thread(new Runnable() {
143       @Override
144       public void run() {
145           PspUtils.setupNewThread();
146           try {
147               thread_manageFullSyncProcessing();
148           } catch (Throwable t) {
149               LOG.error("{}: FullSync failed", getName(), t);
150           }
151           finally {
152               LOG.warn("{}: FullSync thread has exited", getName());
153           }
154       }
155     }, getName() + "-Thread");
156     
157     t.setDaemon(true);
158     t.start();
159 
160     for (QUEUE_TYPE queue_type : QUEUE_TYPE.values()) {
161       if (queue_type.usesGrouperMessagingQueue) {
162         setUpGrouperMessagingQueue(queue_type);
163         startMessageReadingThread(queue_type);
164       }
165     }
166   }
167 
168     /**
169      * starts a thread that reads from a GrouperMessaging queue and puts the items in
170      * a local-memory subQueue (which defines the item's eventual priority).
171      *
172      * The messages _are_not_ acknowledged by this thread, but are instead acknowledged
173      * when they are processed by the FullSync engine. Therefore, if the jvm dies
174      * while the messages are in-memory, they'll be retried eventually because they
175      * still live within GrouperMessaging.
176      *
177      * @param queue_type
178      */
179     protected void startMessageReadingThread(final QUEUE_TYPE queue_type) {
180         // Need a copy of the provisioner for the inner, anonymous class below
181         final FullSyncProvisioner theFullSyncProvisioner = this;
182         Thread t = new Thread(new Runnable() {
183             @Override
184             public void run() {
185 
186                 PspUtils.setupNewThread();
187                 try {
188                     thread_fullSyncMessageQueueReader(queue_type);
189                 } catch (Throwable t) {
190                     LOG.error("{}: Full-sync queue reader failed ({})", getName(), queue_type, t);
191                 }
192                 finally {
193                     LOG.error("{}: Full-sync queue reader has exited ({})", getName(), queue_type);
194                 }
195             }
196         }, getName() + "-MessageReaderThread-" + queue_type.queueName_short);
197 
198         t.setDaemon(true);
199         t.start();
200     }
201 
202 
203     /**
204    * method that manages full-sync processing. Most of the time you think of FullSync as 
205    * what happens on a schedule (weekly full-sync, for example). However, sometimes
206    * incremental provisioning needs a specific group to be sanity checked. For example,
207    * if a +member change and a -member change are batched together, the desired order 
208    * can be lost. Therefore, a full-sync of the group will make sure the result is accurate.
209    * 
210    * This method will wait for any full-refreshes to be requested, either from incremental
211    * provisioning or from a quartz-scheduled thread that triggers all the groups to be 
212    * sync'ed. 
213    */
214   protected void thread_manageFullSyncProcessing() {
215     MDC.put("who", getName()+"/");
216     Provisioner.activeProvisioner.set(this.provisioner);
217 
218     GrouperSession grouperSession = null;
219     GrouperContext grouperContext = null;
220     
221     while (true) {
222       while ( !provisioner.config.isEnabled() ) {
223         LOG.warn("Provisioner is disabled. Full-sync not being done.");
224         GrouperUtil.sleep(15000);
225         continue;
226       }
227 
228       // Start with a new session and context each time
229       if ( grouperSession != null ) {
230           GrouperSession.stopQuietly(grouperSession);
231       }
232 
233       if ( grouperContext != null ) {
234           GrouperContext.deleteDefaultContext();
235       }
236 
237       grouperSession = GrouperSession.startRootSession();
238       grouperContext = GrouperContext.createNewDefaultContext(GrouperEngineBuiltin.LOADER, false, true);
239 
240       FullSyncQueueItem queueItem = getNextFullSyncRequest();
241 
242       // We might have blocked for a long time on getNextFullSyncRequest(), so let's
243       // double-check that we're still enabled, and skip back to top of loop if we are not
244       if ( !provisioner.config.isEnabled() ) {
245         continue;
246       }
247 
248       LOG.info("Starting to process full-sync queue item: {}", queueItem);
249 
250       try {
251         MDC.put("why", String.format("qid=%d/", queueItem.id));
252         processQueueItem(queueItem);
253       }
254       catch (Throwable t) {
255           LOG.error("{}: Full-Sync processing failed: {}", new Object[]{getName(), queueItem, t});
256           queueItem.processingCompletedUnsuccessfully(false,
257                   "%s: %s",
258                   t.getClass().getName(), t.getMessage());
259 
260       }
261       finally {
262         MDC.remove("why");
263       }
264       if ( provisioner.getConfig().needsTargetSystemUsers() || provisioner.getConfig().needsTargetSystemGroups()) {
265         LOG.debug("Caching Stats: TSSubject: {} || TSGroup: {}",
266                 provisioner.targetSystemUserCache.getStats(), provisioner.targetSystemGroupCache.getStats());
267       }
268 
269     }
270   }
271 
272   private ProvisionerCoordinator getProvisionerCoordinator() {
273         return ProvisionerFactory.getProvisionerCoordinator(provisioner);
274   }
275 
276 
277   /**
278    * Method that reads a full-sync queue (See Grouper Messaging) and forwards messages
279    * to an internal (in memory) subqueue. Note that the capcity of most subqueues is
280    * quite limited so that these Reader threads block and leave most messages in the
281    * upstream queues... so other daemons can fetch and process them.
282    *
283    * Note: each full-sync provisioner runs several of these, one for each of the different
284    * messaging queues
285    */
286   protected void thread_fullSyncMessageQueueReader(QUEUE_TYPE queueType) {
287       MDC.put("why", String.format("full-sync-message-reader:%s/", queueType.queueName_short));
288       MDC.put("who", getName() + "/");
289 
290       String messagingQueueName = queueType.getQueueName_grouperMessaging(this);
291       String subQueueName = queueType.queueName_short;
292 
293       LOG.info("{} message reader Starting: {}-->{}",
294               new Object[]{getName(), messagingQueueName, subQueueName});
295 
296       GrouperSession gs = GrouperSession.startRootSession();
297 
298       // In order to remain gentle in the face of repeating errors or empty queue-pull results,
299       // we want to sleep in our loop. This variable is set to different values depending on
300       // what happened in our last pull-and-process loop
301       int nextSleepTime_secs = 0;
302 
303       LOG.info("{} message reader: created queue {} and granted send/receive permission to {}",
304               new Object[]{getName(), messagingQueueName, gs.getSubject()});
305 
306       while (true) {
307           if (nextSleepTime_secs > 0) {
308               GrouperUtil.sleep(1000L * nextSleepTime_secs);
309           }
310 
311           // Check to see if there is space in our assigned subQueue
312           if ( queues.getSubQueue(subQueueName).remainingCapacity() < 0) {
313             LOG.trace("{} message reader: waiting for space in queue {}",
314                     getName(), messagingQueueName);
315 
316 
317             nextSleepTime_secs=1;
318             continue;
319           }
320 
321 
322           // Normally, we don't want to sleep, so start each loop like nothing went wrong
323           nextSleepTime_secs = 0;
324 
325           GrouperMessageReceiveParam receive = new GrouperMessageReceiveParam();
326           receive.assignGrouperMessageSystemName(provisioner.getConfig().getGrouperMessagingSystemName());
327           receive.assignQueueName(messagingQueueName);
328           receive.assignMaxMessagesToReceiveAtOnce(queues.getSubQueue(subQueueName).remainingCapacity());
329           receive.assignLongPollMillis(300*1000);
330 
331           GrouperMessageReceiveResult grouperMessageReceiveResult = GrouperMessagingEngine.receive(receive);
332 
333           Collection<GrouperMessage> grouperMessages;
334           try {
335               LOG.debug("{} message reader: requesting messages from queue {}",
336                       getName(), messagingQueueName);
337 
338               grouperMessages = grouperMessageReceiveResult.getGrouperMessages();
339           } catch (Exception e) {
340               nextSleepTime_secs = 15;
341               LOG.error("{} message reader: Problem pulling messages from grouper message queue",
342                       getName(), e);
343               continue;
344           }
345 
346           if (grouperMessages.size() == 0) {
347               nextSleepTime_secs = 5;
348               LOG.debug("{}/{} message reader: no messages received", getName(), subQueueName);
349               continue;
350           }
351 
352           LOG.info("{}/{} message reader: received and processing {} messages",
353                   getName(), subQueueName, grouperMessages.size());
354 
355           try {
356               for (GrouperMessage message : grouperMessages) {
357                   // Set up what we need to ack the message
358                   GrouperMessageAcknowledgeParam messageAcknowledgeParam =
359                     new GrouperMessageAcknowledgeParam()
360                         .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
361                         .assignQueueName(messagingQueueName)
362                         .assignGrouperMessageSystemName(GrouperBuiltinMessagingSystem.BUILTIN_NAME)
363                         .addGrouperMessage(message);
364 
365                   String body = message.getMessageBody();
366                   LOG.debug("{}/{} message reader: Processing grouper message {} = {}",
367                           new Object[]{getName(), subQueueName,message.getId(), body});
368 
369                   FullSyncQueueItem queueItem = FullSyncQueueItem.fromJson(queueType, getConfigName(), body);
370 
371                   if ( queueItem==null ) {
372                     LOG.error("{}: Skipping invalid full-sync message: {}", getConfigName(), body);
373                     GrouperMessagingEngine.acknowledge(messageAcknowledgeParam);
374                     continue;
375                   }
376                   else {
377                     queueItem.messageToAcknowledge = messageAcknowledgeParam;
378                   }
379 
380                   LOG.info("{} message reader: queuing onto {} queue: {}", getName(), subQueueName, queueItem);
381 
382                   queues.getSubQueue(subQueueName).put(queueItem);
383               }
384           } catch (Exception e) {
385               nextSleepTime_secs = 15;
386               LOG.error("{} message reader: Problem while reading message and queuing it internally",
387                       getName(), e);
388           }
389       }
390   }
391 
392   protected void setUpGrouperMessagingQueue(QUEUE_TYPE queueType) {
393     GrouperSession gs = GrouperSession.staticGrouperSession();
394     String messagingQueueName = queueType.getQueueName_grouperMessaging(this);
395 
396     // These methods are not atomic and paralell invocation results in Duplicate Key exceptions
397     // when the subject is added to groups multiple times
398     synchronized (grouperMessagingQueueSetupLock) {
399       GrouperBuiltinMessagingSystem.createQueue(messagingQueueName);
400       if (!GrouperBuiltinMessagingSystem.allowedToSendToQueue(messagingQueueName, gs.getSubject())) {
401         LOG.info("Queue permission: Granting send permission to subject={} for queue={}", messagingQueueName, gs.getSubject());
402         GrouperBuiltinMessagingSystem.allowSendToQueue(messagingQueueName, gs.getSubject());
403       }
404 
405       if (!GrouperBuiltinMessagingSystem.allowedToReceiveFromQueue(messagingQueueName, gs.getSubject())) {
406         LOG.info("Queue permission: Granting receive permission to subject={} for queue={}", messagingQueueName, gs.getSubject());
407         GrouperBuiltinMessagingSystem.allowReceiveFromQueue(messagingQueueName, gs.getSubject());
408       }
409     }
410   }
411 
412 
413   protected void processQueueItem(FullSyncQueueItem queueItem) throws PspException {
414     switch (queueItem.command) {
415       case FULL_SYNC_GROUP:
416         DateTime lastSuccessFullSync = lastSuccessfulFullSyncDate.get(queueItem.groupName);
417         if ( lastSuccessFullSync!=null && queueItem.asofDate!=null && lastSuccessFullSync.isAfter(queueItem.asofDate) ) {
418           queueItem.processingCompletedSuccessfully("Skipping redundant full-sync. Group was full-synced successfully on %s which is after (asOf date) %s",
419                   lastSuccessFullSync, queueItem.asofDate);
420           return;
421         }
422 
423 
424         queueItem.startStep("ReadGroupFromGrouper");
425         GrouperGroupInfo grouperGroupInfo = provisioner.getGroupInfoOfExistingGroup(queueItem.groupName);
426         if ( grouperGroupInfo==null ) {
427           LOG.error("{}: Group not found for full-sync: '{}'", getConfigName(), queueItem.groupName);
428           queueItem.processingCompletedUnsuccessfully(false, "Group not found");
429           return;
430         }
431 
432         if ( provisioner.shouldGroupBeProvisioned(grouperGroupInfo) ) {
433           boolean changesWereMade=false;
434 
435           for(int i=0; i<provisioner.getConfig().getMaxNumberOfTimesToRepeatedlyFullSyncGroup(); i++) {
436             // If we're repeatedly full-syncing, we want to make sure we're using most recent information
437             // (essentially disable caching)
438             if (i>0) {
439               LOG.info("{}: FullSync #{} of {}: Disabling group caching to ensure most recent information is used",
440                       getConfigName(), i+1, grouperGroupInfo);
441               provisioner.uncacheGroup(grouperGroupInfo, null);
442             }
443 
444             // Log a nice message about repeated full syncs, where level and message varies according to
445             // which fullsync retry we're doing
446             if (i==1) {
447               LOG.info("{}: Repeating full sync of {} to make sure changes made in first full sync did not clobber incremental changes", getConfigName(), grouperGroupInfo);
448             } else if (i>1) {
449               LOG.warn("{}: Full sync of {} continues to make changes ({} times so far). The group is probably changing frequently, either within grouper or directly on target system",
450                       getConfigName(), grouperGroupInfo, i);
451 
452               // Make sure we don't loop too aggressively
453               GrouperUtil.sleep(provisioner.getConfig().getTimeToSleepBetweenRepeatedFullSyncs_ms());
454             }
455 
456             changesWereMade = fullSyncGroup(grouperGroupInfo, queueItem);
457 
458             if (!changesWereMade) {
459               // No changes were necessary, so break out of the full-sync-repeating loop
460               break;
461             }
462           }
463 
464           // Did our last full sync need changes?
465           if ( changesWereMade ) {
466             LOG.warn("{}: FullSync of {} was done {} times looking for stability, but the final one still required changes. There is a small possibility that realtime changes have been provisioned incorrectly and will be addressed during a future full sync.",
467                     getConfigName(), grouperGroupInfo,
468                     provisioner.getConfig().getMaxNumberOfTimesToRepeatedlyFullSyncGroup());
469             // TODO: Requeue the group for some time in the future, but track the total number of times this group is requeued
470             // to prevent requeuing forever
471           }
472         } else {
473           queueItem.processingCompletedUnsuccessfully(false, "Group is not selected for provisioning");
474         }
475         break;
476       case CLEANUP:
477         // Time to look for extra groups
478         processGroupCleanup(queueItem);
479         break;
480       case FULL_SYNC_ALL_GROUPS:
481         List<FullSyncQueueItem> scheduledItems = queueAllGroupsForFullSync(queueItem.sourceQueue, queueItem.externalReference, "Requested by message: %s", queueItem);
482         for (FullSyncQueueItem scheduledItem : scheduledItems) {
483           LOG.info("{} message reader: Group is queued for full sync: {}", getName(), scheduledItem);
484         }
485         queueItem.processingCompletedSuccessfully("Scheduled %d groups for full sync", scheduledItems.size());
486         break;
487       default:
488         queueItem.processingCompletedUnsuccessfully(false, "Full-sync command not known: %s", queueItem.command);
489     }
490   }
491 
492 
493   /**
494    * get the next full-sync request from our queues/subqueues.
495    * 
496    * This method blocks until a request is received
497    */
498   protected FullSyncQueueItem getNextFullSyncRequest() {
499     FullSyncQueueItem result = null;
500     while (result == null) {
501       try {
502         result = queues.poll(5, TimeUnit.MINUTES);
503         if (result == null) {
504           LOG.debug("{}: No full sync requests found", getName());
505           continue;
506         }
507 
508         result.wasDequeued();
509 
510         // Item is not ready yet. Push it back into its queue and sleep so we
511         // don't busyloop. This sleep only happens when the front of the queues
512         // are not ready to be run.
513         // TODO: Only sleep when we're repeatedly hammering
514         if (result.wakeTimeDate!=null && result.wakeTimeDate.isAfterNow()) {
515           LOG.trace("Found fullSyncQueueItem that wasn't ready. Requeuing and sleeping: {} isn't ready until {}",
516                   result.id, result.wakeTimeDate);
517 
518           requeue(result, false);
519           result = null;
520           Thread.sleep(5000);
521           continue;
522         }
523 
524         LOG.info("{}: Next full-sync request: {}", getName(), result);
525 
526       } catch (InterruptedException e) {
527         // ignore
528       }
529     }
530     return result;
531   }
532 
533 
534   public JobStatistics startFullSyncOfAllGroupsAndWaitForCompletion(Hib3GrouperLoaderLog hib3GrouploaderLog) throws PspException {
535       Date startDate = new Date();
536 
537       List<FullSyncQueueItem> queuedGroupSyncs
538               = queueAllGroupsForFullSync(QUEUE_TYPE.SCHEDULED_LOCAL, null,"Scheduled full sync");
539 
540       boolean everythingHasBeenCompleted = false;
541       Date statusLastLoggedDate = null;
542 
543       // Monitor the progress of our full-sync task items
544       int doneCount=0;
545       while ( !everythingHasBeenCompleted ) {
546           JobStatisticsbStatistics.html#JobStatistics">JobStatistics statisticsSoFar = new JobStatistics(startDate);
547 
548           doneCount = 0;
549           everythingHasBeenCompleted=true;
550           for (FullSyncQueueItem item : queuedGroupSyncs) {
551               if ( item.hasBeenProcessed() ) {
552                   statisticsSoFar.add(item.stats);
553                   doneCount++;
554               }
555               else {
556                   everythingHasBeenCompleted=false;
557               }
558           }
559 
560           // log the progress every FULL_SYNC_PROGRESS_INTERVAL_SECS
561           if ( everythingHasBeenCompleted ||
562                   statusLastLoggedDate==null ||
563                   (System.currentTimeMillis()-statusLastLoggedDate.getTime())/1000L > FULL_SYNC_PROGRESS_INTERVAL_SECS ) {
564               String status = String.format("%d groups of %d (%d%%)",
565                         doneCount, queuedGroupSyncs.size(),
566                         (int)(100.0 * doneCount / queuedGroupSyncs.size()));
567 
568               LOG.info("{}: Full Sync of all groups progress: {}: {}", getName(), status, statisticsSoFar);
569 
570               statisticsSoFar.updateLoaderLog(hib3GrouploaderLog);
571               hib3GrouploaderLog.setJobMessage(status);
572               hib3GrouploaderLog.store();
573 
574               statusLastLoggedDate = new Date();
575           }
576 
577         // Sleep for a bit to let the full-syncs progress
578           if ( !everythingHasBeenCompleted ) {
579               GrouperUtil.sleep(250);
580           }
581       }
582 
583     JobStatistics/JobStatistics.html#JobStatistics">JobStatistics overallStats = new JobStatistics();
584 
585     for (FullSyncQueueItem item : queuedGroupSyncs) {
586           overallStats.add(item.stats);
587       }
588       overallStats.done();
589 
590       LOG.info("{}: Full Sync of all groups: Finished. Stats: {}", getName(), overallStats);
591       return overallStats;
592   }
593 
594     /**
595    * Go through the Grouper Groups and queue up the ones that match the provisioner's 
596    * ShouldBeProvisioned filter.
597    */
598   protected List<FullSyncQueueItem> queueAllGroupsForFullSync(QUEUE_TYPE queue,
599                                                               String externalReference,
600                                                               String reasonFormat, Object... reasonArgs) throws PspException {
601     String reason = String.format(reasonFormat, reasonArgs);
602     LOG.info("{}: Queuing ({}) all groups for full sync. ({})", getName(), queue.queueName_short, reason);
603     List<FullSyncQueueItem> result = new ArrayList<>();
604 
605     Collection<GrouperGroupInfo> allGroups = provisioner.getAllGroupsForProvisioner();
606     for ( GrouperGroupInfo group : allGroups ) {
607       result.add(scheduleGroupForSync(queue, group.name, externalReference, reason));
608     }
609     
610     if ( provisioner.config.isGrouperAuthoritative()) {
611         FullSyncQueueItem cleanupItem = scheduleGroupCleanup(queue, externalReference, reason);
612 
613         if ( cleanupItem != null ) {
614             result.add(cleanupItem);
615         }
616     }
617 
618     return result;
619   }
620   
621   
622   /**
623    * Put the given group in a queue for full syncing
624    * @param queue : What queue should be used?
625    * @param groupName
626    * @param reasonArgs
627    */
628   public FullSyncQueueItem scheduleGroupForSync(
629           QUEUE_TYPE queue,
630           String groupName,
631           String externalReference,
632           String reasonFormat, Object... reasonArgs) {
633     String reason = String.format(reasonFormat, reasonArgs);
634     FullSyncQueueItemFullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queueItem = new FullSyncQueueItem(getConfigName(), queue, groupName, reason);
635     queueItem.externalReference = externalReference;
636 
637     LOG.debug("[qid={}] Scheduling group for {} full-sync: {}: {}",
638                 queueItem.id,
639                 queue.queueName_short,
640                 groupName,
641                 reason
642         );
643     return queue(queue, queueItem);
644   }
645 
646   private String getConfigName() {
647     return provisioner.getConfigName();
648   }
649 
650   /**
651    * Put a GROUP_CLEANUP_MARKER into the full-sync schedule. This means that
652    * the target system will be checked for information about groups that either
653    * no longer exist or that are no longer selected to be provisioned to the system.
654    */
655   public FullSyncQueueItem scheduleGroupCleanup(QUEUE_TYPE queue, String externalReference, String reasonFormat, Object... reasonArgs) {
656     String reason = String.format(reasonFormat, reasonArgs);
657 
658     if ( provisioner.config.isGrouperAuthoritative() ) {
659       FullSyncQueueItemFullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queueItem = new FullSyncQueueItem(
660               getConfigName(), queue,
661               FULL_SYNC_COMMAND.CLEANUP,
662               reason);
663       queueItem.externalReference = externalReference;
664       LOG.debug("Scheduling group cleanup [{}]: {}", queue.name(), queueItem);
665 
666       return queue(queue, queueItem);
667     } else {
668       LOG.warn("Ignoring group-cleanup request because grouper is not authoritative within the target system");
669       return null;
670     }
671   }
672 
673   protected FullSyncQueueItemet2/middleware/grouper/pspng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queue(QUEUE_TYPE queue, FullSyncQueueItem queueItem)
674   {
675     // Item is going into a queue, so clear it's dequeue time
676     queueItem.mostRecentDequeueTime=null;
677 
678     if (queue.usesGrouperMessagingQueue)
679     {
680       // JSON queueItem and message it
681       String queueItemJson = queueItem.toJson();
682       GrouperMessageSendParam sendParam = new GrouperMessageSendParam()
683               .assignGrouperMessageSystemName(provisioner.getConfig().getGrouperMessagingSystemName())
684               .assignQueueOrTopicName(queue.getQueueName_grouperMessaging(this))
685               .assignQueueType(GrouperMessageQueueType.queue)
686               .addMessageBody(queueItemJson);
687       GrouperMessagingEngine.send(sendParam);
688     } else {
689       try {
690         queues.getSubQueue(queue.queueName_short).put(queueItem);
691       } catch (InterruptedException e)
692       {
693         LOG.error("Interrupted while adding to in-memory queue {}/{}: {}",
694                 new Object[] {queue.name(), queue.queueName_short, queueItem});
695       }
696     }
697 
698     return queueItem;
699   }
700 
701 
702   /**
703    * Requeue item either into GrouperMessaging or a local queue, depending on where the queueItem
704    * came from
705    * @param queueItem
706    * @param processingFailed True when the event was actually processed and needs to have its
707    *                         retryCount and wakeTime updated
708    * @return the input queueItem after any adjustments
709    */
710   protected FullSyncQueueItem../../edu/internet2/middleware/grouper/pspng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem requeue(FullSyncQueueItem queueItem, boolean processingFailed) {
711     // Put the group into the error queue
712 
713     if ( processingFailed ) {
714       queueItem.incrementRetryCount();
715     }
716 
717     if (queueItem.sourceQueue.usesGrouperMessagingQueue) {
718       queue(QUEUE_TYPE.RETRY, queueItem);
719     }
720     else
721       queue(QUEUE_TYPE.RETRY_LOCAL, queueItem);
722 
723     return queueItem;
724   }
725 
726 
727 
728   /**
729    * Workhorse method that handles the FullSync of a specific group.
730    * @param _grouperGroupInfo Group on which to do a Full Sync. The grouper group will be reread from database to make sure information is fresh.
731    * @param fullSyncQueueItem What is driving this sync
732    * @return true if changes to target system were made
733    */
734   protected boolean fullSyncGroup(GrouperGroupInfo _grouperGroupInfo, FullSyncQueueItem fullSyncQueueItem) {
735       fullSyncQueueItem.startStep("ClearingGroupCache");
736       // Uncache the group we're processing
737       provisioner.uncacheGroup(_grouperGroupInfo, null);
738       provisioner.targetSystemGroupCache.clear();
739 
740       GrouperGroupInfo grouperGroupInfo = provisioner.getGroupInfoOfExistingGroup(_grouperGroupInfo.getName());
741 
742       ProvisioningWorkItem workItem = ProvisioningWorkItem.createForFullSync(grouperGroupInfo, fullSyncQueueItem.asofDate);
743       final List<ProvisioningWorkItem> workItems = Arrays.asList(workItem);
744 
745       fullSyncQueueItem.startStep("StartCoordination");
746       provisioner.startCoordination(workItems);
747       try {
748           MDC.put("what", String.format("%s/", grouperGroupInfo));
749           MDC.put("why", String.format("QID=%d/ExtRef=%s/", fullSyncQueueItem.id, fullSyncQueueItem.externalReference));
750           MDC.put("step", "start/");
751           LOG.info("{}: Starting Full-Sync ({}) of group {}",
752                   new Object[]{getName(), fullSyncQueueItem.reason, grouperGroupInfo});
753 
754           fullSyncQueueItem.startStep("StartProvisioning(get group & subject info)");
755           provisioner.startProvisioningBatch(workItems);
756 
757           MDC.put("step",  "doit/");
758 
759           provisioner.setCurrentWorkItem(workItem);
760 
761           fullSyncQueueItem.startStep("doFullSync");
762           boolean changesWereNecessary = provisioner.doFullSync(grouperGroupInfo, fullSyncQueueItem.asofDate, fullSyncQueueItem.stats);
763 
764           MDC.put("step", "finsh/");
765           fullSyncQueueItem.startStep("FinishProvisioning");
766           provisioner.finishProvisioningBatch(workItems);
767 
768           fullSyncQueueItem.startStep("FinishCoordination");
769           provisioner.finishCoordination( workItems, true);
770 
771           lastSuccessfulFullSyncDate.put(fullSyncQueueItem.groupName, DateTime.now());
772           fullSyncQueueItem.processingCompletedSuccessfully("Success");
773           return changesWereNecessary;
774       } catch (PspException e) {
775           LOG.error("{}: Problem doing full sync. Requeuing group {}",
776               new Object[]{ getName(), grouperGroupInfo, e} );
777 
778           fullSyncQueueItem.processingCompletedUnsuccessfully(true, "%s: %s", e.getClass().getName(), e.getMessage());
779 
780           requeue(fullSyncQueueItem, true);
781 
782           return false;
783       } catch (Throwable e) {
784           LOG.error("{}: Problem doing full sync. Requeuing group {}",
785               new Object[] {getName(), grouperGroupInfo, e });
786           fullSyncQueueItem.processingCompletedUnsuccessfully(true, "%s: %s", e.getClass().getName(), e.getMessage());
787           requeue(fullSyncQueueItem, true);
788           return false;
789       }
790       finally {
791         provisioner.finishCoordination(workItems, false);
792         MDC.remove("step");
793         MDC.remove("what");
794         MDC.remove("why");
795       }
796   }
797   
798   protected boolean processGroupCleanup(FullSyncQueueItem queueItem) {
799     try {
800       MDC.put("what", "group_cleanup/");
801       MDC.put("why", String.format("QID=%d/ExtRef=%s/", queueItem.id, queueItem.externalReference));
802 
803       LOG.info("{}: Starting Group Cleanup ({})", getName(), queueItem.reason);
804       ProvisioningWorkItem workItem = ProvisioningWorkItem.createForGroupCleanup(queueItem.asofDate);
805       MDC.put("step", "start/");
806       provisioner.startProvisioningBatch(Arrays.asList(workItem));
807 
808       MDC.put("step", "doit/");
809       provisioner.setCurrentWorkItem(workItem);
810       provisioner.prepareAndRunGroupCleanup(queueItem.stats);
811       
812       MDC.put("step",  "finish/");
813       provisioner.finishProvisioningBatch(Arrays.asList(workItem));
814       queueItem.processingCompletedSuccessfully("Success");
815       return true;
816     } catch (PspException e) {
817       LOG.error("{}: Problem doing group cleanup",
818           getName(), e );
819       queueItem.processingCompletedUnsuccessfully(false, "%s: %s", e.getClass().getName(), e.getMessage());
820       return false;
821     }
822     catch (Throwable e) {
823       LOG.error("{}: Problem doing group cleanup",
824           getName(), e );
825       queueItem.processingCompletedUnsuccessfully(false, "%s: %s", e.getClass().getName(), e.getMessage());
826       return false;
827     }
828     finally {
829       MDC.remove("step");
830       MDC.remove("what");
831       MDC.remove("why");
832     }
833   
834   }
835 
836   public DateTime getLastSuccessfulFullSyncDate(String groupName) {
837     return lastSuccessfulFullSyncDate.get(groupName);
838   }
839 
840 
841 
842 }