View Javadoc
1   package edu.internet2.middleware.grouper.pspng;
2   
3   import edu.internet2.middleware.grouper.GrouperSession;
4   import edu.internet2.middleware.grouper.exception.GrouperStaleStateException;
5   import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
6   import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;
7   import net.sf.json.JSONObject;
8   import net.sf.json.JsonConfig;
9   import net.sf.json.util.PropertyFilter;
10  import org.apache.commons.lang.StringUtils;
11  import org.joda.time.DateTime;
12  import org.joda.time.Duration;
13  import org.joda.time.Instant;
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  import java.util.Date;
18  import java.util.HashMap;
19  import java.util.LinkedHashMap;
20  import java.util.Map;
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  class FullSyncQueueItem {
24    private static Logger LOG = LoggerFactory.getLogger(FullSyncQueueItem.class);
25  
26    // This is used to give each queue item a unique number (at least for the current daemon run)
27    private static final AtomicInteger queueItemCounter = new AtomicInteger();
28  
29    final public static String ID_PROPERTY = "id";
30    int id;
31  
32    // What provisioner is this message targeting?
33    final public static String PROVISIONER_NAME_PROPERTY = "provisioner_name";
34    final String provisionerName;
35  
36    // Source of the full-sync request
37    final public static String SOURCE_PROPERTY = "source";
38    String source;
39  
40    // Why is the full-sync requested
41    final public static String REASON_PROPERTY = "reason";
42    String reason;
43  
44    // An external reference for logging or reply correlation
45    final public static String EXTERNAL_REFERENCE = "external_reference";
46    String externalReference;
47  
48    // how many times has this full-sync request been processed
49    final public static String RETRY_COUNT_PROPERTY = "retry_count";
50    int retryCount=0;
51  
52    // How old can the cached information be used. Defaults to firstQueuedDate
53    final public static String ASOF_JEPOCH_PROPERTY = "asof_jepoch";
54    DateTime asofDate = new DateTime();
55  
56    // When was this request first brought into grouper? This stays the same across Retries
57    final public static String FIRST_QUEUED_JEPOCH_PROPERTY = "first_queued_jepoch";
58    DateTime firstQueuedDate = new DateTime();
59  
60  
61    // When was this request most recently queued or requeued
62    final public static String MOST_RECENT_QUEUED_JEPOCH_PROPERTY = "most_recent_queued_jepoch";
63    DateTime mostRecentQueuedDate = new DateTime();
64  
65    // When was this object pulled out of a queue. This allows us to
66    // calculate how long object spent in queue, even after it's been in memory for a while
67    transient DateTime mostRecentDequeueTime = null;
68  
69    // Should this request wait for a while before being processed. This is used to
70    // implement an 'exponential' backoff as retry_count increases
71    final public static String WAKE_TIME_JEPOCH_PROPERTY = "wake_time_jepoch";
72    DateTime wakeTimeDate=null;
73  
74    transient JobStatisticsr/pspng/JobStatistics.html#JobStatistics">JobStatistics stats = new JobStatistics();
75  
76    // Should this FullSyncQueueItem be sent to a queue when processed
77    final public static String REPLY_QUEUE = "reply_queue";
78    String replyQueue;
79  
80    // Tracking/measuring how long processing took
81    transient DateTime processingCompletionTime = null;
82    transient Map<String, Duration> processingStepTimingMeasurements = new LinkedHashMap<>();
83    transient String currentProcessingStep;
84    transient Instant currentProcessingStepStartTime;
85  
86    // Was this request processed successfully
87    final public static String WAS_SUCCESSFUL_PROPERTY="was_successful";
88    Boolean wasSuccessful;
89  
90    // Will this (unsuccessful) request be retried
91    final public static String WILL_BE_RETRIED_PROPERTY="will_be_retried";
92    Boolean willBeRetried;
93  
94    // what acknowledgement to pass to messaging when we've handled this group
95    // This comes from the GrouperMessagingSystem
96    transient GrouperMessageAcknowledgeParam messageToAcknowledge;
97  
98    // What queue brought this event into Grouper
99    transient FullSyncProvisioner.QUEUE_TYPE sourceQueue;
100 
101   // What Full-sync command should be processed
102   final public static String COMMAND_PROPERTY="command";
103   FullSyncProvisioner.FULL_SYNC_COMMAND command;
104 
105   // group to process (valid for FULL_SYNC_GROUP-command type)
106   final public static String GROUP_PROPERTY = "group";
107   String groupName;
108 
109   // What happened when the group was processed
110   final public static String PROCESSING_RESULT_MESSAGE_PROPERTY = "processing_result_message";
111   String processingResultMessage;
112 
113 
114   public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
115                            FullSyncProvisioner.FULL_SYNC_COMMAND command, String reason)
116   {
117     this.id = queueItemCounter.incrementAndGet();
118     this.sourceQueue = sourceQueue;
119     this.command=command;
120     this.provisionerName = provisionerName;
121     this.reason=reason;
122     messageToAcknowledge=null;
123   }
124 
125   public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
126                            String groupName, String reason) {
127       this(provisionerName, sourceQueue, FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_GROUP, groupName, reason, null);
128   }
129 
130   public FullSyncQueueItem(String provisionerName, FullSyncProvisioner.QUEUE_TYPE sourceQueue,
131                            FullSyncProvisioner.FULL_SYNC_COMMAND command,
132                            String groupName, String reason,
133                            GrouperMessageAcknowledgeParam messageToAcknowledge) {
134       this.id = queueItemCounter.incrementAndGet();
135       this.sourceQueue = sourceQueue;
136       this.command= command;
137       this.provisionerName = provisionerName;
138       this.groupName = groupName;
139       this.reason = reason;
140       this.messageToAcknowledge = messageToAcknowledge;
141   }
142 
143 
144   public static FullSyncQueueItem fromJson(FullSyncProvisioner.QUEUE_TYPE sourceQueue, String provisionerName, String jsonString) {
145     // Old message format: just the group's name or all-groups
146     if ( !jsonString.startsWith("{") ) {
147       if ( jsonString.equalsIgnoreCase(FullSyncProvisioner.FULL_SYNC_ALL_GROUPS))
148         return new FullSyncQueueItem(provisionerName, sourceQueue,  FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_ALL_GROUPS, "from old-format message");
149       else
150         return new FullSyncQueueItem(
151               provisionerName,
152               sourceQueue,
153               jsonString,
154               "from old-format message");
155     }
156 
157     JSONObject jsonObject = JSONObject.fromObject(jsonString);
158 
159     String commandString = jsonObject.getString(COMMAND_PROPERTY);
160 
161     if ( commandString==null )
162     {
163       LOG.error("{}: Json of FullSyncQueueItem did not include {}: {}", provisionerName, COMMAND_PROPERTY, jsonString);
164       return null;
165     }
166 
167 
168     String reason = jsonObject.getString(REASON_PROPERTY);
169 
170     FullSyncQueueItemng/FullSyncQueueItem.html#FullSyncQueueItem">FullSyncQueueItem result = new FullSyncQueueItem(
171             provisionerName,
172             sourceQueue,
173             FullSyncProvisioner.FULL_SYNC_COMMAND.valueOf(commandString),
174             reason);
175 
176 
177     // Fill in the details
178     if ( jsonObject.containsKey(ID_PROPERTY) )
179       result.id = jsonObject.getInt(ID_PROPERTY);
180 
181     if ( jsonObject.containsKey(SOURCE_PROPERTY) )
182       result.source = jsonObject.getString(SOURCE_PROPERTY);
183     if ( jsonObject.containsKey(EXTERNAL_REFERENCE) )
184       result.externalReference = jsonObject.getString(EXTERNAL_REFERENCE);
185 
186     if ( jsonObject.containsKey(GROUP_PROPERTY) )
187       result.groupName = jsonObject.getString(GROUP_PROPERTY);
188 
189     if ( jsonObject.containsKey(FIRST_QUEUED_JEPOCH_PROPERTY) )
190       result.firstQueuedDate = new DateTime(jsonObject.getLong(FIRST_QUEUED_JEPOCH_PROPERTY));
191 
192     if ( jsonObject.containsKey(MOST_RECENT_QUEUED_JEPOCH_PROPERTY) )
193       result.mostRecentQueuedDate = new DateTime(jsonObject.getLong(MOST_RECENT_QUEUED_JEPOCH_PROPERTY));
194 
195     if ( jsonObject.containsKey(ASOF_JEPOCH_PROPERTY) )
196       result.asofDate = new DateTime(jsonObject.getLong(ASOF_JEPOCH_PROPERTY));
197 
198     if ( jsonObject.containsKey(WAS_SUCCESSFUL_PROPERTY) )
199       result.wasSuccessful = jsonObject.getBoolean(WAS_SUCCESSFUL_PROPERTY);
200 
201     if ( jsonObject.containsKey(WILL_BE_RETRIED_PROPERTY) )
202       result.willBeRetried = jsonObject.getBoolean(WILL_BE_RETRIED_PROPERTY);
203 
204     if ( jsonObject.containsKey(REPLY_QUEUE) )
205       result.replyQueue = jsonObject.getString(REPLY_QUEUE);
206 
207     if ( jsonObject.containsKey(RETRY_COUNT_PROPERTY) )
208       result.retryCount = jsonObject.getInt(RETRY_COUNT_PROPERTY);
209 
210     if ( jsonObject.containsKey(WAKE_TIME_JEPOCH_PROPERTY) )
211       result.wakeTimeDate = new DateTime(jsonObject.getLong(WAKE_TIME_JEPOCH_PROPERTY));
212 
213     if ( jsonObject.containsKey(PROCESSING_RESULT_MESSAGE_PROPERTY) )
214       result.processingResultMessage = jsonObject.getString(PROCESSING_RESULT_MESSAGE_PROPERTY);
215 
216     if ( result.command== FullSyncProvisioner.FULL_SYNC_COMMAND.FULL_SYNC_GROUP &&
217             result.groupName == null ) {
218       LOG.error("{}: Json of Group-command FullSyncQueueItem did not include {}: {}", provisionerName, GROUP_PROPERTY, jsonString);
219       return null;
220     }
221 
222     return result;
223   }
224 
225   public String toJson() {
226     // Manually building map that will be JSONed. Jackson could do this, but that isn't
227     // included in grouper right now, and I don't want to add such a dependency in this upcoming
228     // patch.
229     // Also, net.sf.json seems to be bean-specific and not able to JSONify general objects,
230     // nor to control the attribute names.
231     Map<String, Object> result = new HashMap<>();
232 
233     result.put(ID_PROPERTY, id);
234     result.put(PROVISIONER_NAME_PROPERTY, provisionerName);
235     result.put(COMMAND_PROPERTY, command.name());
236 
237     result.put(SOURCE_PROPERTY, source);
238     result.put(EXTERNAL_REFERENCE, externalReference);
239     result.put(REASON_PROPERTY, reason);
240 
241     result.put(GROUP_PROPERTY, groupName);
242 
243     result.put(FIRST_QUEUED_JEPOCH_PROPERTY, firstQueuedDate.getMillis());
244     result.put(ASOF_JEPOCH_PROPERTY, asofDate.getMillis());
245     result.put(MOST_RECENT_QUEUED_JEPOCH_PROPERTY, mostRecentQueuedDate.getMillis());
246 
247     if ( wakeTimeDate!=null ) {
248       result.put(WAKE_TIME_JEPOCH_PROPERTY, wakeTimeDate.getMillis());
249     }
250 
251     if ( wasSuccessful!= null ) {
252       result.put(WAS_SUCCESSFUL_PROPERTY, wasSuccessful ? "true" : "false");
253     }
254 
255     result.put(REPLY_QUEUE, replyQueue);
256     result.put(RETRY_COUNT_PROPERTY, retryCount);
257 
258     if ( willBeRetried!=null ) {
259       result.put(WILL_BE_RETRIED_PROPERTY, willBeRetried ? "true" : "false");
260     }
261 
262     result.put(PROCESSING_RESULT_MESSAGE_PROPERTY, processingResultMessage);
263 
264     // This jsonutils invocation wraps the result in a "<Class>": <result> envelope
265     // which gets in my way
266     // return JsonUtils.jsonConvertTo(result);
267     // So this is a partial copy of the jsonConvertTo method...
268 
269     JsonConfig jsonConfig = new JsonConfig();
270     jsonConfig.setJsonPropertyFilter(new PropertyFilter() {
271       public boolean apply(Object source, String name, Object value) {
272         return value == null;
273       }
274     });
275     JSONObject jsonObject = JSONObject.fromObject(result, jsonConfig);
276     String json = jsonObject.toString();
277     return json;
278   }
279 
280 
281   public void wasDequeued() {
282     mostRecentDequeueTime=DateTime.now();
283     currentProcessingStep="init";
284     currentProcessingStepStartTime = Instant.now();
285   }
286 
287   public void startStep(String stepLabel) {
288     // Complete the previous step if one is happening
289     if ( currentProcessingStep!=null ) {
290       Duration processingTimePeriod = new Duration(currentProcessingStepStartTime, Instant.now());
291       processingStepTimingMeasurements.put(currentProcessingStep, processingTimePeriod);
292     }
293 
294     currentProcessingStep=stepLabel;
295     currentProcessingStepStartTime = Instant.now();
296   }
297 
298 
299   public FullSyncQueueItem setSourceQueue(FullSyncProvisioner.QUEUE_TYPE queueType) {
300     sourceQueue = queueType;
301     return this;
302   }
303 
304 
305   public void processingCompletedSuccessfully(String messageFormat, Object... messageArgs) {
306       // Only act the first time we're told this was completed
307       if ( stats.processingCompletedTime != null ) {
308           return;
309       }
310 
311       if (StringUtils.isNotEmpty(messageFormat))
312         processingResultMessage = String.format("Successful: " + messageFormat, messageArgs);
313       else
314         processingResultMessage = "Successful";
315 
316       wasSuccessful = true;
317 
318       processingCompleted();
319   }
320 
321   private void processingCompleted() {
322     stats.processingCompletedTime = new Date();
323     startStep(null);
324     processingCompletionTime = DateTime.now();
325     LOG.info("FullSync Item done ({}). Stats: {}/{}: {}",
326             processingResultMessage, stats, getProcessingTimeBreakdown(), this);
327 
328     acknowledgeMessage();
329   }
330 
331   public void processingCompletedUnsuccessfully(boolean willBeRetried, String messageFormat, Object... messageArgs) {
332     // Only act the first time we're told this was completed
333     if ( stats.processingCompletedTime != null ) {
334         return;
335     }
336     this.willBeRetried=willBeRetried;
337     processingResultMessage = String.format(messageFormat, messageArgs);
338 
339     wasSuccessful = false;
340     processingCompleted();
341   }
342 
343   public void acknowledgeMessage() {
344     if ( messageToAcknowledge != null ) {
345       LOG.debug("Acknowledging that message was processed: {}", this);
346       try {
347         GrouperMessagingEngine.acknowledge(messageToAcknowledge);
348       } catch (GrouperStaleStateException gsse) {
349         LOG.debug("error acknowleding message", gsse);
350         // not sure what to do here...
351       }
352     }
353     messageToAcknowledge = null;
354   }
355 
356   public boolean hasBeenProcessed() {
357   return stats.processingCompletedTime != null;
358 }
359 
360   public Duration getTimeSpentInQueue() {
361     if ( mostRecentDequeueTime==null ) {
362       return new Duration(mostRecentQueuedDate, Instant.now());
363     } else {
364       return new Duration(mostRecentQueuedDate, mostRecentDequeueTime);
365     }
366 }
367 
368   public Duration getTimeSinceFirstQueued() {
369     return new Duration(firstQueuedDate, Instant.now());
370   }
371 
372   public String getRequestedAction() {
373     switch (command) {
374       case FULL_SYNC_GROUP:
375         return groupName;
376       case CLEANUP:
377         return "Extra-Group Cleanup";
378       case FULL_SYNC_ALL_GROUPS:
379         return "All-Groups";
380     }
381     return "Unknown-command-"+command.name();
382   }
383 
384   public String toString() {
385       return String.format("Action=%s|qid=%d|Trigger=%s|ExternalRef=%s|AsOf=%s|QTime=%s|Age=%s",
386               getRequestedAction(), id, reason, externalReference,
387               PspUtils.formatDate_DateHoursMinutes(asofDate, "none"),
388               PspUtils.formatElapsedTime(getTimeSpentInQueue()),
389               PspUtils.formatElapsedTime(getTimeSinceFirstQueued()));
390   }
391 
392   public String getProcessingTimeBreakdown() {
393     StringBuilder result = new StringBuilder();
394 
395     Duration totalProcessingTime;
396     if ( mostRecentDequeueTime == null )
397       return "NotYetProcessed";
398 
399     if ( processingCompletionTime==null ) {
400       totalProcessingTime = new Duration(mostRecentDequeueTime, Instant.now());
401       result.append(String.format("ProcTime(SoFar): %s", PspUtils.formatElapsedTime(totalProcessingTime)));
402     } else {
403       totalProcessingTime = new Duration(mostRecentDequeueTime, processingCompletionTime);
404       result.append(String.format("ProcTime: %s", PspUtils.formatElapsedTime(totalProcessingTime)));
405     }
406 
407     if (processingStepTimingMeasurements.size()>0 && totalProcessingTime.getMillis()>0) {
408       result.append(" Timing breakdown: ");
409 
410       for (Map.Entry<String, Duration> processingStep : processingStepTimingMeasurements.entrySet()) {
411         result.append(String.format("%s=%d%%/", processingStep.getKey(), (int) (100 * processingStep.getValue().getMillis() / totalProcessingTime.getMillis())));
412       }
413     }
414 
415     return result.toString();
416   }
417 
418 
419   public void incrementRetryCount() {
420     retryCount++;
421     mostRecentQueuedDate = new DateTime();
422 
423     // Push event out 10 seconds more each retry, up to 15minutes
424     int sleepTime_secs = Math.min(10*retryCount, 900);
425 
426     wakeTimeDate = new DateTime().withDurationAdded(1000*sleepTime_secs, 1);
427   }
428 }