Loading...
Searching...
No Matches
Example: MpGrep

mpgrep is a massive parallel grep tool to search for strings in large blobs

INTRODUCTION

The aim of the tool is to search for strings. This sounds simple, but it becomes problematic when large blobs are involved.

  • This is typical of spy or data recovery companies.

This tool divides the blobs into blocks and starts a separate process or thread for each block.

  • Best results are reached if the # of workers is equal to the # of real (not hyperthread) processors.

A couple of features from tclmqmsgque are used to achieve this aim:

FACTORIES

Add client, server and worker code into a single executable.

Example from mpgrep.tcl using multiple application-entry-points in a single executable

    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    [MqFactoryC Add "GrepClient"] Default
    MqFactoryC Add "GrepServer"
    MqFactoryC Add "GrepWorker"

SERVER

Usable as client or as server, local or remote.

client & local
The client starts a server through the @ pipe, and the server starts the workers.
  • mpgrep --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --thread
server & remote
The server is started with --tcp or --file and waits for a client to establish a connection.
  • mpgrep GrepServer --tcp --port 2345 --thread
The client establishes a connection to the server and transfers the working-parameters.
  • mpgrep --tcp --port 2345 --filename ../../data/test.data.new --jobs 4 --string hello_world

JOBS

Distribute work in parallel.

Example from mpgrep.tcl using an asynchronous-service-call to distribute jobs to multiple workers

    # setup worker
    foreach id $ids {
      set largs [$args Dup]
      my SlaveWorker $id "GrepWorker" [$largs AppendLA --name "wk-cl-$id" @ --name "wk-sv-$id" ]
    }

    # call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
    foreach id $ids {
      [my SlaveGet $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
    }

    # wait for all jobs finished
    foreach id $ids {
      my ProcessEvent ONCE
    }


CODE client & server & worker

#!/usr/bin/env tclsh
#+
#:   @file         NHI1/example/tcl/mpgrep.tcl
#:   @brief        mpgrep.tcl - 26 Jun 2024 - aotto1968
#:   @copyright    (C) NHI - #1 - Project - Group
#:                 This software has NO permissions to copy,
#:                 please contact AUTHOR for additional information
#:   @version      08a242faddae5101924d8fe811888e78892a82d9
#:   @date         Wed Jun 26 14:26:21 2024 +0200
#:   @author       aotto1968 <aotto1968@t-online.de>
#:

# example: 
# > time Nhi1Exec mpgrep.tcl --debug 0 --filename ../../data/test.data.new --jobs 4 --string hello_world --spawn

package require tclmsgque::MqMsgque
namespace import tclmsgque::MqMsgque::*
namespace import tclmsgque::MkKernel::*

proc PrintResults {lst} {
  # Print the list of match positions as a table on stdout.
  #
  # lst - list of positions; every element is printed left-aligned in a
  #       10 character wide column followed by ", ", 8 columns per row
  #
  # The table is framed by a header line (showing the number of elements)
  # and a footer line.
  set total [llength $lst]
  puts "POSITIONS --- ( num=$total ) -----------------"

  for {set i 0} {$i < $total} {incr i} {
    set cell [format {%-10s, } [lindex $lst $i]]
    puts -nonewline $cell
    # the row is complete after every 8th column
    if {(($i + 1) % 8) == 0} {
      puts ""
    }
  }

  # terminate the (maybe partial) last row and close the table
  puts ""
  puts "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
}

# "worker" application
::oo::class create GrepWorker {
  superclass MqContextC

  # service to serve all incoming requests for token "GREP"
  #
  # request  : fn (STR), startZ (I64), endZ (I64), st (STR) - read in this order
  # response : one I64 per match, the absolute file offset at which the match starts
  #
  # The byte range [startZ, endZ) of the file "fn" is scanned in chunks of "bk"
  # bytes. Every chunk is read with "ovZ" additional bytes, so that a match which
  # straddles a chunk border is seen completely by the chunk it starts in.
  #
  # NOTE: "st" is handed to "regexp" as the pattern, regular-expression
  # meta-characters in "st" are therefore interpreted and not matched literally.
  method GREP_Service {} {
    # get job configuration
    set  fn      [ my ReadSTR ]
    set  startZ  [ my ReadI64 ]
    set  endZ    [ my ReadI64 ]
    set  st      [ my ReadSTR ]

    my LogC [format "START: startZ=%-10ld, endZ=%-10ld, st=%-10s, fn=%s\n" $startZ $endZ $st $fn]

    # an empty pattern has no usable overlap and would match at every offset,
    # reject it before any resource is acquired
    if {$st eq ""} {
      error "GREP: the search string must not be empty"
    }

    set bk    [expr {1024 * 1024 * 128}];         # chunk size per iteration
    set ovZ   [expr {[string length $st] - 1}];   # overlap of chunks
    set pos   $startZ
    set poL   [list]

    set FH    [open $fn rb]
    try {
      while {$pos < $endZ} {
        # read chunk + overlap, but never beyond the end of the job range
        set step  [expr {min($bk + $ovZ, $endZ - $pos)}]

        # position absolute: keeps the file offset and "pos" in sync even if
        # the previous read was clipped or returned fewer bytes
        seek $FH $pos start
        set dt    [read $FH $step]

        # "--" protects a pattern starting with "-" from being read as option
        foreach r [regexp -all -inline -indices -- $st $dt] {
          # r = {first last} relative to the chunk, report the absolute start
          lappend poL [expr {$pos + [lindex $r 0]}]
        }

        incr pos $bk
      }
    } finally {
      # close the file even if "seek", "read" or "regexp" failed
      close $FH
    }

    # return start INDICES as I64 integer
    my SendSTART
    foreach po $poL {
      my SendI64  $po
    }
    my SendRETURN
  }

  # define a service as link between the token "GREP" and the callback "GREP_Service"
  method serverSetup {} {
    my ServiceCreate "GREP" GREP_Service
  }

  # factory startup (constructor)
  #
  # tmpl - optional template, passed on unchanged to the MqContextC constructor
  constructor {{tmpl ""}} {
    next $tmpl
    my ConfigSetServerSetup serverSetup
  }
}

# "server" application
::oo::class create GrepServer {
  superclass MqContextC

  # make the "variable" method callable from outside of the object, the
  # "callback" method uses it to link to the RESULTS variable of the master
  export variable

  # callback used to receive the job data
  #
  # Reads the reply of one worker as list and appends all values to the
  # RESULTS variable of the object returned by "SlaveGetMaster".
  method callback {} {
    [my SlaveGetMaster] variable RESULTS
    set vals  [my ReadLIST]
    lappend RESULTS {*}$vals
    my LogC [format "END: num=%-10lu\n" [llength $vals]]
  }

  # service to serve all incoming requests for token "GREP"
  #
  # request  : jobs (I32), str (STR), fn (STR), args (BFL) - read in this order
  # response : one I64 per collected position
  #
  # The file "fn" is split into "jobs" byte ranges, one worker is started per
  # range and the positions reported by the workers are sent back to the caller.
  method GREP {} {
    my variable RESULTS

    # every request starts with an empty result list, otherwise a second
    # request on the same context would also return the old positions
    set RESULTS [list]

    set jobs [ my ReadI32 ]
    set str  [ my ReadSTR ]
    set fn   [ my ReadSTR ]
    set args [ my ReadBFL ]

    try {
      my LogC [format "START: jobs=%-4d, str=%-20s, fn=%s, args=%s\n" $jobs $str $fn [$args ToString]]

      # without at least one job there is no range to split the file into
      if {$jobs < 1} {
        error "GREP: the number of jobs must be >= 1, got \"$jobs\""
      }

      # setup id's
      set ids [list]
      for {set i 0} {$i < $jobs} {incr i} {
        lappend ids [expr {$i+10}]
      }

      # setup job start position
      set strZ  [string length $str]
      set fnZ   [file size $fn]
      set blk   [expr {$fnZ / $jobs}]
      set start 0
      foreach id [lrange $ids 0 end-1] {
        set startB($id) $start
        incr start      $blk
        set endB($id)   [expr {$start + $strZ -1}];   # overlap blocks because of string maybe on the split
      }
      # the last job takes the remainder up to the end of the file
      set id  [lindex $ids end]
      set startB($id) $start
      set endB($id)   $fnZ

      # [jobs_example]
      # setup worker
      foreach id $ids {
        set largs [$args Dup]
        my SlaveWorker $id "GrepWorker" [$largs AppendLA --name "wk-cl-$id" @ --name "wk-sv-$id" ]
      }

      # call the "GREP" service on "Worker" using an ASYNCHRONOUS service call
      foreach id $ids {
        [my SlaveGet $id] Send "C" callback "GREP:CWWC" $fn $startB($id) $endB($id) $str
      }

      # wait for all jobs finished
      foreach id $ids {
        my ProcessEvent ONCE
      }
      # [jobs_example]

      # send RESULT back to client
      my SendSTART
      foreach rs $RESULTS {
        my SendI64 $rs
      }
      my SendRETURN
      #ToDo: my Send "R" "W*" {*}$RESULTS
    } finally {
      # cleanup, also if the request failed
      $args Delete
    }
  }

  # define a service as link between the token "GREP" and the callback "GREP"
  method serverSetup {} {
    my ServiceCreate "GREP" GREP
  }

  # factory startup (constructor)
  #
  # tmpl - optional template, passed on unchanged to the MqContextC constructor
  constructor {{tmpl ""}} {
    next $tmpl
    my ConfigSetServerSetup serverSetup
    # "my variable" only declares variables, the value has to be set separately
    my variable RESULTS
    set RESULTS [list]
  }
}

# "client" application
::oo::class create GrepClient {
  superclass MqContextC

  # factory startup (constructor)
  #
  # tmpl - optional template, defaults to the empty string
  #
  # Nothing client specific is set up here, the template is only
  # forwarded to the constructor of the superclass.
  constructor {{tmpl ""}} { next $tmpl }
}

# create a context using the static tclmsgque CTOR method "Create"
# only used to start the initial process
tclmsgque::MqMsgque Main {
  # initial context, replaced below by the context of the selected factory;
  # until then it is the context used by the "on error" and "finally" branch
  set ctx [MqContextC Create]
  try {
    set args  [MkBufferListC CreateLA {*}$argv]

    # [factory_example]
    # configure the APPLICATION-ENTRY-POINTS as FACTORY and set "GrepClient" as the default setting
    [MqFactoryC Add "GrepClient"] Default
    MqFactoryC Add "GrepServer"
    MqFactoryC Add "GrepWorker"
    # [factory_example]

    # choose context
    set ctx [[MqFactoryC GetCalledL $args] New]

    if {[$ctx ConfigGetIsServer]} {
      # SERVER enter eventloop 
      $ctx LinkCreate $args
      $ctx ProcessEvent MQ_WAIT_FOREVER
    } else {
      # CLIENT parse arguments
      set fn    [$args CheckOptionSTR "--filename"]
      set jobs  [$args CheckOptionI32 "--jobs" ]
      set str   [$args CheckOptionSTR "--string" ]
      # the file name is passed as argument for "%s" and is not part of the
      # format string, a "%" in the file name would otherwise be parsed as a
      # conversion specifier
      $ctx LogC [format "SETUP: jobs=%-4d, str=%-20s, fn=%s\n" $jobs $str $fn]

      # start "GrepServer" behind the "@" and call its "GREP" service; the call
      # waits for the answer, which is sorted and converted into a tcl list
      $ctx LinkCreate [$args AppendLA @ "GrepServer" ]
      set RESULTS [ [ [$ctx Send "W" "GREP:ICCL@*" $jobs $str $fn $args] Sort ] ToList]

      PrintResults $RESULTS
    }
    
  } on error {} {
    $ctx ErrorCatch
  } finally {
    # delete the context using the tclmsgque APPLICATION-DTOR method "Exit"
    $ctx Exit
  }
}