mpgrep is a massively parallel grep tool for searching for strings in large blobs.
The aim of the tool is to search for strings; this sounds simple but becomes problematic when large blobs are involved.
This tool divides the blobs into blocks and starts a separate process or thread for each block.
A couple of features from tclmqmsgque are used to achieve this aim:
Add client, server and worker code into a single executable.
Example from mpgrep.tcl
→ using multiple application-entry-points in a single executable
# configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
[MqFactoryC Add "GrepClient"] Default
MqFactoryC Add "GrepServer"
MqFactoryC Add "GrepWorker"
Usable as a client or as a server, locally or remotely.
@
pipe and the server start the workers mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
mpgrep GrepServer --tcp --port 2345 --thread
mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world
Distribute work in parallel.
Example from mpgrep.tcl
→ using an asynchronous-service-call to distribute jobs to multiple workers
# setup worker
foreach id $ids {
  set largs [$args Dup]
  my SlaveWorker $id "GrepWorker" [$largs AppendLA --name "wk-cl-$id" @ --name "wk-sv-$id" ]
}

# call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
foreach id $ids {
  [my SlaveGet $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
}

# wait for all jobs finished
foreach id $ids {
  my ProcessEvent ONCE
}
#!/usr/bin/env tclsh
#+
#:   @file         NHI1/example/tcl/mpgrep.tcl
#:   @brief        mpgrep.tcl - 26 Jun 2024 - aotto1968
#:   @copyright    (C) NHI - #1 - Project - Group
#:                 This software has NO permissions to copy,
#:                 please contact AUTHOR for additional information
#:   @version      08a242faddae5101924d8fe811888e78892a82d9
#:   @date         Wed Jun 26 14:26:21 2024 +0200
#:   @author       aotto1968 <aotto1968@t-online.de>
#:

# example:
# > time Nhi1Exec grep.tcl --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --spawn

package require tclmsgque::MqMsgque
namespace import tclmsgque::MqMsgque::*
namespace import tclmsgque::MkKernel::*

# Print the list of match positions "lst" to stdout, eight positions per
# row, framed by a header line (with the number of positions) and a footer.
proc PrintResults {lst} {
  puts "POSITIONS --- ( num=[llength $lst] ) -----------------"
  set idx 1
  foreach p $lst {
    puts -nonewline [format {%-10s, } $p]
    # line break after every 8th position
    if {($idx % 8) == 0} { puts "" }
    incr idx
  }
  puts ""
  puts "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
}

# "worker" application
::oo::class create GrepWorker {
  superclass MqContextC

  # service to serve all incoming requests for token "GREP"
  #
  # Reads the job (filename, start offset, end offset, search pattern),
  # scans the byte range [startZ, endZ) of the file in chunks and sends
  # the absolute start offset of every match back as I64 values.
  method GREP_Service {} {
    # get job configuration
    set fn      [ my ReadSTR ]
    set startZ  [ my ReadI64 ]
    set endZ    [ my ReadI64 ]
    set st      [ my ReadSTR ]

    my LogC [format "START: startZ=%-10ld, endZ=%-10ld, st=%-10s, fn=%s\n" $startZ $endZ $st $fn]

    # read BLOCK of data from "fn"
    set bk    [expr {1024 * 1024 * 128}];         # chunk size per iteration (128 MiB)
    set poL   [list];                             # collected match positions
    set ovZ   [expr {[string length $st] - 1}];   # overlap of blocks
    set step  [expr {$bk + $ovZ}];                # per iteration
    set pos   $startZ

    set FH    [open $fn rb]
    try {
      seek $FH $startZ start
      while {true} {
        # never read past the end of the assigned range
        if {($pos + $step) > $endZ} { set step [expr {$endZ - $pos}] }
        # FIX: the original had a stray "]" here, which appended a literal
        # "]" character to every chunk that was read
        set dt  [read $FH $step]
        #my LogC [format "pos=%-10d, step=%-10d, todo=%-10d, readZ=%-10d\n" $pos $step [expr {$endZ-$pos}] [string length $dt]]
        # NOTE(review): "st" is used as a regular expression, while the
        # overlap "ovZ" is computed from its literal length - confirm that
        # only literal strings are expected here.
        # "--" keeps a pattern starting with "-" from being parsed as a switch.
        foreach r [regexp -all -inline -indices -- $st $dt] {
          # r is a start/end index pair, only the start index is reported
          lappend poL [expr {$pos + [lindex $r 0]}]
        }
        # step back by the overlap, the next chunk starts at pos+bk
        seek $FH -$ovZ current
        incr pos $bk
        if {$pos >= $endZ} break
      }
    } finally {
      # close the file even if seek/read/regexp raised an error
      close $FH
    }

    # return start INDICES as I64 integer
    my SendSTART
    foreach po $poL {
      my SendI64 $po
    }
    my SendRETURN
  }

  # define a service as link between the token "GREP" and the callback "GREP_Service"
  method serverSetup {} {
    my ServiceCreate "GREP" GREP_Service
  }

  # factory startup (constructor)
  constructor {{tmpl ""}} {
    next $tmpl
    my ConfigSetServerSetup serverSetup
  }
}

# "server" application
::oo::class create GrepServer {
  superclass MqContextC
  # make the "variable" method callable from outside, it is used by
  # "callback" to reach the RESULTS variable of the master context
  export variable

  # callback used to receive the job data
  #
  # Appends the positions delivered by one worker to the RESULTS list of
  # the context returned by "SlaveGetMaster".
  method callback {} {
    [my SlaveGetMaster] variable RESULTS
    set vals [my ReadLIST]
    lappend RESULTS {*}$vals
    my LogC [format "END: num=%-10lu\n" [llength $vals]]
  }

  # service to serve all incoming requests for token "GREP"
  #
  # Reads the request (number of jobs, search string, filename, argument
  # list), splits the file into one byte range per job, runs one worker
  # per range and sends all collected positions back to the caller.
  method GREP {} {
    my variable RESULTS
    # FIX: start every request with an empty result list, the list was
    # never initialised (zero matches -> read of an unset variable) and
    # was never reset between requests
    set RESULTS [list]

    set jobs  [ my ReadI32 ]
    set str   [ my ReadSTR ]
    set fn    [ my ReadSTR ]
    set args  [ my ReadBFL ]

    my LogC [format "START: jobs=%-4d, str=%-20s, fn=%s, args=%s\n" $jobs $str $fn [$args ToString]]

    # setup id's
    set ids [list]
    for {set i 0} {$i < $jobs} {incr i} {
      lappend ids [expr {$i+10}]
    }

    # setup job start position
    set strZ  [string length $str]
    set fnZ   [file size $fn]
    set blk   [expr {$fnZ / $jobs}]
    set start 0
    foreach id [lrange $ids 0 end-1] {
      set startB($id) $start
      incr start $blk
      set endB($id) [expr {$start + $strZ -1}]; # overlap blocks because of string maybe on the split
    }
    # the last job takes everything up to the end of the file
    set id [lindex $ids end]
    set startB($id) $start
    set endB($id)   $fnZ

    # [jobs_example]
    # setup worker
    foreach id $ids {
      set largs [$args Dup]
      my SlaveWorker $id "GrepWorker" [$largs AppendLA --name "wk-cl-$id" @ --name "wk-sv-$id" ]
    }

    # call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
    foreach id $ids {
      [my SlaveGet $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
    }

    # wait for all jobs finished
    foreach id $ids {
      my ProcessEvent ONCE
    }
    # [jobs_example]

    # send RESULT back to client
    my SendSTART
    foreach rs $RESULTS {
      my SendI64 $rs
    }
    my SendRETURN
    #ToDo: my Send "R" "W*" {*}$RESULTS

    # cleanup
    $args Delete
  }

  # define a service as link between the token "GREP" and the callback "GREP"
  method serverSetup {} {
    my ServiceCreate "GREP" GREP
  }

  # factory startup (constructor)
  constructor {{tmpl ""}} {
    next $tmpl
    my ConfigSetServerSetup serverSetup
    # FIX: "my variable RESULTS [list]" only linked the variables "RESULTS"
    # and "" into the scope, it did not assign the empty list
    my variable RESULTS
    set RESULTS [list]
  }
}

# "client" application
::oo::class create GrepClient {
  superclass MqContextC

  # factory startup (constructor)
  constructor {{tmpl ""}} {
    next $tmpl
  }
}

# create a context using the static tclmsgque CTOR method "Create"
# only used to start the initial process
tclmsgque::MqMsgque Main {
  set ctx [MqContextC Create]
  try {
    set args [MkBufferListC CreateLA {*}$argv]

    # [factory_example]
    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    [MqFactoryC Add "GrepClient"] Default
    MqFactoryC Add "GrepServer"
    MqFactoryC Add "GrepWorker"
    # [factory_example]

    # choose context
    set ctx [[MqFactoryC GetCalledL $args] New]

    if {[$ctx ConfigGetIsServer]} {
      # SERVER enter eventloop
      $ctx LinkCreate $args
      $ctx ProcessEvent MQ_WAIT_FOREVER
    } else {
      # CLIENT parse arguments
      set fn    [$args CheckOptionSTR "--filename"]
      set jobs  [$args CheckOptionI32 "--jobs"    ]
      set str   [$args CheckOptionSTR "--string"  ]
      # FIX: pass the filename as a "%s" argument, interpolating "$fn" into
      # the format string breaks on a "%" in the filename
      $ctx LogC [format "SETUP: jobs=%-4d, str=%-20s, fn=%s\n" $jobs $str $fn]
      $ctx LinkCreate [$args AppendLA @ "GrepServer" ]
      # call the "GREP" service and print the sorted list of positions
      set RESULTS [ [ [$ctx Send "W" "GREP:ICCL@*" $jobs $str $fn $args] Sort ] ToList]
      PrintResults $RESULTS
    }
  } on error {} {
    $ctx ErrorCatch
  } finally {
    # delete the context using the tclmsgque APPLICATION-DTOR method "Exit"
    $ctx Exit
  }
}