Documentation of the Filter6 tool used for trans2.test
.
The Filer6 tool is used to test the filter-feature of libmqmsgque.
To run the filter test, a first client, one or more filters and a final server are created. All are connected to the libmqmsgque protocol.
The trans2.test
carries out common filter tests and special stress tests. A stress test is performed by exiting one or more filters or servers and observing the response and behavior when reconnecting.
The GOAL for this test is:
/** * @file NHI1/example/c/Filter6.c * @brief Filter6.c - 02 Oct 2024 - aotto1968 * @copyright (C) NHI - #1 - Project - Group * This software has NO permission to copy, * please contact AUTHOR for additional information * @version 6b6d463bfff4302b7ea9898694b5548df6d8aef4 * @date Wed Oct 2 23:41:27 2024 +0200 * @author aotto1968 <aotto1968@t-online.de> */ /* LABEL-START */ #define META_FILE_NAME "Filter6.c" /* LABEL-END */ #include <regex.h> #include "common.h" #define TRANSCTX ((struct FilterCtxS*const)mqctx) #define META_CONTEXT_S mqctx #define SETUP_ctx struct FilterCtxS*const ctx = TRANSCTX #define CHECK_MQ_SERVICE_CALL_ARGS(s) \ if (MqReadGetNumItems(mqctx)) { \ return MkErrorSetV_2M (mqctx, "usage: %s (%s)\n", __func__, s); \ } #define check_MqError(E) if (MkErrorCheckI(E)) #define ftrctx(x) ((struct FilterCtxS*const)x) struct FilterCtxS { struct MqContextS mqctx; struct MkLogFileS *FH; regex_t rgx; bool isWriter; }; static MK_I32 retryCnt = 3; static MK_TYP Filter6TT = NULL; #define Filter6T ((typeof(MqContextC_T)) Filter6TT) /*****************************************************************************/ /* */ /* Request Handler */ /* */ /*****************************************************************************/ static void __attribute__ ((noreturn)) FilterHelp (const char * base) { fputs("\n", stderr); fprintf(stderr, "usage: %s [OPTION]... [ARGUMENT]...\n", base); fputs("\n", stderr); fputs(" This tool is database (MqStorage...) test tool of NHI1.\n", stderr); fputs("\n", stderr); fprintf(stderr, " %s [ARGUMENT]... syntax:\n", base); fprintf(stderr, " aclient... %c %s ... %c aserver\n", MK_ALFA, base, MK_ALFA); fputs("\n", stderr); fputs(MqHelp (NULL), stderr); fputs("\n", stderr); fprintf(stderr, " %s [OPTION]:\n", base); fputs( " -h, --help print this help\n", stderr); fputs("\n", stderr); fputs( " --retryCnt how often an item should be send on error (default: 3)\n", stderr); fputs("\n", stderr); exit(EXIT_SUCCESS); } static void ErrorWrite ( MK_RT_ARGS MQ_CTX const mqctx ) { SETUP_ctx; MkLogFileWriteV(ctx->FH,"ERROR: %s\n", MkErrorGetText_0E()); MkErrorReset_1X (mqctx); } static enum MkErrorE FilterEvent ( MQ_SERVICE_CALL_ARGS ) { //X0(mqctx) MQ_LTR Id = 0LL; //MqStorageLog(mqctx,__func__); // check if an item is available if (MqStorageCount_e(mqctx) == 0) { // no data available MkRtSetup_XN(mqctx); MkErrorSetCONTINUE_0E(); return MkErrorGetCode_0E(); } else { //MQ_LTR Id = 0LL; MQ_CTX ftr; MqSlaveGetFilter_E (mqctx, &ftr); // if connection is down -> connect again //MkErrorCheck1 (MqLinkConnect (ftr)); // fill the read-buffer from storage check_MkErrorE (MqStorageImport (mqctx, &Id)) goto error2; //printXV(mqctx, "START: tok<%s>, Id<%i>\n", MqServiceTokenGet(mqctx), Id); //MqStorageLog(mqctx,"FilterEvent"); // send BDY data to the link-target, on error write message but do not stop processing if (MkErrorCheckI (MqProxyForward(mqctx, ftr, NULL, 10))) { MK_I32 errCnt; check_MqError (MqStorageErrCnt(mqctx,Id,&errCnt)) goto error2; if (errCnt <= retryCnt) { check_MqError (MqStorageDecrRef(mqctx,Id)) goto error2; MkErrorReset_1X(mqctx); //printXV(mqctx, "CATCH: reset → %i\n", Id); return MK_OK; } else { goto error2; } } //printXV(mqctx, "END: delete → %i\n", Id); MqStorageDelete_E (mqctx, &Id); return MK_OK; error2: //printXV(mqctx,"ERR: error2 → %i\n", Id); ErrorWrite ( MK_RT_CALL mqctx); if (Id != 0LL) MqStorageDelete_E (mqctx, &Id); return MK_OK; } return MK_OK; error: //printXV(mqctx,"ERR: error → %i\n", Id); return MkErrorStack_1X(mqctx); } static enum MkErrorE LOGF ( MQ_SERVICE_CALL_ARGS ) { SETUP_ctx; if ( regexec(&ctx->rgx, MqConfigGetName (mqctx), 0, NULL, 0) == 0 ) { ctx->isWriter = true; MK_STRN file = MqReadSTR_e (mqctx); ctx->FH = MkLogFileOpen_e(MkOBJ(mqctx),file); } else { MQ_CTX ftr = MqSlaveGetFilter_e (mqctx); MqProxyForward_E (mqctx, ftr, NULL,MK_TIMEOUT_DEFAULT); ctx->FH = MkLogFileOpen_e(MkOBJ(mqctx),"stderr"); } error: return MqSendRETURN(mqctx); } static enum MkErrorE WRIT ( MQ_SERVICE_CALL_ARGS ) { MQ_CTX master = MqSlaveGetMaster (mqctx); struct FilterCtxS *ftrCtx = ftrctx(master); if ( ftrCtx->isWriter ) { MkLogFileWriteV_E (ftrCtx->FH, "%s\n", MqReadSTR_e(mqctx)); } else { MqProxyForward_E (mqctx, master, NULL, MK_TIMEOUT_DEFAULT); } error: return MqSendRETURN(mqctx); } static enum MkErrorE EXIT ( MQ_SERVICE_CALL_ARGS ) { MqExit_1(mqctx); return MK_OK; } static enum MkErrorE SOEX ( MQ_SERVICE_CALL_ARGS ) { MkErrorSetEXIT_1M(mqctx); return MK_OK; } static enum MkErrorE FiIn ( MQ_SERVICE_CALL_ARGS ) { //MQ_LTR Id = 0LL; //X0(mqctx) //MqStorageLog(mqctx,"FiIn"); //MqStorageExport (mqctx, &Id); MqStorageExport (mqctx, NULL); //X1(mqctx) //printXV(mqctx, "tok<%s>, Id<%i>\n", MqServiceTokenGet(mqctx), Id); //MqStorageLog(mqctx,"FiIn"); return MqSendRETURN(mqctx); } static enum MkErrorE CLTI ( MQ_SERVICE_CALL_ARGS ) { return MqSend(mqctx,"R","I",mqctx->link.protect.rmtTransLId); } /*****************************************************************************/ /* */ /* context_init */ /* */ /*****************************************************************************/ static enum MkErrorE FilterCleanup ( MQ_SERVICE_CALL_ARGS ) { SETUP_ctx; MkLogFileClose(ctx->FH); regfree(&ctx->rgx); return MK_OK; } static enum MkErrorE FilterSetup ( MQ_SERVICE_CALL_ARGS ) { register SETUP_ctx; MQ_CTX ftr; MqSlaveGetFilter_E (mqctx, &ftr); // [filter_service_example] MqServiceCreate_E (mqctx, "LOGF", LOGF, NULL, NULL, NULL); MqServiceCreate_E (mqctx, "EXIT", EXIT, NULL, NULL, NULL); MqServiceCreate_E (mqctx, "SOEX", SOEX, NULL, NULL, NULL); MqServiceStorage_E (mqctx, "PRNT"); MqServiceStorage_E (mqctx, "PRN2"); MqServiceCreate_E (mqctx, "+ALL", FiIn, NULL, NULL, NULL); MqServiceCreate_E (ftr, "WRIT", WRIT, NULL, NULL, NULL); MqServiceCreate_E (mqctx, "WRIT", WRIT, NULL, NULL, NULL); MqServiceCreate_E (mqctx, "CLTI", CLTI, NULL, NULL, NULL); MqServiceProxy_E (ftr, "WRT2", MQ_SLAVE_MASTER); // [filter_service_example] ctx->FH = NULL; int ret = regcomp(&ctx->rgx, "^(Filter6-1|Filter6|Filter6E|fs1.*)$", REG_EXTENDED); //printXP(mqctx,&ftrctx(mqctx)->rgx) if (ret != 0) { return MkErrorSetC_2M(mqctx,"compiling regular expression failed"); } return MK_OK; error: return MkErrorStack_1X(mqctx); } enum MkErrorE Filter6Factory ( MQ_CALLBACK_FACTORY_CTOR_ARGS ) { MQ_CTX const mqctx = *contextP = MqContextCreate(Filter6TT,tmpl); mqctx->setup.isServer = true; mqctx->setup.ServerSetup.fCall = FilterSetup; mqctx->setup.ServerCleanup.fCall = FilterCleanup; mqctx->setup.Event.fCall = FilterEvent; mqctx->setup.EventIsServer = true; mqctx->setup.ignoreExit = true; return MK_OK; } /*****************************************************************************/ /* */ /* main */ /* */ /*****************************************************************************/ /// [error_example] int main ( const int argc, MK_STRN argv[] ) { AllRtSetup_NULL; // define the new type Filter6TT = MkTypeDup2(MqContextC_TT,"Filter6"); Filter6TT->objsize = sizeof(struct FilterCtxS); Filter6T->fHelp = FilterHelp; // create the factory MQ_FCT mqfct = MqFactoryAdd(MK_ERROR_PANIC, Filter6Factory, NULL,NULL,NULL,NULL,NULL,NULL,NULL,"Filter6"); MQ_CTX mqctx = NULL; #ifdef BUG1 // for BUG testing in 'trans2-2-T1aE-...' MqFactoryInitial(mqfct); #endif // parse the command-line MK_BFL args = MkBufferListCreateVC (argc, argv); // search for option "--retryCnt" MkBufferListCheckOptionI32_E (args, "--retryCnt", 3, true, &retryCnt); // create the filter MqFactoryNew_E (mqfct, NULL, &mqctx); // create the link MqLinkCreate_E (mqctx, args); // start event-loop and wait forever MqProcessEvent_E (mqctx, MQ_WAIT_FOREVER, MK_TIMEOUT_USER); // finish and exit error: MkBufferListDelete(args); MqExit_1 (mqctx); } /// [error_example]