/******************************************************************** * Program to read a file and write, line by line, to a queue ********************************************************************/ #include #include #include #include #define MAX_FNAME_LEN 128 #define BUFFER_LENGTH 64000 #include "config.h" ConfigFileT configFile; int UseReplyQueue; int i; MQOD odG = {MQOD_DEFAULT}; /* Object Descriptor for MQGET */ MQOD odP = {MQOD_DEFAULT}; /* Object Descriptor for MQPUT1 */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */ MQHCONN Hcon; /* connection handle */ MQHOBJ PHobj; /* PUT object handle */ MQHOBJ GHobj; /* GET object handle */ MQLONG O_options; /* MQOPEN options */ MQLONG C_options; /* MQCLOSE options */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ MQLONG CReason; /* reason code for MQCONN */ MQLONG buflen; /* buffer length */ MQLONG messlen; /* message length received */ char buffer[BUFFER_LENGTH]; /* message buffer */ MQBYTE24 SavedMsgId; /* Saved PUT Message ID */ /********************************************************************/ /********************************************************************/ /********************************************************************/ void ConnQMgr(void) { MQCONN(configFile.QManager, &(Hcon), &(CompCode), &(CReason)); /* Report reason and stop if it failed */ if (CompCode == MQCC_FAILED) { printf("MQCONN ended with reason code %ld\n", CReason); exit(CReason); } } /********************************************************************/ /********************************************************************/ /********************************************************************/ void ShutDown(MQLONG error_code) { /* No close options */ C_options = 0; /* Output Q first */ MQCLOSE(Hcon, &(PHobj), C_options, &(CompCode), &(Reason)); /* Input Q next */ MQCLOSE(Hcon, &(GHobj), C_options, &(CompCode), &(Reason)); /* Disconnect from MQ */ MQDISC(&(Hcon), &(CompCode), &(Reason)); printf("SEND2AIM end\n"); exit(error_code); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void OpenReplyQ(void) { strncpy(md.ReplyToQ, configFile.RQName, MQ_Q_NAME_LENGTH); strncpy(md.ReplyToQMgr, configFile.QManager, MQ_Q_MGR_NAME_LENGTH); strncpy(odG.ObjectName, configFile.RQName, MQ_Q_NAME_LENGTH); printf("Source queue is %s\n", odG.ObjectName); /* Try to open as shared, will fail if cannot */ O_options = MQOO_INPUT_SHARED | MQOO_FAIL_IF_QUIESCING; MQOPEN(Hcon, &(odG), O_options, &(GHobj), &(OpenCode), &(Reason)); /* Report reason, if any; stop if failed */ if (Reason != MQRC_NONE) printf("MQOPEN ended with reason code %ld\n", Reason); if (OpenCode == MQCC_FAILED) printf("Unable to open queue for input\n"); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void OpenSendQ(void) { /* Open the target message queue for output */ strncpy(odP.ObjectName, configFile.QName, MQ_Q_NAME_LENGTH); printf("Target queue is %s\n", odP.ObjectName); O_options = MQOO_OUTPUT | MQOO_FAIL_IF_QUIESCING; MQOPEN(Hcon, &(odP), O_options, &(PHobj), &(OpenCode), &(Reason)); /* Report reason, if any; stop if failed */ if (Reason != MQRC_NONE) printf("MQOPEN ended with reason code %ld\n", Reason); else printf("MQOPEN for PUT successful.\n"); if (OpenCode == MQCC_FAILED) printf("Unable to open queue for output\n"); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void SendMess(void) { /* Put each buffer to the message queue */ memcpy(md.Format, MQFMT_STRING, MQ_FORMAT_LENGTH); if (UseReplyQueue) { md.MsgType = MQMT_REQUEST; md.Report = MQRO_PASS_CORREL_ID; } else { md.MsgType = MQMT_DATAGRAM; md.Report = MQRO_NONE; } memcpy(md.MsgId, MQMI_NONE, MQ_MSG_ID_LENGTH); memcpy(md.CorrelId, MQCI_NONE, MQ_CORREL_ID_LENGTH); MQPUT(Hcon, PHobj, &(md), &(pmo), buflen, buffer, &(CompCode), &(Reason)); /* Report reason, if any */ if (Reason != MQRC_NONE) { printf("MQPUT ended with reason code %ld\n", Reason); ShutDown(Reason); } #ifdef DEBUG printf("Message ID: "); for (i = 0; i < MQ_MSG_ID_LENGTH; i++) printf("%02X", md.MsgId[i]); printf("'X\n"); printf("Correll ID: "); for (i = 0; i < MQ_CORREL_ID_LENGTH; i++) printf("%02X", md.CorrelId[i]); printf("'X\n"); printf("Sent.\n"); #endif } /********************************************************************/ /********************************************************************/ /********************************************************************/ void GetReply(void) { /* Now browse the the Reply-To Q to see if the PUT has been */ /* processed - we know this if the Correl. ID matches the orig. */ /* message ID. */ gmo.Options = MQGMO_WAIT | MQGMO_CONVERT; gmo.WaitInterval = 50000; MQGET(Hcon, GHobj, &(md), &(gmo), BUFFER_LENGTH, buffer, &(messlen), &(CompCode), &(Reason)); if (Reason != MQRC_NONE) { printf("MQGET ended with reason code %ld\n", Reason); ShutDown(Reason); } #ifdef DEBUG else { printf("Received acknowledgement to message.\n"); printf("Message ID: "); for (i = 0; i < MQ_MSG_ID_LENGTH; i++) printf("%02X", md.MsgId[i]); printf("'X\n"); printf("Correll ID: "); for (i = 0; i < MQ_CORREL_ID_LENGTH; i++) printf("%02X", md.CorrelId[i]); printf("'X\n"); } #endif } /********************************************************************/ /********************************************************************/ /********************************************************************/ int main(int argc, char **argv) { FILE *stream = stdin; int msgsSent = 0; int msgsToSend; printf("SEND2AIM start\n"); if (argc < 2) { printf("Required parameter missing - config file name (%s).\n", argv[0]); exit(99); } if (argc > 2) { if ((stream = fopen(argv[2], "r")) == NULL) { printf("Cannot open input file - (%s).\n", argv[2]); exit(100); } msgsToSend = atoi(argv[3]); } else msgsToSend = -1; if (read_config_file(argv[1], MAX_FNAME_LEN) != 0) { printf("Error reading config file %s (%s).\n", argv[1], argv[0]); exit(1); } UseReplyQueue = strlen(configFile.RQName); ConnQMgr(); if (UseReplyQueue) OpenReplyQ(); OpenSendQ(); /* Read lines from the file and put them to the message queue */ /* Loop until null line or end of file, or there is a failure */ printf("Sending messages ...\n"); while (CompCode != MQCC_FAILED) { if (fgets(buffer, BUFFER_LENGTH, stream) != NULL) { /* Sent all the required messages ? */ if (msgsToSend >= 0) if (msgsToSend-- <= 0) break; buflen = strlen(buffer); if (buffer[buflen - 1] == '\n') { buffer[buflen - 1] = '\0'; --buflen; } SendMess(); #ifdef DEBUG printf("%i:%.79s\n", msgsSent, buffer); #endif msgsSent++; if (UseReplyQueue) GetReply(); } else if (msgsToSend < 0) CompCode = MQCC_FAILED; else rewind(stream); } printf("Sent %d messages\n", msgsSent); ShutDown(0); }