1 package edu.internet2.middleware.grouper.pspng;
2
3 import edu.internet2.middleware.grouper.GrouperSession;
4 import edu.internet2.middleware.grouper.exception.GrouperStaleStateException;
5 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
6 import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;
7 import net.sf.json.JSONObject;
8 import net.sf.json.JsonConfig;
9 import net.sf.json.util.PropertyFilter;
10 import org.apache.commons.lang.StringUtils;
11 import org.joda.time.DateTime;
12 import org.joda.time.Duration;
13 import org.joda.time.Instant;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.LinkedHashMap;
20 import java.util.Map;
21 import java.util.concurrent.atomic.AtomicInteger;
22
23 class FullSyncQueueItem {
24 private static Logger LOG = LoggerFactory.getLogger(FullSyncQueueItem.class);
25
26
27 private static final AtomicInteger queueItemCounter = new AtomicInteger();
28
29 final public static String ID_PROPERTY = "id";
30 int id;
31
32
33 final public static String PROVISIONER_NAME_PROPERTY = "provisioner_name";
34 final String provisionerName;
35
36
37 final public static String SOURCE_PROPERTY = "source";
38 String source;
39
40
41 final public static String REASON_PROPERTY = "reason";
42 String reason;
43
44
45 final public static String EXTERNAL_REFERENCE = "external_reference";
46 String externalReference;
47
48
49 final public static String RETRY_COUNT_PROPERTY = "retry_count";
50 int retryCount=0;
51
52
53 final public static String ASOF_JEPOCH_PROPERTY = "asof_jepoch";
54 DateTime asofDate = new DateTime();
55
56
57 final public static String FIRST_QUEUED_JEPOCH_PROPERTY = "first_queued_jepoch";
58 DateTime firstQueuedDate = new DateTime();
59
60
61
62 final public static String MOST_RECENT_QUEUED_JEPOCH_PROPERTY = "most_recent_queued_jepoch";
63 DateTime mostRecentQueuedDate = new DateTime();
64
65
66
67 transient DateTime mostRecentDequeueTime = null;
68
69
70
71 final public static String WAKE_TIME_JEPOCH_PROPERTY = "wake_time_jepoch";
72 DateTime wakeTimeDate=null;
73
74 transient JobStatisticsr/pspng/JobStatistics.html#JobStatistics">JobStatistics stats = new JobStatistics();
75
76
77 final public static String REPLY_QUEUE = "reply_queue";
78 String replyQueue;
79
80
81 transient DateTime processingCompletionTime = null;
82 transient Map<String, Duration> processingStepTimingMeasurements = new LinkedHashMap<>();
83 transient String currentProcessingStep;
84 transient Instant currentProcessingStepStartTime;
85
86
87 final public static String WAS_SUCCESSFUL_PROPERTY="was_successful";
88 Boolean wasSuccessful;
89
90
91 final public static String WILL_BE_RETRIED_PROPERTY="will_be_retried";
92 Boolean willBeRetried;
93
94
95
96 transient GrouperMessageAcknowledgeParam messageToAcknowledge;
97
98
99 transient FullSyncProvisioner.QUEUE_TYPE sourceQueue;
100
101
102 final public static String COMMAND_PROPERTY="command";
103 FullSyncProvisioner.FULL_SYNC_COMMAND command;
104
105
106 final public static String GROUP_PROPERTY = "group";
107 String groupName;
108
109
110 final public static String PROCESSING_RESULT_MESSAGE_PROPERTY = "processing_result_message";
111 String processingResultMessage;
112
113
114 public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
115 FullSyncProvisioner.FULL_SYNC_COMMAND command, String reason)
116 {
117 this.id = queueItemCounter.incrementAndGet();
118 this.sourceQueue = sourceQueue;
119 this.command=command;
120 this.provisionerName = provisionerName;
121 this.reason=reason;
122 messageToAcknowledge=null;
123 }
124
125 public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
126 String groupName, String reason) {
127 this(provisionerName, sourceQueue, FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_GROUP, groupName, reason, null);
128 }
129
130 public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
131 FullSyncProvisioner.FULL_SYNC_COMMAND command,
132 String groupName, String reason,
133 GrouperMessageAcknowledgeParam messageToAcknowledge) {
134 this.id = queueItemCounter.incrementAndGet();
135 this.sourceQueue = sourceQueue;
136 this.command= command;
137 this.provisionerName = provisionerName;
138 this.groupName = groupName;
139 this.reason = reason;
140 this.messageToAcknowledge = messageToAcknowledge;
141 }
142
143
144 public static FullSyncQueueItem fromJson(FullSyncProvisioner.QUEUE_TYPE sourceQueue, String provisionerName, String jsonString) {
145
146 if ( !jsonString.startsWith("{") ) {
147 if ( jsonString.equalsIgnoreCase(FullSyncProvisioner.FULL_SYNC_ALL_GROUPS))
148 return new FullSyncQueueItem(provisionerName, sourceQueue, FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_ALL_GROUPS, "from old-format message");
149 else
150 return new FullSyncQueueItem(
151 provisionerName,
152 sourceQueue,
153 jsonString,
154 "from old-format message");
155 }
156
157 JSONObject jsonObject = JSONObject.fromObject(jsonString);
158
159 String commandString = jsonObject.getString(COMMAND_PROPERTY);
160
161 if ( commandString==null )
162 {
163 LOG.error("{}: Json of FullSyncQueueItem did not include {}: {}", provisionerName, COMMAND_PROPERTY, jsonString);
164 return null;
165 }
166
167
168 String reason = jsonObject.getString(REASON_PROPERTY);
169
170 FullSyncQueueItemng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem result = new FullSyncQueueItem(
171 provisionerName,
172 sourceQueue,
173 FullSyncProvisioner.FULL_SYNC_COMMAND.valueOf(commandString),
174 reason);
175
176
177
178 if ( jsonObject.containsKey(ID_PROPERTY) )
179 result.id = jsonObject.getInt(ID_PROPERTY);
180
181 if ( jsonObject.containsKey(SOURCE_PROPERTY) )
182 result.source = jsonObject.getString(SOURCE_PROPERTY);
183 if ( jsonObject.containsKey(EXTERNAL_REFERENCE) )
184 result.externalReference = jsonObject.getString(EXTERNAL_REFERENCE);
185
186 if ( jsonObject.containsKey(GROUP_PROPERTY) )
187 result.groupName = jsonObject.getString(GROUP_PROPERTY);
188
189 if ( jsonObject.containsKey(FIRST_QUEUED_JEPOCH_PROPERTY) )
190 result.firstQueuedDate = new DateTime(jsonObject.getLong(FIRST_QUEUED_JEPOCH_PROPERTY));
191
192 if ( jsonObject.containsKey(MOST_RECENT_QUEUED_JEPOCH_PROPERTY) )
193 result.mostRecentQueuedDate = new DateTime(jsonObject.getLong(MOST_RECENT_QUEUED_JEPOCH_PROPERTY));
194
195 if ( jsonObject.containsKey(ASOF_JEPOCH_PROPERTY) )
196 result.asofDate = new DateTime(jsonObject.getLong(ASOF_JEPOCH_PROPERTY));
197
198 if ( jsonObject.containsKey(WAS_SUCCESSFUL_PROPERTY) )
199 result.wasSuccessful = jsonObject.getBoolean(WAS_SUCCESSFUL_PROPERTY);
200
201 if ( jsonObject.containsKey(WILL_BE_RETRIED_PROPERTY) )
202 result.willBeRetried = jsonObject.getBoolean(WILL_BE_RETRIED_PROPERTY);
203
204 if ( jsonObject.containsKey(REPLY_QUEUE) )
205 result.replyQueue = jsonObject.getString(REPLY_QUEUE);
206
207 if ( jsonObject.containsKey(RETRY_COUNT_PROPERTY) )
208 result.retryCount = jsonObject.getInt(RETRY_COUNT_PROPERTY);
209
210 if ( jsonObject.containsKey(WAKE_TIME_JEPOCH_PROPERTY) )
211 result.wakeTimeDate = new DateTime(jsonObject.getLong(WAKE_TIME_JEPOCH_PROPERTY));
212
213 if ( jsonObject.containsKey(PROCESSING_RESULT_MESSAGE_PROPERTY) )
214 result.processingResultMessage = jsonObject.getString(PROCESSING_RESULT_MESSAGE_PROPERTY);
215
216 if ( result.command== FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_GROUP &&
217 result.groupName == null ) {
218 LOG.error("{}: Json of Group-command FullSyncQueueItem did not include {}: {}", provisionerName, GROUP_PROPERTY, jsonString);
219 return null;
220 }
221
222 return result;
223 }
224
225 public String toJson() {
226
227
228
229
230
231 Map<String, Object> result = new HashMap<>();
232
233 result.put(ID_PROPERTY, id);
234 result.put(PROVISIONER_NAME_PROPERTY, provisionerName);
235 result.put(COMMAND_PROPERTY, command.name());
236
237 result.put(SOURCE_PROPERTY, source);
238 result.put(EXTERNAL_REFERENCE, externalReference);
239 result.put(REASON_PROPERTY, reason);
240
241 result.put(GROUP_PROPERTY, groupName);
242
243 result.put(FIRST_QUEUED_JEPOCH_PROPERTY, firstQueuedDate.getMillis());
244 result.put(ASOF_JEPOCH_PROPERTY, asofDate.getMillis());
245 result.put(MOST_RECENT_QUEUED_JEPOCH_PROPERTY, mostRecentQueuedDate.getMillis());
246
247 if ( wakeTimeDate!=null ) {
248 result.put(WAKE_TIME_JEPOCH_PROPERTY, wakeTimeDate.getMillis());
249 }
250
251 if ( wasSuccessful!= null ) {
252 result.put(WAS_SUCCESSFUL_PROPERTY, wasSuccessful ? "true" : "false");
253 }
254
255 result.put(REPLY_QUEUE, replyQueue);
256 result.put(RETRY_COUNT_PROPERTY, retryCount);
257
258 if ( willBeRetried!=null ) {
259 result.put(WILL_BE_RETRIED_PROPERTY, willBeRetried ? "true" : "false");
260 }
261
262 result.put(PROCESSING_RESULT_MESSAGE_PROPERTY, processingResultMessage);
263
264
265
266
267
268
269 JsonConfig jsonConfig = new JsonConfig();
270 jsonConfig.setJsonPropertyFilter(new PropertyFilter() {
271 public boolean apply(Object source, String name, Object value) {
272 return value == null;
273 }
274 });
275 JSONObject jsonObject = JSONObject.fromObject(result, jsonConfig);
276 String json = jsonObject.toString();
277 return json;
278 }
279
280
281 public void wasDequeued() {
282 mostRecentDequeueTime=DateTime.now();
283 currentProcessingStep="init";
284 currentProcessingStepStartTime = Instant.now();
285 }
286
287 public void startStep(String stepLabel) {
288
289 if ( currentProcessingStep!=null ) {
290 Duration processingTimePeriod = new Duration(currentProcessingStepStartTime, Instant.now());
291 processingStepTimingMeasurements.put(currentProcessingStep, processingTimePeriod);
292 }
293
294 currentProcessingStep=stepLabel;
295 currentProcessingStepStartTime = Instant.now();
296 }
297
298
299 public FullSyncQueueItem setSourceQueue(FullSyncProvisioner.QUEUE_TYPE queueType) {
300 sourceQueue = queueType;
301 return this;
302 }
303
304
305 public void processingCompletedSuccessfully(String messageFormat, Object... messageArgs) {
306
307 if ( stats.processingCompletedTime != null ) {
308 return;
309 }
310
311 if (StringUtils.isNotEmpty(messageFormat))
312 processingResultMessage = String.format("Successful: " + messageFormat, messageArgs);
313 else
314 processingResultMessage = "Successful";
315
316 wasSuccessful = true;
317
318 processingCompleted();
319 }
320
321 private void processingCompleted() {
322 stats.processingCompletedTime = new Date();
323 startStep(null);
324 processingCompletionTime = DateTime.now();
325 LOG.info("FullSync Item done ({}). Stats: {}/{}: {}",
326 processingResultMessage, stats, getProcessingTimeBreakdown(), this);
327
328 acknowledgeMessage();
329 }
330
331 public void processingCompletedUnsuccessfully(boolean willBeRetried, String messageFormat, Object... messageArgs) {
332
333 if ( stats.processingCompletedTime != null ) {
334 return;
335 }
336 this.willBeRetried=willBeRetried;
337 processingResultMessage = String.format(messageFormat, messageArgs);
338
339 wasSuccessful = false;
340 processingCompleted();
341 }
342
343 public void acknowledgeMessage() {
344 if ( messageToAcknowledge != null ) {
345 LOG.debug("Acknowledging that message was processed: {}", this);
346 try {
347 GrouperMessagingEngine.acknowledge(messageToAcknowledge);
348 } catch (GrouperStaleStateException gsse) {
349 LOG.debug("error acknowleding message", gsse);
350
351 }
352 }
353 messageToAcknowledge = null;
354 }
355
356 public boolean hasBeenProcessed() {
357 return stats.processingCompletedTime != null;
358 }
359
360 public Duration getTimeSpentInQueue() {
361 if ( mostRecentDequeueTime==null ) {
362 return new Duration(mostRecentQueuedDate, Instant.now());
363 } else {
364 return new Duration(mostRecentQueuedDate, mostRecentDequeueTime);
365 }
366 }
367
368 public Duration getTimeSinceFirstQueued() {
369 return new Duration(firstQueuedDate, Instant.now());
370 }
371
372 public String getRequestedAction() {
373 switch (command) {
374 case FULL_SYNC_GROUP:
375 return groupName;
376 case CLEANUP:
377 return "Extra-Group Cleanup";
378 case FULL_SYNC_ALL_GROUPS:
379 return "All-Groups";
380 }
381 return "Unknown-command-"+command.name();
382 }
383
384 public String toString() {
385 return String.format("Action=%s|qid=%d|Trigger=%s|ExternalRef=%s|AsOf=%s|QTime=%s|Age=%s",
386 getRequestedAction(), id, reason, externalReference,
387 PspUtils.formatDate_DateHoursMinutes(asofDate, "none"),
388 PspUtils.formatElapsedTime(getTimeSpentInQueue()),
389 PspUtils.formatElapsedTime(getTimeSinceFirstQueued()));
390 }
391
392 public String getProcessingTimeBreakdown() {
393 StringBuilder result = new StringBuilder();
394
395 Duration totalProcessingTime;
396 if ( mostRecentDequeueTime == null )
397 return "NotYetProcessed";
398
399 if ( processingCompletionTime==null ) {
400 totalProcessingTime = new Duration(mostRecentDequeueTime, Instant.now());
401 result.append(String.format("ProcTime(SoFar): %s", PspUtils.formatElapsedTime(totalProcessingTime)));
402 } else {
403 totalProcessingTime = new Duration(mostRecentDequeueTime, processingCompletionTime);
404 result.append(String.format("ProcTime: %s", PspUtils.formatElapsedTime(totalProcessingTime)));
405 }
406
407 if (processingStepTimingMeasurements.size()>0 && totalProcessingTime.getMillis()>0) {
408 result.append(" Timing breakdown: ");
409
410 for (Map.Entry<String, Duration> processingStep : processingStepTimingMeasurements.entrySet()) {
411 result.append(String.format("%s=%d%%/", processingStep.getKey(), (int) (100 * processingStep.getValue().getMillis() / totalProcessingTime.getMillis())));
412 }
413 }
414
415 return result.toString();
416 }
417
418
419 public void incrementRetryCount() {
420 retryCount++;
421 mostRecentQueuedDate = new DateTime();
422
423
424 int sleepTime_secs = Math.min(10*retryCount, 900);
425
426 wakeTimeDate = new DateTime().withDurationAdded(1000*sleepTime_secs, 1);
427 }
428 }