1 package edu.internet2.middleware.grouper.pspng;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import java.util.*;
20 import java.util.concurrent.TimeUnit;
21
22 import edu.internet2.middleware.grouper.app.loader.db.Hib3GrouperLoaderLog;
23 import edu.internet2.middleware.grouper.audit.GrouperEngineBuiltin;
24 import edu.internet2.middleware.grouper.hibernate.GrouperContext;
25 import edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystem;
26 import edu.internet2.middleware.grouper.pspng.lbmq.LinkedBlockingMultiQueue;
27 import edu.internet2.middleware.grouper.util.GrouperUtil;
28 import edu.internet2.middleware.grouperClient.messaging.*;
29 import org.apache.log4j.MDC;
30 import org.joda.time.DateTime;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import edu.internet2.middleware.grouper.GrouperSession;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class FullSyncProvisioner {
49 public static enum FULL_SYNC_COMMAND {FULL_SYNC_GROUP, CLEANUP, FULL_SYNC_ALL_GROUPS};
50
51
52 private static final int FULL_SYNC_PROGRESS_INTERVAL_SECS = 5;
53
54
55 public static final String FULL_SYNC_ALL_GROUPS = "::full-sync-all-groups::";
56 private static final InheritableThreadLocal<FullSyncQueueItem> currentFullSyncItem
57 = new InheritableThreadLocal<FullSyncQueueItem>();
58
59 final private Logger LOG;
60
61 final Object grouperMessagingQueueSetupLock = new Object();
62
63 final protected Provisioner<?,?,?> provisioner;
64
65
66
67
68
69
70
71 final LinkedBlockingMultiQueue<String, FullSyncQueueItem> queues = new LinkedBlockingMultiQueue<>();
72
73
74 final static int QUEUE_CAPCITY=2;
75
76
77
78
79 public static enum QUEUE_TYPE {
80 ASAP_LOCAL(10, false),
81 ASAP(20, true),
82 CHANGELOG(30, true),
83 SCHEDULED_LOCAL(40, false),
84 BACKGROUND_LOCAL(50, false),
85 BACKGROUND(60, true),
86 RETRY_LOCAL(90, false),
87 RETRY(100, true);
88
89 int priority;
90 String queueName_short;
91 boolean usesGrouperMessagingQueue;
92 int queueCapacity;
93 QUEUE_TYPE(int priority, boolean usesGrouperMessagingQueue)
94 {
95 this.priority = priority;
96 this.queueName_short = this.name().toLowerCase();
97 this.usesGrouperMessagingQueue=usesGrouperMessagingQueue;
98
99
100 if (usesGrouperMessagingQueue) {
101 queueCapacity=QUEUE_CAPCITY;
102 } else {
103 queueCapacity=Integer.MAX_VALUE;
104 }
105 }
106
107 String getQueueName_grouperMessaging(FullSyncProvisioner fullSyncProvisioner) {
108 return String.format("pspng_full_sync_%s_%s", fullSyncProvisioner.provisioner.getConfigName(), queueName_short);
109 }
110 }
111
112
113 final Map<String, DateTime> lastSuccessfulFullSyncDate = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
114
115
116
117
118
119
120
121 protected FullSyncProvisioner(Provisioner provisioner) {
122 LOG = LoggerFactory.getLogger(String.format("%s.%s", getClass().getName(), provisioner.getDisplayName()));
123 LOG.debug("Constructing PspngFullSyncer-{}", provisioner.getDisplayName());
124
125 this.provisioner = provisioner;
126 GrouperUtil.assertion(provisioner.fullSyncMode, "FullSync provisioners must be constructed with full-sync enabled");
127
128
129 for ( QUEUE_TYPE queue_type : QUEUE_TYPE.values() ) {
130 queues.addSubQueue(queue_type.queueName_short, queue_type.priority, queue_type.queueCapacity);
131 }
132 }
133
134 public String getName() {
135 return String.format ("FullSyncer(%s)", getConfigName());
136 }
137
138
139
140
141 protected void start() {
142 Thread t = new Thread(new Runnable() {
143 @Override
144 public void run() {
145 PspUtils.setupNewThread();
146 try {
147 thread_manageFullSyncProcessing();
148 } catch (Throwable t) {
149 LOG.error("{}: FullSync failed", getName(), t);
150 }
151 finally {
152 LOG.warn("{}: FullSync thread has exited", getName());
153 }
154 }
155 }, getName() + "-Thread");
156
157 t.setDaemon(true);
158 t.start();
159
160 for (QUEUE_TYPE queue_type : QUEUE_TYPE.values()) {
161 if (queue_type.usesGrouperMessagingQueue) {
162 setUpGrouperMessagingQueue(queue_type);
163 startMessageReadingThread(queue_type);
164 }
165 }
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179 protected void startMessageReadingThread(final QUEUE_TYPE queue_type) {
180
181 final FullSyncProvisioner theFullSyncProvisioner = this;
182 Thread t = new Thread(new Runnable() {
183 @Override
184 public void run() {
185
186 PspUtils.setupNewThread();
187 try {
188 thread_fullSyncMessageQueueReader(queue_type);
189 } catch (Throwable t) {
190 LOG.error("{}: Full-sync queue reader failed ({})", getName(), queue_type, t);
191 }
192 finally {
193 LOG.error("{}: Full-sync queue reader has exited ({})", getName(), queue_type);
194 }
195 }
196 }, getName() + "-MessageReaderThread-" + queue_type.queueName_short);
197
198 t.setDaemon(true);
199 t.start();
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214 protected void thread_manageFullSyncProcessing() {
215 MDC.put("who", getName()+"/");
216 Provisioner.activeProvisioner.set(this.provisioner);
217
218 GrouperSession grouperSession = null;
219 GrouperContext grouperContext = null;
220
221 while (true) {
222 while ( !provisioner.config.isEnabled() ) {
223 LOG.warn("Provisioner is disabled. Full-sync not being done.");
224 GrouperUtil.sleep(15000);
225 continue;
226 }
227
228
229 if ( grouperSession != null ) {
230 GrouperSession.stopQuietly(grouperSession);
231 }
232
233 if ( grouperContext != null ) {
234 GrouperContext.deleteDefaultContext();
235 }
236
237 grouperSession = GrouperSession.startRootSession();
238 grouperContext = GrouperContext.createNewDefaultContext(GrouperEngineBuiltin.LOADER, false, true);
239
240 FullSyncQueueItem queueItem = getNextFullSyncRequest();
241
242
243
244 if ( !provisioner.config.isEnabled() ) {
245 continue;
246 }
247
248 LOG.info("Starting to process full-sync queue item: {}", queueItem);
249
250 try {
251 MDC.put("why", String.format("qid=%d/", queueItem.id));
252 processQueueItem(queueItem);
253 }
254 catch (Throwable t) {
255 LOG.error("{}: Full-Sync processing failed: {}", new Object[]{getName(), queueItem, t});
256 queueItem.processingCompletedUnsuccessfully(false,
257 "%s: %s",
258 t.getClass().getName(), t.getMessage());
259
260 }
261 finally {
262 MDC.remove("why");
263 }
264 if ( provisioner.getConfig().needsTargetSystemUsers() || provisioner.getConfig().needsTargetSystemGroups()) {
265 LOG.debug("Caching Stats: TSSubject: {} || TSGroup: {}",
266 provisioner.targetSystemUserCache.getStats(), provisioner.targetSystemGroupCache.getStats());
267 }
268
269 }
270 }
271
272 private ProvisionerCoordinator getProvisionerCoordinator() {
273 return ProvisionerFactory.getProvisionerCoordinator(provisioner);
274 }
275
276
277
278
279
280
281
282
283
284
285
286 protected void thread_fullSyncMessageQueueReader(QUEUE_TYPE queueType) {
287 MDC.put("why", String.format("full-sync-message-reader:%s/", queueType.queueName_short));
288 MDC.put("who", getName() + "/");
289
290 String messagingQueueName = queueType.getQueueName_grouperMessaging(this);
291 String subQueueName = queueType.queueName_short;
292
293 LOG.info("{} message reader Starting: {}-->{}",
294 new Object[]{getName(), messagingQueueName, subQueueName});
295
296 GrouperSession gs = GrouperSession.startRootSession();
297
298
299
300
301 int nextSleepTime_secs = 0;
302
303 LOG.info("{} message reader: created queue {} and granted send/receive permission to {}",
304 new Object[]{getName(), messagingQueueName, gs.getSubject()});
305
306 while (true) {
307 if (nextSleepTime_secs > 0) {
308 GrouperUtil.sleep(1000L * nextSleepTime_secs);
309 }
310
311
312 if ( queues.getSubQueue(subQueueName).remainingCapacity() < 0) {
313 LOG.trace("{} message reader: waiting for space in queue {}",
314 getName(), messagingQueueName);
315
316
317 nextSleepTime_secs=1;
318 continue;
319 }
320
321
322
323 nextSleepTime_secs = 0;
324
325 GrouperMessageReceiveParam receive = new GrouperMessageReceiveParam();
326 receive.assignGrouperMessageSystemName(provisioner.getConfig().getGrouperMessagingSystemName());
327 receive.assignQueueName(messagingQueueName);
328 receive.assignMaxMessagesToReceiveAtOnce(queues.getSubQueue(subQueueName).remainingCapacity());
329 receive.assignLongPollMillis(300*1000);
330
331 GrouperMessageReceiveResult grouperMessageReceiveResult = GrouperMessagingEngine.receive(receive);
332
333 Collection<GrouperMessage> grouperMessages;
334 try {
335 LOG.debug("{} message reader: requesting messages from queue {}",
336 getName(), messagingQueueName);
337
338 grouperMessages = grouperMessageReceiveResult.getGrouperMessages();
339 } catch (Exception e) {
340 nextSleepTime_secs = 15;
341 LOG.error("{} message reader: Problem pulling messages from grouper message queue",
342 getName(), e);
343 continue;
344 }
345
346 if (grouperMessages.size() == 0) {
347 nextSleepTime_secs = 5;
348 LOG.debug("{}/{} message reader: no messages received", getName(), subQueueName);
349 continue;
350 }
351
352 LOG.info("{}/{} message reader: received and processing {} messages",
353 getName(), subQueueName, grouperMessages.size());
354
355 try {
356 for (GrouperMessage message : grouperMessages) {
357
358 GrouperMessageAcknowledgeParam messageAcknowledgeParam =
359 new GrouperMessageAcknowledgeParam()
360 .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
361 .assignQueueName(messagingQueueName)
362 .assignGrouperMessageSystemName(GrouperBuiltinMessagingSystem.BUILTIN_NAME)
363 .addGrouperMessage(message);
364
365 String body = message.getMessageBody();
366 LOG.debug("{}/{} message reader: Processing grouper message {} = {}",
367 new Object[]{getName(), subQueueName,message.getId(), body});
368
369 FullSyncQueueItem queueItem = FullSyncQueueItem.fromJson(queueType, getConfigName(), body);
370
371 if ( queueItem==null ) {
372 LOG.error("{}: Skipping invalid full-sync message: {}", getConfigName(), body);
373 GrouperMessagingEngine.acknowledge(messageAcknowledgeParam);
374 continue;
375 }
376 else {
377 queueItem.messageToAcknowledge = messageAcknowledgeParam;
378 }
379
380 LOG.info("{} message reader: queuing onto {} queue: {}", getName(), subQueueName, queueItem);
381
382 queues.getSubQueue(subQueueName).put(queueItem);
383 }
384 } catch (Exception e) {
385 nextSleepTime_secs = 15;
386 LOG.error("{} message reader: Problem while reading message and queuing it internally",
387 getName(), e);
388 }
389 }
390 }
391
392 protected void setUpGrouperMessagingQueue(QUEUE_TYPE queueType) {
393 GrouperSession gs = GrouperSession.staticGrouperSession();
394 String messagingQueueName = queueType.getQueueName_grouperMessaging(this);
395
396
397
398 synchronized (grouperMessagingQueueSetupLock) {
399 GrouperBuiltinMessagingSystem.createQueue(messagingQueueName);
400 if (!GrouperBuiltinMessagingSystem.allowedToSendToQueue(messagingQueueName, gs.getSubject())) {
401 LOG.info("Queue permission: Granting send permission to subject={} for queue={}", messagingQueueName, gs.getSubject());
402 GrouperBuiltinMessagingSystem.allowSendToQueue(messagingQueueName, gs.getSubject());
403 }
404
405 if (!GrouperBuiltinMessagingSystem.allowedToReceiveFromQueue(messagingQueueName, gs.getSubject())) {
406 LOG.info("Queue permission: Granting receive permission to subject={} for queue={}", messagingQueueName, gs.getSubject());
407 GrouperBuiltinMessagingSystem.allowReceiveFromQueue(messagingQueueName, gs.getSubject());
408 }
409 }
410 }
411
412
413 protected void processQueueItem(FullSyncQueueItem queueItem) throws PspException {
414 switch (queueItem.command) {
415 case FULL_SYNC_GROUP:
416 DateTime lastSuccessFullSync = lastSuccessfulFullSyncDate.get(queueItem.groupName);
417 if ( lastSuccessFullSync!=null && queueItem.asofDate!=null && lastSuccessFullSync.isAfter(queueItem.asofDate) ) {
418 queueItem.processingCompletedSuccessfully("Skipping redundant full-sync. Group was full-synced successfully on %s which is after (asOf date) %s",
419 lastSuccessFullSync, queueItem.asofDate);
420 return;
421 }
422
423
424 queueItem.startStep("ReadGroupFromGrouper");
425 GrouperGroupInfo grouperGroupInfo = provisioner.getGroupInfoOfExistingGroup(queueItem.groupName);
426 if ( grouperGroupInfo==null ) {
427 LOG.error("{}: Group not found for full-sync: '{}'", getConfigName(), queueItem.groupName);
428 queueItem.processingCompletedUnsuccessfully(false, "Group not found");
429 return;
430 }
431
432 if ( provisioner.shouldGroupBeProvisioned(grouperGroupInfo) ) {
433 boolean changesWereMade=false;
434
435 for(int i=0; i<provisioner.getConfig().getMaxNumberOfTimesToRepeatedlyFullSyncGroup(); i++) {
436
437
438 if (i>0) {
439 LOG.info("{}: FullSync #{} of {}: Disabling group caching to ensure most recent information is used",
440 getConfigName(), i+1, grouperGroupInfo);
441 provisioner.uncacheGroup(grouperGroupInfo, null);
442 }
443
444
445
446 if (i==1) {
447 LOG.info("{}: Repeating full sync of {} to make sure changes made in first full sync did not clobber incremental changes", getConfigName(), grouperGroupInfo);
448 } else if (i>1) {
449 LOG.warn("{}: Full sync of {} continues to make changes ({} times so far). The group is probably changing frequently, either within grouper or directly on target system",
450 getConfigName(), grouperGroupInfo, i);
451
452
453 GrouperUtil.sleep(provisioner.getConfig().getTimeToSleepBetweenRepeatedFullSyncs_ms());
454 }
455
456 changesWereMade = fullSyncGroup(grouperGroupInfo, queueItem);
457
458 if (!changesWereMade) {
459
460 break;
461 }
462 }
463
464
465 if ( changesWereMade ) {
466 LOG.warn("{}: FullSync of {} was done {} times looking for stability, but the final one still required changes. There is a small possibility that realtime changes have been provisioned incorrectly and will be addressed during a future full sync.",
467 getConfigName(), grouperGroupInfo,
468 provisioner.getConfig().getMaxNumberOfTimesToRepeatedlyFullSyncGroup());
469
470
471 }
472 } else {
473 queueItem.processingCompletedUnsuccessfully(false, "Group is not selected for provisioning");
474 }
475 break;
476 case CLEANUP:
477
478 processGroupCleanup(queueItem);
479 break;
480 case FULL_SYNC_ALL_GROUPS:
481 List<FullSyncQueueItem> scheduledItems = queueAllGroupsForFullSync(queueItem.sourceQueue, queueItem.externalReference, "Requested by message: %s", queueItem);
482 for (FullSyncQueueItem scheduledItem : scheduledItems) {
483 LOG.info("{} message reader: Group is queued for full sync: {}", getName(), scheduledItem);
484 }
485 queueItem.processingCompletedSuccessfully("Scheduled %d groups for full sync", scheduledItems.size());
486 break;
487 default:
488 queueItem.processingCompletedUnsuccessfully(false, "Full-sync command not known: %s", queueItem.command);
489 }
490 }
491
492
493
494
495
496
497
498 protected FullSyncQueueItem getNextFullSyncRequest() {
499 FullSyncQueueItem result = null;
500 while (result == null) {
501 try {
502 result = queues.poll(5, TimeUnit.MINUTES);
503 if (result == null) {
504 LOG.debug("{}: No full sync requests found", getName());
505 continue;
506 }
507
508 result.wasDequeued();
509
510
511
512
513
514 if (result.wakeTimeDate!=null && result.wakeTimeDate.isAfterNow()) {
515 LOG.trace("Found fullSyncQueueItem that wasn't ready. Requeuing and sleeping: {} isn't ready until {}",
516 result.id, result.wakeTimeDate);
517
518 requeue(result, false);
519 result = null;
520 Thread.sleep(5000);
521 continue;
522 }
523
524 LOG.info("{}: Next full-sync request: {}", getName(), result);
525
526 } catch (InterruptedException e) {
527
528 }
529 }
530 return result;
531 }
532
533
534 public JobStatistics startFullSyncOfAllGroupsAndWaitForCompletion(Hib3GrouperLoaderLog hib3GrouploaderLog) throws PspException {
535 Date startDate = new Date();
536
537 List<FullSyncQueueItem> queuedGroupSyncs
538 = queueAllGroupsForFullSync(QUEUE_TYPE.SCHEDULED_LOCAL, null,"Scheduled full sync");
539
540 boolean everythingHasBeenCompleted = false;
541 Date statusLastLoggedDate = null;
542
543
544 int doneCount=0;
545 while ( !everythingHasBeenCompleted ) {
546 JobStatisticsbStatistics.html#JobStatistics">JobStatistics statisticsSoFar = new JobStatistics(startDate);
547
548 doneCount = 0;
549 everythingHasBeenCompleted=true;
550 for (FullSyncQueueItem item : queuedGroupSyncs) {
551 if ( item.hasBeenProcessed() ) {
552 statisticsSoFar.add(item.stats);
553 doneCount++;
554 }
555 else {
556 everythingHasBeenCompleted=false;
557 }
558 }
559
560
561 if ( everythingHasBeenCompleted ||
562 statusLastLoggedDate==null ||
563 (System.currentTimeMillis()-statusLastLoggedDate.getTime())/1000L > FULL_SYNC_PROGRESS_INTERVAL_SECS ) {
564 String status = String.format("%d groups of %d (%d%%)",
565 doneCount, queuedGroupSyncs.size(),
566 (int)(100.0 * doneCount / queuedGroupSyncs.size()));
567
568 LOG.info("{}: Full Sync of all groups progress: {}: {}", getName(), status, statisticsSoFar);
569
570 statisticsSoFar.updateLoaderLog(hib3GrouploaderLog);
571 hib3GrouploaderLog.setJobMessage(status);
572 hib3GrouploaderLog.store();
573
574 statusLastLoggedDate = new Date();
575 }
576
577
578 if ( !everythingHasBeenCompleted ) {
579 GrouperUtil.sleep(250);
580 }
581 }
582
583 JobStatistics/JobStatistics.html#JobStatistics">JobStatistics overallStats = new JobStatistics();
584
585 for (FullSyncQueueItem item : queuedGroupSyncs) {
586 overallStats.add(item.stats);
587 }
588 overallStats.done();
589
590 LOG.info("{}: Full Sync of all groups: Finished. Stats: {}", getName(), overallStats);
591 return overallStats;
592 }
593
594
595
596
597
598 protected List<FullSyncQueueItem> queueAllGroupsForFullSync(QUEUE_TYPE queue,
599 String externalReference,
600 String reasonFormat, Object... reasonArgs) throws PspException {
601 String reason = String.format(reasonFormat, reasonArgs);
602 LOG.info("{}: Queuing ({}) all groups for full sync. ({})", getName(), queue.queueName_short, reason);
603 List<FullSyncQueueItem> result = new ArrayList<>();
604
605 Collection<GrouperGroupInfo> allGroups = provisioner.getAllGroupsForProvisioner();
606 for ( GrouperGroupInfo group : allGroups ) {
607 result.add(scheduleGroupForSync(queue, group.name, externalReference, reason));
608 }
609
610 if ( provisioner.config.isGrouperAuthoritative()) {
611 FullSyncQueueItem cleanupItem = scheduleGroupCleanup(queue, externalReference, reason);
612
613 if ( cleanupItem != null ) {
614 result.add(cleanupItem);
615 }
616 }
617
618 return result;
619 }
620
621
622
623
624
625
626
627
628 public FullSyncQueueItem scheduleGroupForSync(
629 QUEUE_TYPE queue,
630 String groupName,
631 String externalReference,
632 String reasonFormat, Object... reasonArgs) {
633 String reason = String.format(reasonFormat, reasonArgs);
634 FullSyncQueueItemFullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queueItem = new FullSyncQueueItem(getConfigName(), queue, groupName, reason);
635 queueItem.externalReference = externalReference;
636
637 LOG.debug("[qid={}] Scheduling group for {} full-sync: {}: {}",
638 queueItem.id,
639 queue.queueName_short,
640 groupName,
641 reason
642 );
643 return queue(queue, queueItem);
644 }
645
646 private String getConfigName() {
647 return provisioner.getConfigName();
648 }
649
650
651
652
653
654
655 public FullSyncQueueItem scheduleGroupCleanup(QUEUE_TYPE queue, String externalReference, String reasonFormat, Object... reasonArgs) {
656 String reason = String.format(reasonFormat, reasonArgs);
657
658 if ( provisioner.config.isGrouperAuthoritative() ) {
659 FullSyncQueueItemFullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queueItem = new FullSyncQueueItem(
660 getConfigName(), queue,
661 FULL_SYNC_COMMAND.CLEANUP,
662 reason);
663 queueItem.externalReference = externalReference;
664 LOG.debug("Scheduling group cleanup [{}]: {}", queue.name(), queueItem);
665
666 return queue(queue, queueItem);
667 } else {
668 LOG.warn("Ignoring group-cleanup request because grouper is not authoritative within the target system");
669 return null;
670 }
671 }
672
673 protected FullSyncQueueItemet2/middleware/grouper/pspng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem queue(QUEUE_TYPE queue, FullSyncQueueItem queueItem)
674 {
675
676 queueItem.mostRecentDequeueTime=null;
677
678 if (queue.usesGrouperMessagingQueue)
679 {
680
681 String queueItemJson = queueItem.toJson();
682 GrouperMessageSendParam sendParam = new GrouperMessageSendParam()
683 .assignGrouperMessageSystemName(provisioner.getConfig().getGrouperMessagingSystemName())
684 .assignQueueOrTopicName(queue.getQueueName_grouperMessaging(this))
685 .assignQueueType(GrouperMessageQueueType.queue)
686 .addMessageBody(queueItemJson);
687 GrouperMessagingEngine.send(sendParam);
688 } else {
689 try {
690 queues.getSubQueue(queue.queueName_short).put(queueItem);
691 } catch (InterruptedException e)
692 {
693 LOG.error("Interrupted while adding to in-memory queue {}/{}: {}",
694 new Object[] {queue.name(), queue.queueName_short, queueItem});
695 }
696 }
697
698 return queueItem;
699 }
700
701
702
703
704
705
706
707
708
709
710 protected FullSyncQueueItem../../edu/internet2/middleware/grouper/pspng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem requeue(FullSyncQueueItem queueItem, boolean processingFailed) {
711
712
713 if ( processingFailed ) {
714 queueItem.incrementRetryCount();
715 }
716
717 if (queueItem.sourceQueue.usesGrouperMessagingQueue) {
718 queue(QUEUE_TYPE.RETRY, queueItem);
719 }
720 else
721 queue(QUEUE_TYPE.RETRY_LOCAL, queueItem);
722
723 return queueItem;
724 }
725
726
727
728
729
730
731
732
733
734 protected boolean fullSyncGroup(GrouperGroupInfo _grouperGroupInfo, FullSyncQueueItem fullSyncQueueItem) {
735 fullSyncQueueItem.startStep("ClearingGroupCache");
736
737 provisioner.uncacheGroup(_grouperGroupInfo, null);
738 provisioner.targetSystemGroupCache.clear();
739
740 GrouperGroupInfo grouperGroupInfo = provisioner.getGroupInfoOfExistingGroup(_grouperGroupInfo.getName());
741
742 ProvisioningWorkItem workItem = ProvisioningWorkItem.createForFullSync(grouperGroupInfo, fullSyncQueueItem.asofDate);
743 final List<ProvisioningWorkItem> workItems = Arrays.asList(workItem);
744
745 fullSyncQueueItem.startStep("StartCoordination");
746 provisioner.startCoordination(workItems);
747 try {
748 MDC.put("what", String.format("%s/", grouperGroupInfo));
749 MDC.put("why", String.format("QID=%d/ExtRef=%s/", fullSyncQueueItem.id, fullSyncQueueItem.externalReference));
750 MDC.put("step", "start/");
751 LOG.info("{}: Starting Full-Sync ({}) of group {}",
752 new Object[]{getName(), fullSyncQueueItem.reason, grouperGroupInfo});
753
754 fullSyncQueueItem.startStep("StartProvisioning(get group & subject info)");
755 provisioner.startProvisioningBatch(workItems);
756
757 MDC.put("step", "doit/");
758
759 provisioner.setCurrentWorkItem(workItem);
760
761 fullSyncQueueItem.startStep("doFullSync");
762 boolean changesWereNecessary = provisioner.doFullSync(grouperGroupInfo, fullSyncQueueItem.asofDate, fullSyncQueueItem.stats);
763
764 MDC.put("step", "finsh/");
765 fullSyncQueueItem.startStep("FinishProvisioning");
766 provisioner.finishProvisioningBatch(workItems);
767
768 fullSyncQueueItem.startStep("FinishCoordination");
769 provisioner.finishCoordination( workItems, true);
770
771 lastSuccessfulFullSyncDate.put(fullSyncQueueItem.groupName, DateTime.now());
772 fullSyncQueueItem.processingCompletedSuccessfully("Success");
773 return changesWereNecessary;
774 } catch (PspException e) {
775 LOG.error("{}: Problem doing full sync. Requeuing group {}",
776 new Object[]{ getName(), grouperGroupInfo, e} );
777
778 fullSyncQueueItem.processingCompletedUnsuccessfully(true, "%s: %s", e.getClass().getName(), e.getMessage());
779
780 requeue(fullSyncQueueItem, true);
781
782 return false;
783 } catch (Throwable e) {
784 LOG.error("{}: Problem doing full sync. Requeuing group {}",
785 new Object[] {getName(), grouperGroupInfo, e });
786 fullSyncQueueItem.processingCompletedUnsuccessfully(true, "%s: %s", e.getClass().getName(), e.getMessage());
787 requeue(fullSyncQueueItem, true);
788 return false;
789 }
790 finally {
791 provisioner.finishCoordination(workItems, false);
792 MDC.remove("step");
793 MDC.remove("what");
794 MDC.remove("why");
795 }
796 }
797
798 protected boolean processGroupCleanup(FullSyncQueueItem queueItem) {
799 try {
800 MDC.put("what", "group_cleanup/");
801 MDC.put("why", String.format("QID=%d/ExtRef=%s/", queueItem.id, queueItem.externalReference));
802
803 LOG.info("{}: Starting Group Cleanup ({})", getName(), queueItem.reason);
804 ProvisioningWorkItem workItem = ProvisioningWorkItem.createForGroupCleanup(queueItem.asofDate);
805 MDC.put("step", "start/");
806 provisioner.startProvisioningBatch(Arrays.asList(workItem));
807
808 MDC.put("step", "doit/");
809 provisioner.setCurrentWorkItem(workItem);
810 provisioner.prepareAndRunGroupCleanup(queueItem.stats);
811
812 MDC.put("step", "finish/");
813 provisioner.finishProvisioningBatch(Arrays.asList(workItem));
814 queueItem.processingCompletedSuccessfully("Success");
815 return true;
816 } catch (PspException e) {
817 LOG.error("{}: Problem doing group cleanup",
818 getName(), e );
819 queueItem.processingCompletedUnsuccessfully(false, "%s: %s", e.getClass().getName(), e.getMessage());
820 return false;
821 }
822 catch (Throwable e) {
823 LOG.error("{}: Problem doing group cleanup",
824 getName(), e );
825 queueItem.processingCompletedUnsuccessfully(false, "%s: %s", e.getClass().getName(), e.getMessage());
826 return false;
827 }
828 finally {
829 MDC.remove("step");
830 MDC.remove("what");
831 MDC.remove("why");
832 }
833
834 }
835
836 public DateTime getLastSuccessfulFullSyncDate(String groupName) {
837 return lastSuccessfulFullSyncDate.get(groupName);
838 }
839
840
841
842 }