theLink 10.0
|
mpgrep is a massively parallel grep tool for searching strings in large blobs.
The aim of the tool is to search for strings. This sounds simple, but becomes problematic when large blobs are involved.
This tool divides the blobs into blocks and starts a separate process or thread for each block.
A couple of features from atlmqmsgque are used to achieve this aim:
Combine client, server and worker code in a single executable.
Example from mpgrep.atl
→ using multiple application-entry-points in a single executable
# configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
MqFactoryC::Default [MqFactoryC::Add ::GrepClient]
MqFactoryC::Add ::GrepServer
MqFactoryC::Add ::GrepWorker
Usable as client or as server, local or remote.
@
Client and server connected by a pipe; the server starts the workers:
mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
Start the server on TCP port 2345 (workers as threads):
mpgrep GrepServer --tcp --port 2345 --thread
Connect a client to that server:
mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world
Distribute work in parallel.
Example from mpgrep.atl
→ using an asynchronous-service-call to distribute jobs to multiple workers
# setup worker
foreach id $ids {
  set largs [MkBufferListC::Dup $args]
  SlaveWorker $myNs $id "GrepWorker" [MkBufferListC::AppendLA $largs --name "wk-cl-$id" @ --name "wk-sv-$id" ]
}
# call the "GREP" service on "Worker" using an asynchronous service call
foreach id $ids {
  [SlaveGet $myNs $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
}
# wait for all jobs finished
foreach id $ids {
  ProcessEvent $myNs ONCE
}
#!/usr/bin/env atlsh
#+
#:  @file         NHI1/example/atl/mpgrep.atl
#:  @brief        tag: nhi1-release-250425
#:  @copyright    (C) NHI - #1 - Project - Group
#:                This software has NO permission to copy,
#:                please contact AUTHOR for additional information
#:

# example:
#  > time Nhi1Exec mpgrep.atl --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --spawn

package require lib_85
package require atlmqmsgque

# Print the list of match positions "lst", eight entries per output line.
proc PrintResults {lst} {
  puts "POSITIONS --- ( num=[llength $lst] ) -----------------"
  set idx 1
  foreach p $lst {
    puts -nonewline [format {%-10s, } $p]
    if {($idx % 8) == 0} { puts "" }
    incr idx
  }
  puts ""
  puts "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
}

# "worker" application: searches one byte range of a file
::myooX::ClassN ::GrepWorker {
  SuperI ::MqContextC

  # service to serve all incoming requests for token "GREP"
  # input : fn(STR) startZ(I64) endZ(I64) st(STR)
  # output: absolute start offset (I64) of every literal match of "st" that lies
  #         completely inside the byte range startZ (inclusive) .. endZ (exclusive)
  proc GREP_Service { myNs } {
    # get job configuration
    set fn     [ ReadSTR $myNs ]
    set startZ [ ReadI64 $myNs ]
    set endZ   [ ReadI64 $myNs ]
    set st     [ ReadSTR $myNs ]
    LogC $myNs [format "START: startZ=%-10ld, endZ=%-10ld, st=%-10s, fn=%s\n" $startZ $endZ $st $fn]

    set bk  [expr {1024 * 1024 * 128}]        ; # the range is read in chunks of 128 MiB
    set ovZ [expr {[string length $st] - 1}]  ; # overlap of chunks, so a match on a chunk border is not lost
    set poL [list]

    # read the range chunk by chunk; "finally" guarantees the handle is closed on error
    set FH [open $fn rb]
    try {
      for {set pos $startZ} {$pos < $endZ} {incr pos $bk} {
        # one chunk plus overlap, but never beyond "endZ"
        set step [expr {min($bk + $ovZ, $endZ - $pos)}]
        seek $FH $pos start
        set dt [read $FH $step]
        #LogC $myNs [format "pos=%-10d, step=%-10d, todo=%-10d, readZ=%-10d\n" $pos $step [expr {$endZ-$pos}] [string length $dt]]

        # "***=" makes the pattern a literal string: the overlap arithmetic relies
        # on every match being exactly as long as "st"
        foreach r [regexp -all -inline -indices "***=$st" $dt] {
          lappend poL [expr {$pos + [lindex $r 0]}]
        }
      }
    } finally {
      close $FH
    }

    # return start INDICES as I64 integer
    SendSTART $myNs
    foreach po $poL {
      SendI64 $myNs $po
    }
    SendRETURN $myNs
  }

  # define a service as link between the token "GREP" and the callback "GREP_Service"
  proc serverSetup { myNs } {
    ServiceCreate $myNs "GREP" GREP_Service
  }

  # factory startup (constructor)
  proc GrepWorker { myNs {tmpl ""} } {
    MqContextC $myNs $tmpl
    ConfigSetServerSetup $myNs serverSetup
  }
}

# "server" application: splits the file into "jobs" byte ranges and hands each to a worker
::myooX::ClassN ::GrepServer {
  SuperI ::MqContextC::export variable

  # callback used to receive the job data
  # (collects the positions sent back by one worker into the master's RESULTS)
  proc callback { myNs } {
    [SlaveGetMaster $myNs] variable RESULTS
    set vals [ReadLIST $myNs]
    lappend RESULTS {*}$vals
    LogC $myNs [format "END: num=%-10lu\n" [llength $vals]]
  }

  # service behind token "GREP"
  # input : jobs(I32) str(STR) fn(STR) args(BFL, argument list passed on to the workers)
  # output: I64 start offset of every match reported by the workers, in arrival order
  proc GREP { myNs } {
    variable $myNs RESULTS
    set jobs [ ReadI32 $myNs ]
    set str  [ ReadSTR $myNs ]
    set fn   [ ReadSTR $myNs ]
    set args [ ReadBFL $myNs ]
    try {
      LogC $myNs [format "START: jobs=%-4d, str=%-20s, fn=%s, args=%s\n" $jobs $str $fn [MqContextC::ToString $args]]

      # the block arithmetic below divides by "jobs" and uses "length of str - 1" as overlap
      if {$jobs < 1}  { error "GREP: jobs must be >= 1, got '$jobs'" }
      if {$str eq ""} { error "GREP: search string must not be empty" }

      # RESULTS belongs to the context: drop positions left over from an earlier request
      set RESULTS [list]

      # setup id's
      set ids [list]
      for {set i 0} {$i < $jobs} {incr i} {
        lappend ids [expr {$i+10}]
      }

      # setup job start position
      set strZ  [string length $str]
      set fnZ   [file size $fn]
      set blk   [expr {$fnZ / $jobs}]
      set start 0
      foreach id [lrange $ids 0 end-1] {
        set startB($id) $start
        incr start $blk
        set endB($id) [expr {$start + $strZ -1}]; # overlap blocks because of string maybe on the split
      }
      set id [lindex $ids end]
      set startB($id) $start
      set endB($id)   $fnZ

      # [jobs_example]
      # setup worker
      foreach id $ids {
        set largs [MkBufferListC::Dup $args]
        SlaveWorker $myNs $id "GrepWorker" [MkBufferListC::AppendLA $largs --name "wk-cl-$id" @ --name "wk-sv-$id" ]
      }
      # call the "GREP" service on "Worker" using an asynchronous service call
      foreach id $ids {
        [SlaveGet $myNs $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
      }
      # wait for all jobs finished
      foreach id $ids {
        ProcessEvent $myNs ONCE
      }
      # [jobs_example]

      # send RESULT back to client
      SendSTART $myNs
      foreach rs $RESULTS {
        SendI64 $myNs $rs
      }
      SendRETURN $myNs
      #ToDo: Send $myNs "R" "W*" {*}$RESULTS
    } finally {
      # cleanup
      myooX::DestroyN $args
    }
  }

  # define a service as link between the token "GREP" and the callback "GREP"
  proc serverSetup { myNs } {
    ServiceCreate $myNs "GREP" GREP
  }

  # factory startup (constructor)
  proc GrepServer { myNs {tmpl ""}} {
    MqContextC $myNs $tmpl
    ConfigSetServerSetup $myNs serverSetup
    variable $myNs RESULTS [list]
  }
}

# "client" application
::myooX::ClassN ::GrepClient {
  SuperI ::MqContextC

  # factory startup (constructor)
  proc GrepClient { myNs {tmpl ""} } {
    MqContextC $myNs $tmpl
  }
}

# create a context using the static atlmqmsgque CTOR method "Create"
# only used to start the initial process
MqMsgque::Main {
  set ctx [MqContextC::Create]
  try {
    set args [MkBufferListC::CreateLA {*}$argv]

    # [factory_example]
    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    MqFactoryC::Default [MqFactoryC::Add ::GrepClient]
    MqFactoryC::Add ::GrepServer
    MqFactoryC::Add ::GrepWorker
    # [factory_example]

    # choose context
    set ctx [MqFactoryC::New [MqFactoryC::GetCalledL $args]]

    if {[MqContextC::ConfigGetIsServer $ctx]} {
      # SERVER enter eventloop
      MqContextC::LinkCreate $ctx $args
      MqContextC::ProcessEvent $ctx MQ_WAIT_FOREVER
    } else {
      # CLIENT parse arguments
      set fn   [MkBufferListC::CheckOptionSTR $args "--filename"]
      set jobs [MkBufferListC::CheckOptionI32 $args "--jobs"]
      set str  [MkBufferListC::CheckOptionSTR $args "--string"]
      # "fn" is a format argument, not part of the format string: a "%" in the
      # file name would otherwise be read as a conversion specifier
      MqContextC::LogC $ctx [format "SETUP: jobs=%-4d, str=%-20s, fn=%s\n" $jobs $str $fn]
      MqContextC::LinkCreate $ctx [MkBufferListC::AppendLA $args @ "GrepServer" ]
      set RESULTS [MkBufferListC::ToList [MkBufferListC::Sort [MqContextC::Send $ctx "W" "GREP:ICCL@*" $jobs $str $fn $args]]]
      PrintResults $RESULTS
    }
  } on error {} {
    MqContextC::ErrorCatch $ctx
  } finally {
    # delete the context using the atlmqmsgque APPLICATION-DTOR method "Exit"
    MqContextC::Exit $ctx
  }
}