1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package edu.internet2.middleware.grouper.app.messaging;
18
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.regex.Matcher;
26 import java.util.regex.Pattern;
27
28 import org.apache.commons.logging.Log;
29 import org.joda.time.DateTime;
30 import org.joda.time.format.ISODateTimeFormat;
31 import org.quartz.DisallowConcurrentExecution;
32 import org.quartz.Job;
33 import org.quartz.JobExecutionContext;
34 import org.quartz.JobExecutionException;
35
36 import com.fasterxml.jackson.databind.JsonNode;
37 import com.fasterxml.jackson.databind.node.ObjectNode;
38
39 import edu.internet2.middleware.grouper.GrouperSession;
40 import edu.internet2.middleware.grouper.app.loader.GrouperDaemonUtils;
41 import edu.internet2.middleware.grouper.app.loader.GrouperLoaderConfig;
42 import edu.internet2.middleware.grouper.cfg.GrouperConfig;
43 import edu.internet2.middleware.grouper.util.GrouperHttpClient;
44 import edu.internet2.middleware.grouper.util.GrouperHttpMethod;
45 import edu.internet2.middleware.grouper.util.GrouperUtil;
46 import edu.internet2.middleware.grouperClient.messaging.GrouperMessage;
47 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
48 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeType;
49 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageQueueParam;
50 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageQueueType;
51 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageReceiveParam;
52 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageReceiveResult;
53 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageSendParam;
54 import edu.internet2.middleware.grouperClient.messaging.GrouperMessageSystemParam;
55 import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;
56 import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem;
57 import edu.internet2.middleware.grouperClient.util.GrouperClientUtils;
58 import org.apache.commons.lang3.StringUtils;
59
60
61
62
63
64 @DisallowConcurrentExecution
65 public class MessageConsumerDaemon implements Job {
66
67
68
69
70 private static final Log LOG = GrouperUtil.getLog(MessageConsumerDaemon.class);
71
72
73
74
75 @Override
76 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
77
78 Pattern pattern = Pattern.compile("^grouper\\.messaging\\.([^.]+)\\.messagingSystemName$");
79 GrouperLoaderConfig grouperLoaderConfig = GrouperLoaderConfig.retrieveConfig();
80
81 String configName = null;
82
83 String actAsSubjectId = null;
84 String actAsSubjectSourceId = null;
85 String messagingSystemName = null;
86 String queueOrTopicName = null;
87 String routingKey = null;
88 String exchangeType = null;
89 String messageQueueType = null;
90 Integer longPollingSeconds = null;
91 Map<String, Object> queueArguments = null;
92
93 for (String propertyName : grouperLoaderConfig.propertyNames()) {
94 Matcher matcher = pattern.matcher(propertyName);
95 if (matcher.matches()) {
96
97 configName = matcher.group(1);
98
99 messagingSystemName = grouperLoaderConfig.propertyValueString(propertyName);
100 if (StringUtils.isBlank(messagingSystemName)) {
101 LOG.info("No messaging system name found so not going to connect to any queue or topic.");
102 return;
103 }
104
105 queueOrTopicName = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueOrTopicName");
106 routingKey = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".routingKey");
107 exchangeType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".exchangeType");
108 messageQueueType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".messageQueueType");
109 actAsSubjectSourceId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectSourceId");
110 actAsSubjectId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectId");
111 longPollingSeconds = grouperLoaderConfig.propertyValueInt("grouper.messaging."+configName+".longPollingSeconds", 1);
112
113 for (int i=0;i<100;i++) {
114 String key = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueArgs." + i + ".key");
115 if (key == null || "".equals(key)) {
116 break;
117 }
118
119 String value = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".queueArgs." + i + ".value");
120 if (queueArguments == null) {
121 queueArguments = new HashMap<>();
122 }
123 queueArguments.put(key, value);
124 }
125 }
126 }
127
128 if (StringUtils.isBlank(configName)) {
129 return;
130 }
131
132 try {
133 GrouperSession.startBySubjectIdAndSource(actAsSubjectId, actAsSubjectSourceId);
134 } catch (Exception e) {
135 LOG.error("Error occurred while starting grouper session for subjectId" + actAsSubjectId +" and source id "+actAsSubjectSourceId, e);
136 return;
137 }
138
139 GrouperMessagingSystem grouperMessageSystem = null;
140
141 try {
142 grouperMessageSystem = GrouperMessagingEngine.retrieveGrouperMessageSystem(messagingSystemName);
143 } catch(Exception e) {
144 LOG.error("Error occurred while retrieving grouper message system for "+messagingSystemName, e);
145 return;
146 }
147
148 Collection<GrouperMessage> grouperMessages = null;
149 try {
150 grouperMessages = receiveMessages(messagingSystemName, queueOrTopicName, routingKey, exchangeType, messageQueueType, longPollingSeconds, queueArguments, grouperMessageSystem);
151 LOG.info("Received "+grouperMessages.size() +" message(s) from "+queueOrTopicName +" for message system: "+messagingSystemName);
152
153 GrouperDaemonUtils.stopProcessingIfJobPaused();
154 } catch (Exception e) {
155 LOG.error("Error occurred while receiving messages from "+queueOrTopicName, e);
156 return;
157 }
158
159 processMessages(messagingSystemName, grouperMessageSystem, messageQueueType, queueOrTopicName, grouperMessages, configName);
160
161 }
162
163
164
165
166
167
168
169
170
171
172 protected void processMessages(String messagingSystemName, GrouperMessagingSystem grouperMessageSystem,
173 String messageQueueType, String queueTopicName,
174 Collection<GrouperMessage> grouperMessages, String configName) {
175
176 List<GrouperMessage> messagesToBeAcknowledged = new ArrayList<GrouperMessage>();
177
178 for (GrouperMessage inputMessage: grouperMessages) {
179 GrouperDaemonUtils.stopProcessingIfJobPaused();
180
181 String messageBody = inputMessage.getMessageBody();
182 InputMessageGrouperHeader grouperHeader = null;
183
184 JsonNode jsonObject = null;
185
186
187 try {
188 jsonObject = GrouperUtil.jsonJacksonNode(messageBody);
189
190 } catch(Exception e) {
191 LOG.error("Error occurred while building json object for "+messageBody);
192 continue;
193 }
194
195 JsonNode grouperHeaderJson = null;
196 try {
197 grouperHeaderJson = jsonObject.get("grouperHeader");
198 } catch (Exception e) {
199 LOG.error("Error occurred while retrieving key grouperHeader out of message body: "+messageBody);
200 continue;
201 }
202
203 try {
204 grouperHeader = GrouperUtil.jsonConvertFrom(grouperHeaderJson, InputMessageGrouperHeader.class);
205 } catch (Exception e) {
206 LOG.error("Error occurred while building Json object for "+grouperHeaderJson);
207 continue;
208 }
209
210 Collection<String> errors = validate(grouperHeader);
211
212 String replyToQueueOrTopic = grouperHeader.getReplyToQueueOrTopic();
213 String replyToQueueOrTopicName = grouperHeader.getReplyToQueueOrTopicName();
214 String routingKey = grouperHeader.getReplyToRoutingKey();
215 if (errors.size() > 0) {
216 if (canReplyToErrorMessages(replyToQueueOrTopicName, replyToQueueOrTopic)) {
217 String errorJson = buildErrorResponse(errors, grouperHeader);
218 sendReplyMessage(grouperMessageSystem, grouperHeader, messagingSystemName, errorJson, routingKey);
219 } else {
220 LOG.error("Invalid message received from the queue. Errors: "+GrouperUtil.collectionToString(errors));
221 }
222 continue;
223 }
224
225 String endpoint = grouperHeader.getEndpoint();
226
227 String wsRequestBody = GrouperUtil.jsonJacksonGetString(jsonObject, endpoint);
228
229 String newJson = "{ \"" + endpoint + "\" :" + wsRequestBody + "}";
230
231 String wsBaseUrl = GrouperLoaderConfig.retrieveConfig().propertyValueString("grouper.messaging."+configName+".ws.url");
232
233 WsResponse wsReponse = null;
234 try {
235 wsReponse = callWebService(newJson, wsBaseUrl + grouperHeader.getHttpPath(), configName);
236 messagesToBeAcknowledged.add(inputMessage);
237 } catch (Exception e) {
238 wsReponse = new WsResponse();
239 wsReponse.setHttpStatusCode(400);
240 wsReponse.setSuccess("F");
241 wsReponse.setResultCode("EXCEPTION");
242 wsReponse.setBody("\"errorMessage\": \"" + e.getMessage() + "\"");
243 LOG.error("Error occurred while calling web service: " + e.getMessage(), e);
244 }
245
246
247 if (StringUtils.isNotBlank(replyToQueueOrTopic) && StringUtils.isNotBlank(replyToQueueOrTopicName)) {
248 String messageToBeSent = buildWsReplyToMessage(wsReponse, grouperHeader);
249 sendReplyMessage(grouperMessageSystem, grouperHeader, messagingSystemName, messageToBeSent, routingKey);
250 }
251
252 }
253
254 acknowledge(grouperMessageSystem, messagingSystemName, messageQueueType, queueTopicName, messagesToBeAcknowledged);
255
256 }
257
258
259
260
261
262
263
264
265 private void acknowledge(GrouperMessagingSystem grouperMessageSystem,
266 String messagingSystemName, String messageQueueType, String queueTopicName, Collection<GrouperMessage> messagesToBeAcknowledged) {
267
268 GrouperMessageAcknowledgeParam acknowledgeParam = new GrouperMessageAcknowledgeParam();
269 acknowledgeParam.assignQueueName(queueTopicName);
270 acknowledgeParam.assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed);
271 acknowledgeParam.assignGrouperMessages(messagesToBeAcknowledged);
272 acknowledgeParam.assignGrouperMessageSystemName(messagingSystemName);
273
274 GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
275 systemParam.assignMesssageSystemName(messagingSystemName);
276 acknowledgeParam.assignGrouperMessageSystemParam(systemParam);
277
278 GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
279 queueParam.assignQueueOrTopicName(queueTopicName);
280 queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(messageQueueType, true));
281 acknowledgeParam.assignGrouperMessageQueueParam(queueParam);
282
283 grouperMessageSystem.acknowledge(acknowledgeParam);
284
285 }
286
287
288
289
290
291
292 private boolean canReplyToErrorMessages(String queueOrTopicName, String queueType) {
293 return StringUtils.isNotBlank(queueType) && StringUtils.isNotBlank(queueOrTopicName) && GrouperMessageQueueType.valueOfIgnoreCase(queueType, false) != null;
294 }
295
296
297
298
299
300 private List<String> validate(InputMessageGrouperHeader grouperHeader) {
301
302 List<String> errors = new ArrayList<String>();
303
304 if (StringUtils.isBlank(grouperHeader.getMessageVersion())) {
305 errors.add("grouperHeader.messageVersion is required.");
306 }
307
308 if (StringUtils.isBlank(grouperHeader.getTimestampInput())) {
309 errors.add("grouperHeader.timestampInput is required.");
310 }
311
312 try {
313 DateTime.parse(grouperHeader.getTimestampInput(), ISODateTimeFormat.dateTime());
314 } catch (Exception e) {
315 errors.add("Error converting "+grouperHeader.getTimestampInput() +" to datetime using "+ISODateTimeFormat.dateTime());
316 }
317
318 if (StringUtils.isBlank(grouperHeader.getType())) {
319 errors.add("grouperHeader.type is required.");
320 }
321
322 if (StringUtils.isBlank(grouperHeader.getEndpoint())) {
323 errors.add("grouperHeader.endpoint is required.");
324 }
325
326 if (StringUtils.isBlank(grouperHeader.getMessageInputUuid())) {
327 errors.add("grouperHeader.messageInputUuid is required.");
328 }
329
330 if (StringUtils.isBlank(grouperHeader.getHttpMethod())) {
331 errors.add("grouperHeader.httpMethod is required.");
332 }
333
334 if (StringUtils.isBlank(grouperHeader.getHttpPath())) {
335 errors.add("grouperHeader.httpPath is required.");
336 }
337
338 if (StringUtils.isNotBlank(grouperHeader.getReplyToQueueOrTopic()) &&
339 GrouperMessageQueueType.valueOfIgnoreCase(grouperHeader.getReplyToQueueOrTopic(), false) == null) {
340 errors.add("grouperHeader.replyToQueueOrTopic can only be queue or topic.");
341 }
342
343 return errors;
344 }
345
346
347 private Collection<GrouperMessage> receiveMessages(String messagingSystemName,
348 String queueOrTopicName, String routingKey, String exchangeType, String messageQueueType, Integer longPollingSeconds,
349 Map<String, Object> queueArguments, GrouperMessagingSystem grouperMessageSystem) {
350
351 GrouperMessageReceiveParam receiveParam = new GrouperMessageReceiveParam();
352 receiveParam.assignGrouperMessageSystemName(messagingSystemName);
353
354 GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
355 queueParam.assignQueueOrTopicName(queueOrTopicName);
356 queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(messageQueueType, true));
357 receiveParam.assignGrouperMessageQueueParam(queueParam);
358
359 GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
360 systemParam.assignMesssageSystemName(messagingSystemName);
361
362 receiveParam.assignQueueName(queueOrTopicName);
363
364 receiveParam.assignGrouperMessageSystemParam(systemParam);
365 receiveParam.assignLongPollMillis(longPollingSeconds*1000);
366 receiveParam.assignAutocreateObjects(true);
367 receiveParam.assignRoutingKey(routingKey);
368 receiveParam.assignExchangeType(exchangeType);
369 receiveParam.assignQueueArguments(queueArguments);
370
371 GrouperMessageReceiveResult grouperMessageReceiveResult = grouperMessageSystem.receive(receiveParam);
372 return grouperMessageReceiveResult.getGrouperMessages();
373 }
374
375
376
377
378
379
380
381
382 private WsResponse callWebService(String jsonInput, String url, String configName) throws Exception {
383
384 GrouperHttpClientrHttpClient.html#GrouperHttpClient">GrouperHttpClient grouperHttpClient = new GrouperHttpClient();
385
386 grouperHttpClient.assignUrl(url);
387 grouperHttpClient.assignGrouperHttpMethod(GrouperHttpMethod.post);
388
389 GrouperLoaderConfig grouperLoaderConfig = GrouperLoaderConfig.retrieveConfig();
390
391 String username = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.username");
392 String password = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.password");
393 if (StringUtils.isNotBlank(password)) {
394 password = GrouperClientUtils.decryptFromFileIfFileExists(password, null);
395 grouperHttpClient.assignUser(username);
396 grouperHttpClient.assignPassword(password);
397 }
398
399 String proxyUrl = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.proxyUrl");
400 String proxyType = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".ws.proxyType");
401
402 grouperHttpClient.assignProxyUrl(proxyUrl);
403 grouperHttpClient.assignProxyType(proxyType);
404
405 String actAsSubjectSourceId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectSourceId");
406 String actAsSubjectId = grouperLoaderConfig.propertyValueString("grouper.messaging."+configName+".actAsSubjectId");
407
408 grouperHttpClient.addHeader("X-Grouper-actAsSourceId", actAsSubjectSourceId);
409 grouperHttpClient.addHeader("X-Grouper-actAsSubjectId", actAsSubjectId);
410
411 grouperHttpClient.addHeader("Connection", "close");
412
413 grouperHttpClient.assignBody(jsonInput);
414
415 grouperHttpClient.addHeader("Content-type", "text/x-json; charset=UTF-8");
416
417 grouperHttpClient.executeRequest();
418
419 String response = grouperHttpClient.getResponseBody();
420
421 WsResponseer/app/messaging/WsResponse.html#WsResponse">WsResponse wsResponse = new WsResponse();
422 wsResponse.setBody(response);
423 wsResponse.setHttpStatusCode(grouperHttpClient.getResponseCode());
424
425 String successString = grouperHttpClient.getResponseHeaders().get("X-Grouper-success");
426 wsResponse.setSuccess(successString);
427 String resultCode = grouperHttpClient.getResponseHeaders().get("X-Grouper-resultCode");
428 wsResponse.setResultCode(resultCode);
429
430 String resultCode2 = grouperHttpClient.getResponseHeaders().get("X-Grouper-resultCode2");
431 wsResponse.setResultCode2(resultCode2);
432
433 return wsResponse;
434 }
435
436
437
438
439
440
441 private String buildErrorResponse(Collection<String> errors, InputMessageGrouperHeader inputGrouperHeader) {
442
443 String timestampOutput = new DateTime().toString(ISODateTimeFormat.dateTime());
444
445 OutputMessageGrouperHeaderutputMessageGrouperHeader.html#OutputMessageGrouperHeader">OutputMessageGrouperHeader outputHeader = new OutputMessageGrouperHeader();
446 outputHeader.setMessageVersion(inputGrouperHeader.getMessageVersion());
447 outputHeader.setTimestampInput(inputGrouperHeader.getTimestampInput());
448 outputHeader.setTimestampOutput(timestampOutput);
449 outputHeader.setType("grouperMessagingFromWebService");
450 outputHeader.setEndpoint(inputGrouperHeader.getEndpoint());
451 outputHeader.setMessageInputUuid(inputGrouperHeader.getMessageInputUuid());
452
453 outputHeader.setHttpHeaderXGrouperSuccess("F");
454 outputHeader.setHttpResponseCode(400);
455 outputHeader.setHttpHeaderXGrouperResultCode2("NONE");
456 outputHeader.setHttpHeaderXGrouperResultCode("ERROR");
457
458 ObjectNode objectNode = GrouperUtil.jsonConvertFromObjectToObjectNode(outputHeader);
459
460 renameKeys(objectNode);
461
462 String header = GrouperUtil.jsonConvertTo(objectNode, false);
463
464 String errorMessages = GrouperUtil.jsonConvertTo(errors, false);
465
466 String finalOuput = " { \"grouperHeader\": "+header+", \"errors\": " + errorMessages + " }" ;
467
468 return finalOuput;
469
470 }
471
472 private void renameKeys(ObjectNode jsonObject) {
473
474 jsonObject.put("httpHeader_X-Grouper-resultCode", jsonObject.get("httpHeaderXGrouperResultCode"));
475 jsonObject.put("httpHeader_X-Grouper-success", jsonObject.get("httpHeaderXGrouperSuccess"));
476 jsonObject.put("httpHeader_X-Grouper-resultCode2", jsonObject.get("httpHeaderXGrouperResultCode2"));
477
478 jsonObject.remove("httpHeaderXGrouperResultCode");
479 jsonObject.remove("httpHeaderXGrouperSuccess");
480 jsonObject.remove("httpHeaderXGrouperResultCode2");
481
482
483 }
484
485
486
487
488
489
490 private String buildWsReplyToMessage(WsResponse wsResponse, InputMessageGrouperHeader inputGrouperHeader) {
491
492 String timestampOutput = new DateTime().toString(ISODateTimeFormat.dateTime());
493
494 OutputMessageGrouperHeaderutputMessageGrouperHeader.html#OutputMessageGrouperHeader">OutputMessageGrouperHeader outputHeader = new OutputMessageGrouperHeader();
495 outputHeader.setMessageVersion(inputGrouperHeader.getMessageVersion());
496 outputHeader.setTimestampInput(inputGrouperHeader.getTimestampInput());
497 outputHeader.setTimestampOutput(timestampOutput);
498 outputHeader.setType("grouperMessagingFromWebService");
499 outputHeader.setEndpoint(inputGrouperHeader.getEndpoint());
500 outputHeader.setMessageInputUuid(inputGrouperHeader.getMessageInputUuid());
501 outputHeader.setHttpResponseCode(wsResponse.getHttpStatusCode());
502 outputHeader.setHttpHeaderXGrouperResultCode(wsResponse.getResultCode());
503 outputHeader.setHttpHeaderXGrouperSuccess(wsResponse.getSuccess());
504 outputHeader.setHttpHeaderXGrouperResultCode2(wsResponse.getResultCode2());
505
506 ObjectNode objectNode = GrouperUtil.jsonConvertFromObjectToObjectNode(outputHeader);
507 renameKeys(objectNode);
508
509 String header = GrouperUtil.jsonConvertTo(objectNode, false);
510
511 String finalOuput = " { \"grouperHeader\": "+header+", " + wsResponse.getBody() + " }" ;
512
513 return finalOuput;
514
515 }
516
517
518
519
520
521
522
523 private void sendReplyMessage(GrouperMessagingSystem grouperMessagingSystem,
524 InputMessageGrouperHeader inputGrouperHeader, String messagingSystemName,
525 String finalOuput, String routingKey) {
526 GrouperMessageSendParam sendParam = new GrouperMessageSendParam();
527 sendParam.assignAutocreateObjects(true);
528
529 GrouperMessageQueueParam queueParam = new GrouperMessageQueueParam();
530 queueParam.assignQueueOrTopicName(inputGrouperHeader.getReplyToQueueOrTopicName());
531 queueParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(inputGrouperHeader.getReplyToQueueOrTopic(), true));
532 sendParam.assignGrouperMessageQueueParam(queueParam);
533
534 sendParam.assignGrouperMessageSystemName(messagingSystemName);
535 sendParam.assignQueueOrTopicName(inputGrouperHeader.getReplyToQueueOrTopicName());
536 sendParam.assignQueueType(GrouperMessageQueueType.valueOfIgnoreCase(inputGrouperHeader.getReplyToQueueOrTopic(), true));
537
538 GrouperMessageSystemParam systemParam = new GrouperMessageSystemParam();
539 systemParam.assignAutocreateObjects(true);
540 systemParam.assignMesssageSystemName(messagingSystemName);
541 sendParam.assignGrouperMessageSystemParam(systemParam);
542
543 sendParam.assignMessageBodies(Collections.singleton(finalOuput));
544 sendParam.assignRoutingKey(routingKey);
545 sendParam.assignExchangeType(inputGrouperHeader.getReplyToExchangeType());
546 sendParam.assignQueueArguments(inputGrouperHeader.getReplyToQueueArguments());
547 try {
548 grouperMessagingSystem.send(sendParam);
549 } catch (Exception e) {
550 LOG.error("Error occurred while sending reply message "+ inputGrouperHeader.getMessageInputUuid()+" to "+inputGrouperHeader.getReplyToQueueOrTopicName(), e);
551 }
552 }
553
554 }