theLink 10.0
Loading...
Searching...
No Matches
Example: MpGrep

mpgrep is a massive parallel grep tool to search for strings in large blobs

INTRODUCTION

The aim of the tool is to search for strings. This sounds simple but becomes problematic when large blobs are involved.

  • This is typical of spy or data recovery companies.

This tool divides the blobs into blocks and starts a separate process or thread for each block.

  • Best results are reached if the # of workers is equal to the # of real (not hyperthread) processors.

A couple of features from atlmqmsgque are used to achieve this aim:

FACTORIES

Add client, server and worker code into a single executable.

Example from mpgrep.atl using multiple application-entry-points in a single executable

    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    MqFactoryC::Default [MqFactoryC::Add ::GrepClient]
    MqFactoryC::Add ::GrepServer
    MqFactoryC::Add ::GrepWorker

SERVER

Usable as client or as server, local or remote.

client & local
The client starts a server with the @ pipe and the server starts the workers.
  • mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
server & remote
The server is started with --tcp or --file and waits for a client to establish a connection.
  • mpgrep GrepServer --tcp --port 2345 --thread
The client establishes a connection to the server and transfers the working-parameters.
  • mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world

JOBS

Distribute work in parallel.

Example from mpgrep.atl using an asynchronous-service-call to distribute jobs to multiple workers

    # setup worker
    foreach id $ids {
      set largs [MkBufferListC::Dup $args]
      SlaveWorker $myNs $id "GrepWorker" [MkBufferListC::AppendLA $largs --name "wk-cl-$id" @ --name "wk-sv-$id" ]
    }

    # call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
    foreach id $ids {
      [SlaveGet $myNs $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
    }

    # wait for all jobs finished
    foreach id $ids {
      ProcessEvent $myNs ONCE
    }


CODE client & server & worker

#!/usr/bin/env atlsh
#+
#:   @file         NHI1/example/atl/mpgrep.atl
#:   @brief        tag: nhi1-release-250425
#:   @copyright    (C) NHI - #1 - Project - Group
#:                 This software has NO permission to copy,
#:                 please contact AUTHOR for additional information
#:

# example:
# > time Nhi1Exec mpgrep.atl --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --spawn

package require lib_85
package require atlmqmsgque

# Print the list of match positions as a simple table.
#
#   lst - list of positions (any printable values)
#
# Output goes to stdout: a header line carrying the number of entries, the
# entries left-aligned in 10-character cells with eight cells per row, and
# a closing ruler line.
proc PrintResults {lst} {
  set total [llength $lst]
  puts "POSITIONS --- ( num=$total ) -----------------"

  # "col" counts the cells already written on the current row
  set col 0
  foreach pos $lst {
    puts -nonewline [format {%-10s, } $pos]
    if {[incr col] == 8} {
      # row is full -> terminate it and start the next one
      puts ""
      set col 0
    }
  }

  # terminate the (possibly empty) last row, then print the ruler
  puts ""
  puts "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
}

# "worker" application
::myooX::ClassN ::GrepWorker {
  SuperI ::MqContextC

  # Service handler for the token "GREP".
  #
  # Reads one job from the request (filename, start offset, end offset and
  # search pattern, in that order), scans the byte range [startZ, endZ) of
  # the file and answers with the absolute start offset of every match, one
  # I64 value per match.
  #
  # NOTE: the pattern is handed to "regexp", so regular-expression meta
  # characters inside the search string are interpreted as such.
  proc GREP_Service { myNs } {
    # get job configuration
    set  fn      [ ReadSTR $myNs ]
    set  startZ  [ ReadI64 $myNs ]
    set  endZ    [ ReadI64 $myNs ]
    set  st      [ ReadSTR $myNs ]

    LogC $myNs [format "START: startZ=%-10ld, endZ=%-10ld, st=%-10s, fn=%s\n" $startZ $endZ $st $fn]

    set bk    [expr {1024 * 1024 * 128}];                    # chunk advance per iteration
    set ovZ   [expr {max([string length $st] - 1, 0)}];      # overlap of chunks, never negative
    set poL   [list]
    set pos   $startZ

    # read the range chunk by chunk; the handle is closed even if the scan fails
    set FH    [open $fn rb]
    try {
      while {$pos < $endZ} {
        # position absolutely so that the file offset can never drift away from "pos"
        seek $FH $pos start
        # read one chunk plus the overlap, but never beyond the end of the job range
        set step  [expr {min($bk + $ovZ, $endZ - $pos)}]
        set dt    [read $FH $step]
#LogC $myNs [format "pos=%-10d, step=%-10d, todo=%-10d, readZ=%-10d\n" $pos $step [expr {$endZ-$pos}] [string length $dt]]
        # "--" protects patterns starting with "-" from being parsed as a switch
        foreach r [regexp -all -inline -indices -- $st $dt] {
          # r = {first last} relative to the chunk -> convert to an absolute offset
          lappend poL [expr {$pos + [lindex $r 0]}]
        }
        incr pos $bk
      }
    } finally {
      close $FH
    }

    # return start INDICES as I64 integer
    SendSTART $myNs
    foreach po $poL {
      SendI64 $myNs  $po
    }
    SendRETURN $myNs
  }

  # define a service as link between the token "GREP" and the callback "GREP_Service"
  proc serverSetup { myNs } {
    ServiceCreate $myNs "GREP" GREP_Service
  }

  # factory startup (constructor)
  #   myNs - the new instance
  #   tmpl - optional template passed on to the MqContextC constructor
  proc GrepWorker { myNs {tmpl ""} } {
    MqContextC $myNs $tmpl
    ConfigSetServerSetup $myNs serverSetup
  }
}

# "server" application
::myooX::ClassN ::GrepServer {
  # NOTE(review): the other classes in this file use plain "SuperI ::MqContextC";
  # the "::export variable" suffix is kept unchanged, presumably it makes
  # "variable" callable through an instance (see "callback") - TODO confirm.
  SuperI ::MqContextC::export variable

  # callback used to receive the job data
  # Runs for the answer of one worker: reads the returned list of positions
  # and appends it to the RESULTS variable of the master context.
  proc callback { myNs } {
    [SlaveGetMaster $myNs] variable RESULTS
    set vals  [ReadLIST $myNs]
    lappend RESULTS {*}$vals
    LogC $myNs [format "END: num=%-10lu\n" [llength $vals]]
  }

  # Service handler for the token "GREP".
  #
  # Reads the request (number of jobs, search string, filename, worker
  # arguments), splits the file into one byte range per job, starts one
  # worker per job, collects the answers through "callback" and sends all
  # positions back to the client as I64 values.
  #
  # Raises an error if the number of jobs is smaller than 1.
  proc GREP { myNs } {
    variable $myNs RESULTS
    # start every request with an empty result list, otherwise a long-running
    # server would return the hits of earlier requests again
    set RESULTS [list]

    set jobs [ ReadI32 $myNs ]
    set str  [ ReadSTR $myNs ]
    set fn   [ ReadSTR $myNs ]
    set args [ ReadBFL $myNs ]
    LogC $myNs [format "START: jobs=%-4d, str=%-20s, fn=%s, args=%s\n" $jobs $str $fn [MqContextC::ToString $args]]

    # the block size below is "file size / jobs" -> reject a non-positive job count
    if {$jobs < 1} {
      myooX::DestroyN $args
      error "GREP: invalid number of jobs '$jobs', expected a value >= 1"
    }

    # setup id's
    set ids [list]
    for {set i 0} {$i < $jobs} {incr i} {
      lappend ids [expr {$i+10}]
    }

    # setup job start position
    set strZ  [string length $str]
    set fnZ   [file size $fn]
    set blk   [expr {$fnZ / $jobs}]
    set start 0
    foreach id [lrange $ids 0 end-1] {
      set startB($id) $start
      incr start      $blk
      # overlap blocks because the string may lie on the split, but stay inside the file
      set endB($id)   [expr {min($start + $strZ - 1, $fnZ)}]
    }
    # the last job takes everything up to the end of the file
    set id  [lindex $ids end]
    set startB($id) $start
    set endB($id)   $fnZ

    # [jobs_example]
    # setup worker
    foreach id $ids {
      set largs [MkBufferListC::Dup $args]
      SlaveWorker $myNs $id "GrepWorker" [MkBufferListC::AppendLA $largs --name "wk-cl-$id" @ --name "wk-sv-$id" ]
    }

    # call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
    foreach id $ids {
      [SlaveGet $myNs $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
    }

    # wait for all jobs finished
    foreach id $ids {
      ProcessEvent $myNs ONCE
    }
    # [jobs_example]

    # send RESULT back to client
    SendSTART $myNs
    foreach rs $RESULTS {
      SendI64 $myNs $rs
    }
    SendRETURN $myNs
    #ToDo: Send $myNs "R" "W*" {*}$RESULTS

    # cleanup
    myooX::DestroyN $args
  }

  # define a service as link between the token "GREP" and the callback "GREP"
  proc serverSetup { myNs } {
    ServiceCreate $myNs "GREP" GREP
  }

  # factory startup (constructor)
  #   myNs - the new instance
  #   tmpl - optional template passed on to the MqContextC constructor
  proc GrepServer { myNs {tmpl ""}} {
    MqContextC $myNs $tmpl
    ConfigSetServerSetup $myNs serverSetup
    variable $myNs RESULTS [list]
  }
}

# "client" application
::myooX::ClassN ::GrepClient {
  SuperI ::MqContextC

  # Constructor called by the factory.
  #   myNs - the new instance
  #   tmpl - optional template, forwarded unchanged to the MqContextC constructor
  # The client defines no services of its own; it only initialises the base context.
  proc GrepClient {myNs {tmpl ""}} {
    MqContextC $myNs $tmpl
  }
}

# create a context using the static atlmqmsgque CTOR method "Create"
# only used to start the initial process
# Application entry: registers the three factories, creates the context that
# was selected on the command line and then either runs the server event loop
# or performs one client request and prints the result.
MqMsgque::Main {
  # initial context, replaced below by the factory-created one
  # NOTE(review): presumably created first so that the "on error" / "finally"
  # handlers always have a valid $ctx - confirm.
  set ctx [MqContextC::Create]
  try {
    set args  [MkBufferListC::CreateLA {*}$argv]

    # [factory_example]
    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    MqFactoryC::Default [MqFactoryC::Add ::GrepClient]
    MqFactoryC::Add ::GrepServer
    MqFactoryC::Add ::GrepWorker
    # [factory_example]

    # choose context
    set ctx [MqFactoryC::New [MqFactoryC::GetCalledL $args]]

    if {[MqContextC::ConfigGetIsServer $ctx]} {
      # SERVER enter eventloop
      MqContextC::LinkCreate $ctx $args
      MqContextC::ProcessEvent $ctx MQ_WAIT_FOREVER
    } else {
      # CLIENT parse arguments
      set fn    [MkBufferListC::CheckOptionSTR $args "--filename"]
      set jobs  [MkBufferListC::CheckOptionI32 $args "--jobs" ]
      set str   [MkBufferListC::CheckOptionSTR $args "--string" ]
      # pass the filename as an argument: a "%" inside the name must not be
      # interpreted as a conversion specifier by "format"
      MqContextC::LogC $ctx [format "SETUP: jobs=%-4d, str=%-20s, fn=%s\n" $jobs $str $fn]

      # start/connect the "GrepServer" and run the "GREP" service synchronously
      MqContextC::LinkCreate $ctx [MkBufferListC::AppendLA $args @ "GrepServer" ]
      set RESULTS [MkBufferListC::ToList [MkBufferListC::Sort [MqContextC::Send $ctx "W" "GREP:ICCL@*" $jobs $str $fn $args]]]

      PrintResults $RESULTS
    }

  } on error {} {
    MqContextC::ErrorCatch $ctx
  } finally {
    # delete the context using the atlmqmsgque APPLICATION-DTOR method "Exit"
    MqContextC::Exit $ctx
  }
}