mpgrep is a massively parallel grep tool to search for strings in large blobs.
The aim of the tool is to search for strings; this sounds simple but becomes problematic when large blobs are involved.
This tool divides the blobs into blocks and starts a separate process or thread for each block.
A couple of features from ccmqmsgque are used to achieve this aim:
Add client, server and worker code into a single executable.
Example from mpgrep.cc
→ using multiple application-entry-points in a single executable
// configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting MqFactoryCT<GrepClient>::Add("GrepClient")->Default(); MqFactoryCT<GrepServer>::Add("GrepServer"); MqFactoryCT<GrepWorker>::Add("GrepWorker");
Usable as client or as server, local or remote.
@
pipe, and the server starts the workers: mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
mpgrep GrepServer --tcp --port 2345 --thread
mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world
Distribute work in parallel.
Example from mpgrep.cc
→ using an asynchronous-service-call to distribute jobs to multiple workers
// setup worker for ( auto job : jobL) { SlaveWorker( job.id, "GrepWorker", args->Dup()->AppendVA("--prefix", "wk-cl-", "--postfix", int2str(job.id), "@", "--prefix", "wk-sv-", NULL) ); } // call the "GREP" service on "Worker" using an ASYNCRONE service call for ( auto job : jobL) { SlaveGet(job.id)->Send("C",MqTokenICB(&GrepServer::callback),"GREP:CWWC", fn, job.startB, job.endB, str); } // wait for all jobs finished for (i=0; i < jobs; i++) { ProcessEvent(MQ_WAIT_ONCE); }
/**
 *  @file         NHI1/example/cc/mpgrep.cc
 *  @brief        mpgrep.cc - 12 Nov 2024 - aotto1968
 *  @copyright    (C) NHI - #1 - Project - Group
 *                This software has NO permission to copy,
 *                please contact AUTHOR for additional information
 *  @version      478c13192af70c38a452c665c8243fd6902efb63
 *  @date         Tue Nov 12 14:49:07 2024 +0100
 *  @author       aotto1968 <aotto1968@t-online.de>
 */

/* LABEL-START */
#include "debug_mq.h"
#include "LibMqMsgque_cc.hh"
/* LABEL-END */

// example:
// Nhi1Exec mpgrep.cc --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <fstream>
#include <iostream>
#include <list>
#include <stdexcept>
#include <string>
#include <vector>
#include <stdio.h>
#include <sys/stat.h>

using namespace ccmqmsgque;

// "worker" application
// Searches ONE block of a file for a string and reports the absolute file offset of every match.
class GrepWorker : public MqContextC, public MqServerSetupIF {
  friend class MqFactoryCT<GrepWorker>;

  // define the factory constructor
  GrepWorker(MK_TYP const typ, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) {
    //printTxt("GrepWorker CTOR");
  };

  private:

    /// @brief  Read up to @p size bytes from @p filename, beginning at byte offset @p start.
    /// @return the bytes actually read; the result is SHORTER than @p size if the range
    ///         extends past the end of the file and EMPTY if @p size is not positive.
    /// @throws std::runtime_error if the file can not be opened or positioned.
    static std::vector<char> read_from_file_open(MK_STRN filename, const std::streamoff start, const std::streamsize size) {
      std::vector<char> buffer;
      if (size <= 0) return buffer;

      std::ifstream fd(filename, std::ios_base::in | std::ios_base::binary);
      if (!fd) {
        throw std::runtime_error("--filename = '" + std::string(filename) + "' : error = unable to open file");
      }
      fd.seekg(start, std::ios_base::beg);
      if (!fd) {
        throw std::runtime_error("--filename = '" + std::string(filename) + "' : error = unable to seek to block start");
      }

      buffer.resize(static_cast<size_t>(size));
      fd.read(buffer.data(), size);
      // a short read is legal (block end behind EOF): keep ONLY the bytes really read,
      // otherwise the search would run over uninitialised memory
      buffer.resize(static_cast<size_t>(fd.gcount()));
      return buffer;
    }

    // service to serve all incoming requests for token "GREP"
    // input  : filename (STR), block start (I64), block end (I64, exclusive), search string (STR)
    // output : one I64 per match, the absolute file offset of the match
    void GREP_Service () {
      // get job configuration
      auto fnP    = ReadSTR() ;
      auto startZ = ReadI64() ;
      auto endZ   = ReadI64() ;
      auto stP    = ReadSTR() ;
      LogV(__func__,0,"START: startZ=%-10lld, endZ=%-10lld, st=%-10s, fn=%s\n",startZ,endZ,stP,fnP);

      // reject a range that would wrap around when converted to an unsigned size
      if (startZ < 0 || endZ < startZ) {
        throw std::invalid_argument("GREP : error = invalid block range");
      }
      // an empty search string would never advance the scan below (endless loop)
      const size_t stZ = strlen(stP);
      if (stZ == 0) {
        throw std::invalid_argument("GREP : error = search string size is '0'");
      }

      // read BLOCK of data from "fnP"
      const std::vector<char> datL = read_from_file_open(fnP, startZ, endZ-startZ);

      // return start INDICES as I64 integer
      const char* const datP = datL.data();
      const char* const endP = datP + datL.size();
      const char*       curP = datP;
      SendSTART();
      while (curP < endP) {
        //LogV(__func__,0,"curP<%p>, endP<%p>, todo<%zi>\n",curP,endP,endP-curP);
        curP = static_cast<const char*>(memmem(curP,endP-curP,stP,stZ));
        if (curP == NULL) break;
        SendI64(startZ+(curP-datP));
        // continue BEHIND the match, overlapping matches are not reported
        curP += stZ;
      }
      SendRETURN();
    }

    // define a service as link between the token "GREP" and the callback "GrepWorker::GREP_Service"
    void ServerSetup() {
      ServiceCreate("GREP", MqServiceICB(&GrepWorker::GREP_Service));
    }
};

// command "server" application
// Splits the file into "jobs" blocks, starts one "GrepWorker" per block and collects the results.
class GrepServer : public MqContextC, public MqServerSetupIF {
  friend class MqFactoryCT<GrepServer>;

  public:
    // define the factory constructor
    GrepServer(MK_TYP const typ=NULL, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) {
      if (!ConfigGetIsServer()) ConfigSetName("GrepServer");
    };

  public:
    // all match positions reported by the workers, filled by "callback"
    // NOTE(review): nothing in this file clears RESULTS between two "GREP" requests, a long
    // running server seems to return the positions of earlier requests too - TODO confirm.
    MkBufferListC RESULTS;

  private:

    // callback used to process the WORKER results
    void callback() {
      auto master = static_cast<GrepServer*>(SlaveGetMaster());
      master->RESULTS.AppendLP(ReadALL());
      LogV(__func__,0,"END: num=%-10i\n", master->RESULTS.Size());
    }

    // format "i" as decimal string, the result is only valid until the NEXT call (static buffer)
    static MK_STRN int2str (MK_I32 i) {
      static char buffer[30];
      snprintf(buffer,sizeof(buffer),"%i", i);
      return buffer;
    }

  public:

    /// @brief  Search the file @p fn for @p str using @p jobs parallel workers.
    /// @param  jobs  number of blocks/workers, have to be >= 1
    /// @param  str   string to search for, have to be non-empty
    /// @param  fn    name of the file to search in
    /// @param  args  base arguments, a copy extended by the prefix/postfix setup is used for every worker
    /// @return pointer to the member #RESULTS, owned by this object
    /// @throws std::invalid_argument if the file can not be inspected, @p str is empty or @p jobs < 1
    MkBufferListC* GrepServer_exec (
      MK_I32          jobs,
      MK_STRN         str,
      MK_STRN         fn,
      MkBufferListC  *args
    ) {
      // setup job start position
      struct jobS {
        int     id;
        size_t  startB;
        size_t  endB;
      };

      struct stat st;
      int err = stat(fn, &st);
      if (err == -1) {
        throw(std::invalid_argument("--filename = '" + std::string(fn) + "' : error = " + std::strerror(errno)));
      }

      size_t strZ = strlen(str);
      if (strZ == 0) {
        throw(std::invalid_argument("--string : error = size is '0'"));
      }

      // "jobs" is the divisor of the block-size calculation below
      if (jobs < 1) {
        throw(std::invalid_argument("--jobs : error = value have to be >= '1'"));
      }

      LogV(__func__,0,"START: jobs=%-4d, str=%-20s, fn=%s[%zdMB], args=%s\n",
        jobs,str,fn,st.st_size/1024/1024,args->ToString());

      const size_t fileZ = static_cast<size_t>(st.st_size);
      const size_t blk   = fileZ / static_cast<size_t>(jobs);
      size_t start = 0;
      int i;

      std::vector<jobS> jobL;
      jobL.reserve(static_cast<size_t>(jobs));
      for (i=0; i < jobs-1; i++) {
        // overlap blocks (strZ-1) because a string maybe on the split,
        // but never request data behind the end of the file
        jobL.push_back(jobS{ static_cast<int>(i+MQ_SLAVE_USER), start, std::min(start+blk+strZ-1, fileZ) });
        start += blk;
      }
      // LAST block always have to match the end
      jobL.push_back(jobS{ static_cast<int>(i+MQ_SLAVE_USER), start, fileZ });

// [jobs_example]
      // setup worker
      for (const auto& job : jobL) {
        SlaveWorker(
          job.id,
          "GrepWorker",
          args->Dup()->AppendVA("--prefix", "wk-cl-", "--postfix", int2str(job.id), "@", "--prefix", "wk-sv-", NULL)
        );
      }

      // call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
      for (const auto& job : jobL) {
        SlaveGet(job.id)->Send("C",MqTokenICB(&GrepServer::callback),"GREP:CWWC", fn, job.startB, job.endB, str);
      }

      // wait for all jobs finished
      for (i=0; i < jobs; i++) {
        ProcessEvent(MQ_WAIT_ONCE);
      }
// [jobs_example]

      return &RESULTS;
    }

  private:

    // service "GREP": read the job description from the client and answer with the result list
    void GREP_service() {
      auto jobs = ReadI32();
      auto str  = ReadSTR();
      auto fn   = ReadSTR();
      auto args = ReadBFL();

      // send RESULT back to client
      Send("R", "L", GrepServer_exec(jobs, str, fn, args));
    }

    // define a service as link between the token "GREP" and the callback "GrepServer::GREP_service"
    void ServerSetup() {
      ServiceCreate("GREP",MqServiceICB(&GrepServer::GREP_service));
    }
};

// "client" application
// Parses the command-line, runs the search (remote server or local) and prints the positions.
class GrepClient : public MqContextC {
  friend class MqFactoryCT<GrepClient>;

  // define the factory constructor
  GrepClient(MK_TYP const typ, MqContextC* tmpl=NULL) : MqContextC(typ, tmpl) { };

  private:

    // sort and print all positions, 8 per line, finally "bfl" is deleted
    static void PrintResults (MkBufferListC *bfl) {
      bfl->Sort();
      printf("POSITIONS --- ( num=%d ) -----------------\n", bfl->Size());
      for (MK_NUM idx=0; idx<bfl->Size(); idx++) {
        printf("%-10lld, ", bfl->IndexGetBUF(idx)->GetI64());
        if (((idx+1) % 8) == 0) {
          printf("\n");
        }
      }
      printf("\n");
      printf("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n");
      bfl->Delete();
    };

  public:

    /// @brief  Run the search described by the command-line @p args and print the result.
    ///
    /// With "--tcp" or "--uds" in @p args the "GREP" service of a remote server is called,
    /// otherwise a local #GrepServer object is used.
    /// @throws std::invalid_argument if the file name or the search string is missing
    void GrepClient_exec (MkBufferListC &args) {

      // CLIENT parse arguments, std::string keeps a private copy of the option value
      const std::string fn   = args.CheckOptionSTR("--filename");
      const auto        jobs = args.CheckOptionI32("--jobs",1);
      std::string       str  = args.CheckOptionSTR("--search");
      // the documented examples use "--string": accept it as alias of "--search"
      if (str.empty()) str = args.CheckOptionSTR("--string");

      if (fn.empty() || str.empty())
        throw std::invalid_argument(
          "usage: mpgrep --filename FILE --search|--string STRING --jobs NUM:1"
        );

      LogV(__func__,0,"SETUP: jobs=%-4d, str=%-20s, fn=%s\n",jobs,str.c_str(),fn.c_str());

      MkBufferListC *RESULTS;

      if (args.SearchC("--tcp") != -1 || args.SearchC("--uds") != -1) {
        LinkCreate(args);
        // start the GREP
        Send("W","GREP:ICC[]",jobs,str.c_str(),fn.c_str());
        RESULTS = ReadBFL()->Dup();
      } else {
        // extract FIRST argument (the executable) from args
        // TODO: args.IndexExtract→leak
        // NOTE(review): the extracted buffer is never released here - confirm ownership rules
        MqMsgque::InitSetArg0VA(args.IndexExtract(0)->GetSTR(),NULL);
        // start LOCAL command-server
        RESULTS = GrepServer().GrepServer_exec(jobs,str.c_str(),fn.c_str(),&args)->Dup();
      }

      PrintResults(RESULTS);
    }
};

// package-main
int MK_CDECL main(int argc, MK_STRN argv[])
{
  MqMsgque::Setup();

  // setup commandline arguments for later use
  MkBufferListC args = {argc, argv};

// [factory_example]
  // configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
  MqFactoryCT<GrepClient>::Add("GrepClient")->Default();
  MqFactoryCT<GrepServer>::Add("GrepServer");
  MqFactoryCT<GrepWorker>::Add("GrepWorker");
// [factory_example]

  // inspect commandline-arguments for the "factory" to choose…
  auto fct = MqFactoryCT<MqContextC>::GetCalled(args);

  // and create the initial instance
  auto ctx = fct->New();

  // depend on "ctx" start server or client
  try {
    if (ctx->ConfigGetIsServer()) {
      // SERVER enter eventloop
      ctx->LinkCreate(args);
      ctx->ProcessEvent(MQ_WAIT_FOREVER);
    } else {
      // CLIENT call exec, only a "GrepClient" provides the client entry point
      auto client = dynamic_cast<GrepClient*>(ctx);
      if (client == NULL) {
        throw std::runtime_error("mpgrep : error = the selected entry-point is neither a server nor the 'GrepClient'");
      }
      client->GrepClient_exec(args);
    }
  } catch (const std::exception& e) {
    ctx->ErrorCatch(e);
  }
  ctx->Exit();
}

/// [server_example]