/******************************************************************** * Program to trawl an Oracle table and place rows into MQ messages ********************************************************************/ #include #include #include /* includes for MQI */ #include #include "config.h" #define BUFFER_LENGTH 64000 #define MAX_FNAME_LEN 128 #define ROWID_SIZE 18 #define DEFAULT_SLEEP_TIME 10 #define STOP_COMMAND "ORA_RECV STOP" #define DEFAULT_ORAID "/" EXEC SQL include sqlca; EXEC SQL declare GETMESS cursor for select QUEUE_NAME, SEND_MESSAGE, ROWID from MQ_MESSAGES order by SEQ_NUMBER; char *uid; ConfigFileT configFile; MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQHCONN Hcon; /* connection handle */ MQHOBJ Hobj; /* object handle */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ EXEC SQL begin declare section; varchar QueueName[MQ_Q_MGR_NAME_LENGTH]; varchar buffer[BUFFER_LENGTH]; varchar row_id[ROWID_SIZE]; EXEC SQL end declare section; /********************************************************************/ /********************************************************************/ /********************************************************************/ void ServerShutDown(int ExitCode) { /* Disconnect from MQ */ MQDISC(&(Hcon), &(CompCode), &(Reason)); if (Reason != MQRC_NONE) fprintf(stderr, "MQDISC ended with reason code %ld\n", Reason); EXEC SQL commit work release; fprintf(stderr, "Daemon exiting.\n"); exit(ExitCode); } /********************************************************************/ /********************************************************************/ /********************************************************************/ void sql_error(void) { char msg_buffer[512]; int msg_length; int buffer_size = 512; EXEC SQL whenever sqlerror continue; fprintf(stderr, "SQL code %ld.\n", sqlca.sqlcode); sqlglm(msg_buffer, &(buffer_size), &(msg_length)); fprintf(stderr, "Daemon error while executing:\n"); fprintf(stderr, "%.*s\n", msg_length, msg_buffer); fprintf(stderr, "Daemon quitting.\n"); ServerShutDown(1); } /********************************************************************/ /********************************************************************/ /********************************************************************/ int PutToQ(void) { printf("Attempt to put: %.*s, onto queue :%.*s.\n", buffer.len, buffer.arr, QueueName.len, QueueName.arr); printf("Length of buffer is %d.\n", buffer.len); strncpy(od.ObjectName, (char *)QueueName.arr, MQ_Q_NAME_LENGTH); od.ObjectType = MQOT_Q; pmo.Options = MQPMO_FAIL_IF_QUIESCING; /* Put each buffer to the message queue */ if (buffer.len > 0) { memcpy(md.Format, MQFMT_STRING, MQ_FORMAT_LENGTH); memcpy(md.MsgId, MQMI_NONE, MQ_MSG_ID_LENGTH); memcpy(md.CorrelId, MQCI_NONE, MQ_CORREL_ID_LENGTH); MQPUT1(Hcon, &(od), &(md), &(pmo), buffer.len, buffer.arr, &(CompCode), &(Reason)); switch (Reason) { case MQRC_NONE : /* No problem, continue on */ break; /* The following errors may be */ /* curable if we sleep for a */ /* bit and then retry the put */ case MQRC_PUT_INHIBITED: fprintf(stderr, "MQPUT1 failed, PUT_INHIBITED, retrying\n"); break; case MQRC_Q_FULL : fprintf(stderr, "MQPUT1 failed, Q_FULL, retrying\n"); break; default : /* Fatal error */ fprintf(stderr, "MQPUT1 ended with reason code %ld\n", Reason); ServerShutDown(2); } /* Return the reason code */ return Reason; } fprintf(stderr, "Attempted to put empty buffer !!\n"); return MQRC_NONE; } /********************************************************************/ /********************************************************************/ /********************************************************************/ int main(int argc, char **argv) { if (argc < 2) { printf( "Usage: %s config.cfg\n", argv[0]); exit(1); } if (read_config_file(argv[1], MAX_FNAME_LEN) != 0) { fprintf(stderr, "Error reading config file %s.\n", argv[1]); exit(1); } uid = configFile.DatabaseLogin; EXEC SQL whenever sqlerror do sql_error(); EXEC SQL connect :uid; printf("Starting MQSeries/Oracle interface.\n"); /* Connect to queue manager */ MQCONN(configFile.QManager, &(Hcon), &(CompCode), &(Reason)); /* Report reason and stop if it failed */ if (CompCode == MQCC_FAILED) { fprintf(stderr, "MQCONN ended with reason code %ld\n", Reason); exit(Reason); } while (1) { EXEC SQL open GETMESS; EXEC SQL whenever not found do break; while (1) { EXEC SQL fetch GETMESS into :QueueName, :buffer, :row_id; /* Have we been asked to stop? */ if (strncmp((char *)QueueName.arr, STOP_COMMAND, QueueName.len) == 0) { fprintf(stderr, "Stop request received.\n"); EXEC SQL delete from MQ_MESSAGES where ROWID = :row_id; ServerShutDown(0); /* Yes we have, die in here */ } if (PutToQ() == MQRC_NONE) { /* If the PUT was successful */ /* remove the row from the */ /* input table */ printf("Delete from table.\n"); EXEC SQL delete from MQ_MESSAGES where ROWID = :row_id; } else { /* The PUT failed, but not in */ /* a fatal manner. Sleep for a */ /* bit and then try again */ break; } } EXEC SQL commit work; /* Done for now, sleep and check again... */ sleep(configFile.SleepTime); } }