/******************************************************************** * Program to get messages from Q, with optional timing ********************************************************************/ #include #include #include #include #include #include "config.h" #define BUFFER_LENGTH 64000 #define MAX_FNAME_LEN 128 #define IN_SECONDS * 1000 MQOD odG = {MQOD_DEFAULT}; /* Object Descriptor for MQGET */ MQOD odP = {MQOD_DEFAULT}; /* Object Descriptor for MQPUT1 */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */ MQHCONN Hcon; /* connection handle */ MQHOBJ Hobj; /* object handle */ MQLONG O_options; /* MQOPEN options */ MQLONG C_options; /* MQCLOSE options */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ MQLONG CReason; /* reason code for MQCONN */ MQLONG buflen; /* Length of buffer */ MQLONG messlen; /* message length received */ MQLONG QMessageSize = BUFFER_LENGTH; char buffer[BUFFER_LENGTH]; /* GET/PUT buffer */ time_t start_time; time_t end_time; long nmessages; int timing; int i; ConfigFileT configFile; char *CfgFileName; char **CurrentParameter; char *CurrentArg; long DebugFlag = 0; /********************************************************************/ /********************************************************************/ /********************************************************************/ int ServerShutDown(MQLONG ExitReason) { /* Close the source queue */ if (OpenCode != MQCC_FAILED) { /* No close options */ C_options = 0; MQCLOSE(Hcon, &(Hobj), C_options, &(CompCode), &(Reason)); /* Report reason, if any */ if (Reason != MQRC_NONE) printf("MQCLOSE ended with reason code %ld\n", Reason); } /* Disconnect from MQ */ MQDISC(&(Hcon), &(CompCode), &(Reason)); /* report reason, if any */ if (Reason != MQRC_NONE) printf("MQDISC ended with reason code %ld\n", Reason); if (DebugFlag) printf("GETFRAIM end\n"); exit(ExitReason); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void GetCfgParams(int argc, char **argv) { int Targc; int Targv_len; configFile.DefaultQMessageSize = BUFFER_LENGTH ; for (Targc = 1; Targc < argc; Targc++) { CurrentArg = argv[Targc]; if (CurrentArg[0] == '-') { if (strlen(CurrentArg) == 2) { switch (CurrentArg[1]) { case 'c': CurrentParameter = &(CfgFileName); continue; case 'd': printf("DebugFlag found, Debug is ON"); DebugFlag = 1; continue; case 't': printf("Using timing\n"); timing = 1; CurrentArg = argv[++Targc]; nmessages = atol(CurrentArg); printf("Expecting %ld messages\n", nmessages); continue; default: break; } } } if (CurrentParameter != NULL) { *CurrentParameter = argv[Targc]; CurrentParameter = NULL; continue; } printf("Unknown option %s\n", argv[Targc]); printf("Usage: %s -c config.cfg\n", argv[0]); exit(1); } if ((timing && (nmessages == 0)) || ((nmessages != 0) && (timing == 0))) { printf("Use -t for throughput testing\n"); exit(1); } if (read_config_file(CfgFileName, MAX_FNAME_LEN) != 0) { printf( "Unable to open %s\n", CfgFileName ); exit(1); } buflen = QMessageSize; if (DebugFlag) { printf("QManager is %s\n", configFile.QManager); printf("QName is %s\n", configFile.QName); printf("QMessageSize is %d\n", QMessageSize); } } /********************************************************************/ /********************************************************************/ /********************************************************************/ void OpenInputQ(void) { /* Create object descriptor for subject queue */ strncpy(odG.ObjectName, configFile.QName, MQ_Q_NAME_LENGTH); /* Connect to queue manager */ MQCONN(configFile.QManager, &(Hcon), &(CompCode), &(CReason)); /* Report reason and stop if it failed */ if (CompCode == MQCC_FAILED) { printf("MQCONN ended with reason code %ld\n", CReason); exit(CReason); } /* Open the named message queue for input */ O_options = MQOO_INPUT_AS_Q_DEF | MQOO_FAIL_IF_QUIESCING; MQOPEN(Hcon, &(odG), O_options, &(Hobj), &(OpenCode), &(Reason)); /* Report reason, if any; stop if failed */ if (Reason != MQRC_NONE) { printf("MQOPEN ended with reason code %ld\n", Reason); ServerShutDown(Reason); } } /********************************************************************/ /********************************************************************/ /********************************************************************/ void GetNextMess(void) { /* Get messages from the message queue */ /* Loop until there is a failure */ gmo.Options = MQGMO_WAIT | MQGMO_CONVERT; gmo.WaitInterval = configFile.SleepTime IN_SECONDS; /* In order to read the messages in sequence, MsgId and */ /* CorrelID must have the default value */ memcpy(md.MsgId, MQMI_NONE, MQ_MSG_ID_LENGTH); memcpy(md.CorrelId, MQCI_NONE, MQ_CORREL_ID_LENGTH); MQGET(Hcon, Hobj, &(md), &(gmo), buflen, buffer, &(messlen), &(CompCode), &(Reason)); /* Report reason, if any */ if (Reason != MQRC_NONE) { if (Reason == MQRC_NO_MSG_AVAILABLE) printf("no more messages\n"); else printf("MQGET ended with reason code %ld\n", Reason); } else if (timing && (start_time == 0)) start_time = time(NULL); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void ProcessMess(void) { /* Display each message received */ buffer[messlen] = '\0'; printf("message <%s>\n", buffer); printf("Message ID: "); for (i = 0; i < MQ_MSG_ID_LENGTH; i++) printf("%02X", md.MsgId[i]); printf("'X\n"); printf("Correll ID: "); for (i = 0; i < MQ_CORREL_ID_LENGTH; i++) printf("%02X", md.CorrelId[i]); printf("'X\n"); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void SendReply(void) { /* Send reply using MQPUT1, if message type is REQUEST */ if (md.MsgType != MQMT_REQUEST) return; md.MsgType = MQMT_REPLY; /* Copy the ReplyTo queue name to the object descriptor */ strncpy(odP.ObjectName, md.ReplyToQ, MQ_Q_NAME_LENGTH); strncpy(odP.ObjectQMgrName, md.ReplyToQMgr, MQ_Q_MGR_NAME_LENGTH); /* MsgId and CorrelId are currently the values of the */ /* got message. Reset them if requested, then */ /* stop further reports */ if (md.Report & MQRO_PASS_CORREL_ID) memcpy(md.CorrelId, md.MsgId, MQ_CORREL_ID_LENGTH); if (md.Report & MQRO_PASS_MSG_ID) memcpy(md.MsgId, MQMI_NONE, MQ_MSG_ID_LENGTH); md.Report = MQRO_NONE; /* Put the message */ MQPUT1(Hcon, &(odP), &(md), &(pmo), messlen, buffer, &(CompCode), &(Reason)); if (Reason != MQRC_NONE) { printf("MQPUT1 ended with reason code %ld\n", Reason); ServerShutDown(Reason); } if (DebugFlag) printf("Reply Message sent.\n"); } /********************************************************************/ /********************************************************************/ /********************************************************************/ int main(int argc, char **argv) { GetCfgParams(argc, argv); OpenInputQ(); while (1) { GetNextMess(); if (CompCode == MQCC_FAILED) break; if (DebugFlag) ProcessMess(); if (md.MsgType == MQMT_REQUEST) SendReply(); /* Any more messages expected */ if (--nmessages == 0) break; } if (timing) { end_time = time(NULL); printf("Test took %ld seconds\n", end_time - start_time); } ServerShutDown(0); }