/****************************************************************** * Template program for processing messages passed via MQSeries * * and store them into a nominal data (ORACLE) * ******************************************************************/ #include #include #include #include #include #include "config.h" EXEC SQL include sqlca; #define BUFFER_LENGTH 64000 #define IN_SECONDS * 1000 #define HEADER_FIELDS 3 #define SOURCE_LEN 32 #define TRAN_TYPE_LEN 32 #define EXECUTABLE_LEN 128 ConfigFileT configFile; /* Had to make these global to make */ /* the sql_error() handler work */ MQHCONN QMgrConHandle; PMQTMC2 triggerData; /* Specify what to do on SQL error */ void sql_error(void); EXEC SQL whenever sqlerror do sql_error(); /******************************************************************/ /* Process the passed message */ /******************************************************************/ int process_message(PMQCHAR msg, MQLONG len) { EXEC SQL begin declare section; VARCHAR separator[1]; VARCHAR source[SOURCE_LEN]; VARCHAR tran_type[TRAN_TYPE_LEN]; VARCHAR executable[EXECUTABLE_LEN]; VARCHAR message[BUFFER_LENGTH]; EXEC SQL end declare section; int i; char *start; char *end; /* Get the field separator first */ separator.arr[0] = msg[0]; separator.len = 1; /* Now split the fields up */ start = msg + 1; if ((end = strchr(start, *msg)) != NULL) { /* If the source field is too large */ /* need to return an error */ if (end - start > SOURCE_LEN) return 1; *end = '\0'; strncpy((char *)source.arr, start, SOURCE_LEN); source.len = end - start; *end = *msg; start = end + 1; } if ((end = strchr(start, *msg)) != NULL) { /* If the transaction field is too large */ /* need to return an error */ if (end - start > TRAN_TYPE_LEN) return 1; *end = '\0'; strncpy((char *)tran_type.arr, start, TRAN_TYPE_LEN); tran_type.len = end - start; *end = *msg; start = end + 1; } if ((end = strchr(start, *msg)) != NULL) { /* If the executable field is too large */ /* need to return an error */ if (end - start > EXECUTABLE_LEN) return 1; *end = '\0'; strncpy((char *)executable.arr, start, EXECUTABLE_LEN); executable.len = end - start; *end = *msg; start = end + 1; } /* Finally stuff all the message */ /* contents into the holder */ strncpy((char *)message.arr, msg, len); message.len = len; /* Okay, so insert the message */ EXEC SQL insert into mq_messages_in (field_sep, source, tran_type, executable, recv_message, seq_number) values (:separator, :source, :tran_type, :executable, :message, mq_mess_in_id.nextval); return 0; } /******************************************************************/ /* Disconnect from given Q Mgr */ /******************************************************************/ void disconnect_from_Mgr(PMQHCONN QMgrConHandle) { MQLONG compCode; MQLONG reasonCode; /* Disconnect from MQM */ MQDISC(QMgrConHandle, &(compCode), &(reasonCode)); if (compCode != MQCC_OK) fprintf(stderr, "MQDISC Disconnect from QMgr failed - %ld\n", reasonCode); } /******************************************************************/ /* Open a connection to the given QMgr */ /******************************************************************/ void connect_to_Mgr(PMQCHAR QMgrName, PMQHCONN QMgrConHandle) { MQLONG compCode; MQLONG reasonCode; /* Connect to queue manager */ MQCONN(QMgrName, QMgrConHandle, &(compCode), &(reasonCode)); /* Report reason and stop if it failed */ if (compCode != MQCC_OK) { fprintf(stderr, "MQCONN Could not connect to MQSeries - %ld\n", reasonCode); exit(reasonCode); } } /******************************************************************/ /* Open a connection to the given QMgr and QName */ /******************************************************************/ void connect_to_Q(MQHCONN QMgrConHandle, PMQCHAR QName, PMQHOBJ QHandle) { MQLONG compCode; MQLONG reasonCode; MQOD getDesc = {MQOD_DEFAULT}; /* Open the named message queue for input */ strncpy(getDesc.ObjectName, QName, MQ_Q_NAME_LENGTH); getDesc.ObjectType = MQOT_Q; MQOPEN(QMgrConHandle, &(getDesc), MQOO_INPUT_AS_Q_DEF | MQOO_FAIL_IF_QUIESCING, QHandle, &(compCode), &(reasonCode)); /* Report reason */ if (compCode != MQCC_OK) { disconnect_from_Mgr(&(QMgrConHandle)); fprintf(stderr, "MQOPEN Q open failed - %ld\n", reasonCode); exit(reasonCode); } } /******************************************************************/ /* Close the given Q */ /******************************************************************/ long close_Q(MQHCONN QMgrConHandle, PMQHOBJ QHandle) { MQLONG compCode; MQLONG reasonCode; /* Close the message queue */ MQCLOSE(QMgrConHandle, QHandle, 0, &(compCode), &(reasonCode)); /* Report reason */ if (compCode != MQCC_OK) { fprintf(stderr, "MQCLOSE Q close failed - %ld\n", reasonCode); return reasonCode; } return 0; } /******************************************************************/ /* Error routine called on an SQL error */ /******************************************************************/ void sql_error(void) { char msg_buffer[512]; int msg_length; int buffer_size = 512; EXEC SQL whenever sqlerror continue; sqlglm(msg_buffer, &(buffer_size), &(msg_length)); fprintf(stderr, "SQL error %ld - %s\n", sqlca.sqlcode, msg_buffer); exit(sqlca.sqlcode); } /******************************************************************/ /* Close the read queue and remove the trigger */ /******************************************************************/ void change_trigger(MQHCONN QMgrConHandle, int on, PMQCHAR QName) { MQLONG compCode; MQLONG reasonCode; MQHOBJ QHandle; MQLONG selector[2]; MQLONG setAttr[2]; MQOD getDesc = {MQOD_DEFAULT}; /* Open the named message queue for attribute setting */ strncpy(getDesc.ObjectName, QName, MQ_Q_NAME_LENGTH); getDesc.ObjectType = MQOT_Q; MQOPEN(QMgrConHandle, &(getDesc), MQOO_SET | MQOO_FAIL_IF_QUIESCING, &(QHandle), &(compCode), &(reasonCode)); /* Report reason */ if (compCode != MQCC_OK) { fprintf(stderr, "MQOPEN Q trigger open failed - %ld\n", reasonCode); disconnect_from_Mgr(&(QMgrConHandle)); exit(reasonCode); } /* Change the triggering as required */ if (on) { /* Set the triggering back on */ selector[0] = MQIA_TRIGGER_CONTROL; setAttr[0] = MQTC_ON; selector[1] = MQIA_TRIGGER_TYPE; setAttr[1] = MQTT_FIRST; MQSET(QMgrConHandle, QHandle, 2, selector, 2, setAttr, 0, NULL, &(compCode), &(reasonCode)); } else { /* Set the triggering off */ selector[0] = MQIA_TRIGGER_CONTROL; setAttr[0] = MQTC_OFF; MQSET(QMgrConHandle, QHandle, 1, selector, 1, setAttr, 0, NULL, &(compCode), &(reasonCode)); } /* Report reason */ if (compCode != MQCC_OK) { fprintf(stderr, "MQSET Trigger %s failed - %ld\n", on ? "reset" : "remove", reasonCode); } close_Q(QMgrConHandle, &(QHandle)); } /******************************************************************/ /* Read the next message from the given Q */ /******************************************************************/ int get_message(MQHCONN QMgrConHandle, MQHOBJ QHandle, PMQCHAR QName, PMQCHAR messBuf, PMQLONG messLength, PMQMD messageDesc) { MQGMO getOptions = {MQGMO_DEFAULT}; MQLONG compCode; MQLONG reasonCode; /* Wait for the next message to appear */ /* Fetch it under syncpoint in case of */ /* a problem later on */ getOptions.Options = MQGMO_WAIT | MQGMO_CONVERT | MQGMO_SYNCPOINT; getOptions.WaitInterval = configFile.SleepTime IN_SECONDS; /* In order to read the messages in sequence, MsgId and */ /* CorrelID must have the default value */ memcpy(messageDesc->MsgId, MQMI_NONE, MQ_MSG_ID_LENGTH); memcpy(messageDesc->CorrelId, MQCI_NONE, MQ_CORREL_ID_LENGTH); while (1) { MQGET(QMgrConHandle, QHandle, messageDesc, &(getOptions), BUFFER_LENGTH, messBuf, messLength, &(compCode), &(reasonCode)); /* Report reason */ switch (reasonCode) { case MQRC_NONE: /* Everything okay */ return 1; case MQRC_NO_MSG_AVAILABLE: /* Timed out so quit looking */ fprintf(stderr, "Queue empty for %d secs, quitting\n", configFile.SleepTime); return 0; case MQRC_GET_INHIBITED: /* Cannot GET from the queue at */ /* the moment, so wait a bit */ fprintf(stderr, "GET_INHIBITED retrying\n"); sleep(30); break; case MQRC_OBJECT_CHANGED: /* Only get this error when the queue */ /* is an alias and the queue that it */ /* points to has been changed. Close */ /* and reopen the queue */ close_Q(QMgrConHandle, &(QHandle)); connect_to_Q(QMgrConHandle, QName, &(QHandle)); break; default: /* Some other error condition */ /* report it and exit */ fprintf(stderr, "MQGET Message get failed - %ld\n", reasonCode); close_Q(QMgrConHandle, &(QHandle)); disconnect_from_Mgr(&(QMgrConHandle)); exit(reasonCode); } } } /******************************************************************/ /* Rollback the GET as the processor returned a fatal error */ /******************************************************************/ void rollback(MQHCONN QMgrConHandle) { MQLONG reasonCode; MQLONG compCode; MQBACK(QMgrConHandle, &(compCode), &(reasonCode)); /* If the rollback failed fatal error */ if (compCode != MQCC_OK) { fprintf(stderr, "MQBACK Rollback of changes failed - %ld\n", reasonCode); exit(reasonCode); } /* Undo last SQL action and logoff */ EXEC SQL rollback work release; } /******************************************************************/ /* Commit the GET as work is complete okay with this message */ /******************************************************************/ void commit(MQHCONN QMgrConHandle) { MQLONG reasonCode; MQLONG compCode; MQCMIT(QMgrConHandle, &(compCode), &(reasonCode)); /* If the commit failed fatal error */ if (compCode != MQCC_OK) { fprintf(stderr, "MQCMIT Commit of changes failed -%ld\n", reasonCode); rollback(QMgrConHandle); exit(reasonCode); } /* Commit changes into database */ EXEC SQL commit work; } /******************************************************************/ int main(int argc, char **argv) { MQLONG compCode; MQLONG reasonCode; MQHOBJ QHandle; MQMD messageDesc = {MQMD_DEFAULT}; MQCHAR messBuf[BUFFER_LENGTH]; MQLONG messLength; int procReturnVal; EXEC SQL begin declare section; char *userID; EXEC SQL end declare section; /* First argument is the trigger message */ if (argc != 2) { fprintf(stderr, "Program invoked incorrectly, insufficient arguments\n"); exit(1); } triggerData = (PMQTMC2 )argv[1]; /* Read the configuration file */ if (read_config_file(triggerData->UserData, sizeof(triggerData->UserData))) { fprintf(stderr, "Unable to read config file\n"); exit(1); } userID = configFile.DatabaseLogin; /* Connect to the specified Q manager */ /* and then the dedicated Q for this */ /* application processor */ connect_to_Mgr(triggerData->QMgrName, &(QMgrConHandle)); change_trigger(QMgrConHandle, 0, triggerData->QName); connect_to_Q(QMgrConHandle, triggerData->QName, &(QHandle)); /* Connect to the ORACLE database */ EXEC SQL connect :userID; /* Get messages from the message queue */ while (1) { /* Read the next message */ if (!get_message(QMgrConHandle, QHandle, triggerData->QName, messBuf, &(messLength), &(messageDesc))) break; /* Call the external function provided by */ /* the application developers to perform */ /* the required actions for this message */ if ((procReturnVal = process_message(messBuf, messLength)) != 0) { fprintf(stderr, "Error processing message, field length invalid\n"); rollback(QMgrConHandle); close_Q(QMgrConHandle, &(QHandle)); disconnect_from_Mgr(&(QMgrConHandle)); exit(procReturnVal); } /* Everything has worked okay so */ /* commit the GET */ commit(QMgrConHandle); } /* Logoff ORACLE gracefully */ EXEC SQL commit work release; /* Reset the trigger on the Q */ close_Q(QMgrConHandle, &(QHandle)); change_trigger(QMgrConHandle, 1, triggerData->QName); /* Disconnect from Q */ disconnect_from_Mgr(&(QMgrConHandle)); return 0; }