theLink 10.0 NHI1 - theKernel - theLink - theConfig - theSq3Lite - theCompiler - theBrain - theGuard
pymqmsgque Tutorial

A short introduction to pymqmsgque programming

INTRODUCTION

C-API: MQ_C_API - theLink - an API to link multiple package-items together to act as one application …

PHILOSOPHY

The theLink project is an infrastructure that allows multiple package-items to be linked together to act as one application.
To link, you need to distribute the work from one package-item to another package-item and wait for an answer or not.

Philosophy
Write Once → Run Everywhere

The package-item can be a thread, a separate local process that is started by fork or spawn, or even a network of multiple services on multiple hosts.

The package-item can be written in any language that is supported by the Programming-Language-Micro-Kernel (PLMK).
It even supports running multiple programming languages in a single piece of software.
Supported languages are: C, C++, C#, VB.NET, Java, Python, Ruby, Perl, PHP, Tcl and Go.

Strategy
It takes 4 years to write a programming-language, but it only takes 4 weeks to insert a micro-kernel.

Package-items are connected to one another via a pipe or a socket and communicate through packages that are sent from one package-item to another and back.

Conclusion
theLink is used to manage a network of multiple package-items using an API that is available in all major programming-languages.

PROGRAMMING

theLink is responsible for:

  • establishing and managing the package-items, local or remote
  • establishing and managing the connections between package-items
  • establishing and managing the routing between package-items
  • sending and receiving the package data, calling up a service
  • reading and writing data from or into a data-package
  • setting up and maintaining event-handling and scheduling
  • intercepting, distributing and processing error messages

The LibMsgque library is separated into three programming-layers:

  1. The foundation-layer, used by theLink library programmer
  2. The kernel-layer, used by the Programming-Language-Micro-Kernel programmer
  3. The implementation-layer, used by the target-language programmer
foundation-layer
The foundation-layer implements the libmqmsgque library and is also responsible for the quality-target of the entire project.
  • establishing and managing the package-items
  • establishing and managing the connections
  • memory-management and garbage-collection
  • error-handling
  • logging and debugging
  • written in plain C
kernel-layer
The kernel-layer implements the Programming-Language-Micro-Kernel and is also responsible for generating and maintaining the target-language-API source-code.
  • implementation of the managed-object technology
  • implementation of the token-stream-compiler technology
  • written in plain C, TCL and the target-language-API
implementation-layer
The implementation-layer is the API used by the target-language-programmer.
Target
!! This documentation describes the implementation-layer and targets the Python programmer. !!

SERVICE CALL

C-API: MqContextC_ServiceApi_C_API - MqContextC - create and manage a service …

Providing a service is the main purpose of a server, and the main purpose of a client/server connection is to call a service and process the result.
A service can be defined on the server or on the client. On the server, a service can be set up initially with the IServerSetup method and cleaned up at the end with IServerCleanup.

A service is created with MqServiceCreate and deleted with MqServiceDelete.

A service can be created and deleted at any time during the life-cycle of the server or the client. If the server/client-context is deleted, all of its services are deleted as well.

An explicit MqServiceDelete is not required.

Creating or deleting a service is like granting or revoking the right to access a single feature.
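
The grant/revoke analogy can be illustrated without the library. The following is a minimal sketch in plain Python and not pymqmsgque code: the class ServiceTable and its methods create, delete and dispatch are hypothetical names that only model the idea that a token can be called while its service exists.

```python
# Illustrative model only: ServiceTable is a hypothetical class, not part of pymqmsgque.
class ServiceTable:
    def __init__(self):
        self._services = {}                 # token -> callback

    def create(self, token, callback):
        # creating a service grants access to one feature
        if len(token) != 4:
            raise ValueError("a token must be a 4 character string")
        self._services[token] = callback

    def delete(self, token):
        # deleting a service revokes access again
        self._services.pop(token, None)

    def dispatch(self, token, data):
        # an incoming request is only served while the service exists
        try:
            callback = self._services[token]
        except KeyError:
            raise LookupError(f"service '{token}' is not available") from None
        return callback(data)


table = ServiceTable()
table.create("HLWO", lambda text: text + " World")
print(table.dispatch("HLWO", "Hello"))      # access granted

table.delete("HLWO")                        # access revoked
try:
    table.dispatch("HLWO", "Hello")
except LookupError as err:
    print(err)
```

In this model, deleting the whole table corresponds to deleting the context: every service disappears with it, which is why an explicit delete is optional.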

eventloop

To receive a data-package on a service, the event-loop has to be active. The event-loop is always active during a synchronous-service-call. On a server, the event-loop is started with MqProcessEvent at startup.
synchronous-service-call
A synchronous-service-call always blocks and enters the event-loop to wait for an answer.
asynchronous-service-call
An asynchronous-service-call always returns immediately, and the possible result is received on a callback.

callback

A callback is the function called by a service and required to receive data from remote.
A callback is created with MqServiceCreate or is part of the service-call.
A callback can be a service-identifer or a service-callback.
service-call with callback
SendEND_AND_CALLBACK, SendEND_AND_SUB, SendEND_AND_TRANSACTION
service-call without callback
SendEND, SendEND_AND_WAIT

Example from MyServer.py: define the service HLWO in the server setup

import sys
from pymqmsgque import *

# package-item
class MyServer(MqContextC):

  # factory startup
  def __init__(self, tmpl=None):
    self.ConfigSetServerSetup(self.ServerSetup)
    super().__init__(tmpl)

  # service to serve all incoming requests for token "HLWO"
  def MyFirstService(self):
    self.SendSTART()
    self.SendSTR(self.ReadSTR() + " World")
    self.SendRETURN()

  # define a service as link between the token "HLWO" and the callback "MyFirstService"
  def ServerSetup(self):
    self.ServiceCreate("HLWO",self.MyFirstService)

# package-main
if __name__ == "__main__":

  # create the "MyServer" factory… and the object
  srv = MqFactoryC.Add(MyServer).New()

  try:
    srv.LinkCreate(sys.argv)
    srv.ProcessEvent(MqWaitOnEventE.FOREVER)
  except Exception as ex:
    srv.ErrorCatch(ex)
  finally:
    srv.Exit()

asynchronous-service-call

An asynchronous-service-call always returns immediately, and the possible result is received on a callback.

SendEND finish the send-data-block and call a remote service synchronously or asynchronously
SendEND_AND_CALLBACK finish the send-data-block, call the remote service, do not wait for the result but expect the result on a callback
SendEND_AND_TRANSACTION finish the send-data-block, call the remote service to do a longterm-transaction-call

synchronous-service-call

A synchronous-service-call always blocks and enters the event-loop to wait for an answer.

SendEND_AND_WAIT finish the send-data-block, call the remote service and wait for result…
SendEND_AND_SUB finish the send-data-block, call the remote service, do wait for the result and expect multiple sub-results on a callback
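
The difference between the two call styles can be modelled in a few lines of plain Python. This is a rough single-process sketch, not the library's implementation: ToyLink, send_and_callback, send_and_wait and process_one_event are hypothetical names, and the "remote" service is just a local function whose answer is queued until the event-loop runs.

```python
from collections import deque

# Illustrative model only: ToyLink is a hypothetical class, not part of pymqmsgque.
class ToyLink:
    def __init__(self, server):
        self._server = server      # token -> function(value) -> result
        self._pending = deque()    # answers that have arrived but are not processed yet
        self._next_id = 0

    def _send(self, token, value, on_result):
        # the "remote" side computes the answer; it waits in the queue for the event-loop
        call_id = self._next_id
        self._next_id += 1
        self._pending.append((call_id, self._server[token](value), on_result))
        return call_id

    def process_one_event(self):
        # one turn of the event-loop: deliver the oldest answer to its callback
        call_id, result, on_result = self._pending.popleft()
        on_result(result)
        return call_id

    def send_and_callback(self, token, value, callback):
        # asynchronous: return immediately, the result is delivered later
        self._send(token, value, callback)

    def send_and_wait(self, token, value):
        # synchronous: stay in the event-loop until the own answer was processed
        box = []
        my_id = self._send(token, value, box.append)
        while self.process_one_event() != my_id:
            pass                   # other events are served while waiting
        return box[0]


link = ToyLink({"DBL_": lambda x: 2 * x})
seen = []
link.send_and_callback("DBL_", 1, seen.append)
print(seen)                            # [] : nothing delivered yet
print(link.send_and_wait("DBL_", 5))   # 10
print(seen)                            # [2] : delivered while the synchronous call waited
```

The last line shows why the event-loop matters: the asynchronous result only reaches its callback once something, here the synchronous call, runs the loop.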

SENDING data

C-API: MqContextC_SendApi_C_API - MqContextC - construct an outgoing send-data-package

A data-package is sent in two different scenarios:

  1. on a client to call a service on the server
  2. on a server to answer a service call from the client

Sending data is an active task and the opposite of reading data, which is a passive task. Active means that the send process is triggered by the software workflow or by the user.
For each basic type defined in MkBufferC, there is a send function and some help functions.

Send-Safe
Sending data is an atomic task and must not be interrupted between MqSendSTART and MqSendEND. The MqContextC HIGH API is uninterruptible by design.
Basic rule: process first and send all data last.

If timeout != 0 is used, the application enters the event loop and waits in the current process or thread for timeout seconds until the service call is finished.
While waiting for a result, the application can continue to work on other events that are in the same or in a different process or thread.

Recursion-Safe
If another-service-call arrives while waiting for a response from a previous-service-call, the previous-service-call will NOT end until processing of the other-service-call is complete.

Example-1: a service call, send and read a data-package

On a client: perform a service call

send the service-call: MqSendSTART → SendTT... → MqSendEND_AND_WAIT
read the result package: ReadTT... → ...

On a server: answer a service call

read the service-call: ReadTT... → ...
send the result package: MqSendSTART → SendTT... → MqSendRETURN

Important in the code from above is the last command MqSendEND_AND_WAIT because this is just one of five possibilities:

command                    synchronous  database  result
MqSendEND                  no           no        no
MqSendEND_AND_WAIT         yes          no        single return data
MqSendEND_AND_SUB          yes          no        multiple return data
MqSendEND_AND_CALLBACK     no           no        single return data
MqSendEND_AND_TRANSACTION  no           yes       two return data

Sending a data-package is one task; sending it to the right receiver is another. The right receiver is identified by the token argument, which has to be a 4-character string. Why 4? The string should be human-readable and easy to compare. To achieve this, the string is mapped to a 4-byte integer that is used to find the matching key/value entry in the service-hash-table on the server (in short: searching for an integer is much faster than searching for a string).
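
The token-to-integer mapping can be sketched in plain Python. This only illustrates the idea; the helper names token_to_int and int_to_token are hypothetical, and the big-endian byte order is an assumption, since the text does not say how the library packs the four characters.

```python
# Illustrative sketch: the helper names and the byte order are assumptions.
def token_to_int(token: str) -> int:
    raw = token.encode("ascii")
    if len(raw) != 4:
        raise ValueError("a token must be exactly 4 ASCII characters")
    return int.from_bytes(raw, "big")

def int_to_token(value: int) -> str:
    return value.to_bytes(4, "big").decode("ascii")


# a service table keyed by the integer instead of the string
services = {token_to_int("HLWO"): "MyFirstService"}

key = token_to_int("HLWO")
print(hex(key))             # 0x484c574f
print(services[key])        # MyFirstService
print(int_to_token(key))    # HLWO
```

The token stays readable for a human ("HLWO"), while the lookup itself only compares one fixed-size integer.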

Example-2: (in C) At the client, call the service and wait for an answer

...
enum MkErrorE MyServiceCall(MQ_CTX const ctx) {
  // ... do some work
  MqSendSTART_E(ctx);                   // init the Send-Buffer object
  MqSendI32_E(ctx,int_value);           // 1. argument: a MK_I32 value
  MqSendSTR_E(ctx,"num:01");            // 2. argument: a MK_STRN value
  MqSendBIN_E(ctx,mypicture,size);      // 3. argument: a MK_BIN picture of size length
  MqSendEND_AND_WAIT_E(ctx,"SRV1",60);  // call service "SRV1" and wait max 60sec for the results
  // ... get the results
  MK_I32 retI = MqReadI32_e(ctx);       // expect ONE integer as result
  // ... do some work
  return MK_OK;
error:                                  // default error-handler
  return MkErrorStack_1X(ctx);          // on error, build up the error-stack
}

... or using the MqContextC HIGH API

...
enum MkErrorE MyServiceCall(MQ_CTX const ctx) {
  // ... do some work
  MqSend_E(ctx, "W", "SRV1:ICB", int_value, "num:01", mypicture, size);
  // ... get the results
  MK_I32 retI = MqReadI32_e(ctx);       // expect ONE integer as result
  // ... do some work
  return MK_OK;
error:                                  // default error-handler
  return MkErrorStack_1X(ctx);          // on error, build up the error-stack
}

Example-3: (in C) At the server, answer the service call

...
static MkErrorE SRV1(MQ_CTX const ctx, MK_PTR data) { // the name "SRV1" can be freely chosen
  // ... do some work
  MK_I32 myInt = MqReadI32_e(ctx);      // read a MK_I32 value
  MK_STRN myStr = MqReadSTR_e(ctx);     // read a MK_STRN value
  MK_BUF myPic = MqReadBUF_e(ctx);      // read a MkBuffer64S object to store the picture data
  // ... do some work
  MqSendSTART_E(ctx);                   // init the Send-Buffer object
  MqSendI32_E(ctx,int_ret);             // return argument: a MK_I32 value
error:                                  // something is wrong, error back
  return MqSendRETURN(ctx);             // send the package as an answer of a previous service-call
}

... or using the MqContextC HIGH API

...
static MkErrorE SRV1(MQ_CTX const ctx, MK_PTR data) { // the name "SRV1" can be freely chosen
  // ... do some work
  MK_I32 myInt = MqReadI32_e(ctx);      // read a MK_I32 value
  MK_STRN myStr = MqReadSTR_e(ctx);     // read a MK_STRN value
  MK_BUF myPic = MqReadBUF_e(ctx);      // read a MkBuffer64S object to store the picture data
  // ... do some work
  return MqSend(ctx, "R", "I", int_ret); // send the package as an answer of a previous service-call
}

READING data

C-API: MqContextC_ReadApi_C_API - MqContextC - extract data from an incoming read-data-package

A data-package is read in two different scenarios:

  • on a server to serve an incoming service-call from the client
  • on a client to process the return-data from a previous service-call

Reading data is a passive-task and the opposite of sending data, which is an active-task. Passive means that the reading process is triggered by an incoming-data-package and not by the software workflow or by the user.
There is a read function and some help functions for each basic type defined in MkBufferC.

Read-Safe
Each service-call has a private read-data-package. This means that during a service-call that is in progress, another service-call can be served without damaging the read-data-package of the current service-call.
Type-Safe
A data-item in a read-data-package is type-safe. This means that every read of a data-item has to match the data-type of the previous write. There is one exception: a cast from and to the string data-type (TYPE=C) is allowed.
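
The type-safe rule, including the string exception, can be modelled in plain Python. This is only a sketch of the rule as stated above: ToyPackage and its methods are hypothetical, the type codes "I" (32-bit integer) and "C" (string) follow the format strings used in this tutorial, and "D" for a double is an assumption.

```python
# Illustrative model only: ToyPackage is a hypothetical class, not part of pymqmsgque.
class ToyPackage:
    def __init__(self):
        self._items = []       # list of (type code, value), in write order
        self._pos = 0          # index of the next item to read

    def write(self, type_code, value):
        self._items.append((type_code, value))

    def _next(self, wanted, convert):
        type_code, value = self._items[self._pos]
        if type_code == wanted:
            result = value                      # read matches the previous write
        elif type_code == "C" or wanted == "C":
            result = convert(value)             # cast from or to the string type
        else:
            raise TypeError(f"expected type {wanted}, found type {type_code}")
        self._pos += 1
        return result

    def read_i32(self):
        return self._next("I", int)

    def read_str(self):
        return self._next("C", str)


pkg = ToyPackage()
pkg.write("I", 42)
pkg.write("C", "7")
pkg.write("I", 5)
pkg.write("D", 1.5)            # "D" as a type code for a double is an assumption

print(pkg.read_i32())          # 42  : types match
print(pkg.read_i32())          # 7   : string item cast to an integer
print(pkg.read_str())          # 5   : integer item cast to a string
try:
    pkg.read_i32()             # double item read as integer is rejected
except TypeError as err:
    print(err)
```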

Example from server.py read-safety: Make a nested service-call

  def CSV1(self):

    # read the input-data from the CSV1-service-call
    # client → server
    inI = self.ReadI32() + 1

    # call the CSV2-service at the client, wait 10sec for timeout
    # server → client → server
    retI = self.Send("Wt", 10, "CSV2:I@I", inI) + 1

    # answer the CSV1-service-call with the result from the CSV2-service-call
    # server → client
    self.Send("R", "I", retI)

STORAGE

internal storage

C-API: MqContextC_StorageApi_C_API - MqContextC - setup and manage a storage used to persist data-packages

The storage is divided into: INTERNAL and EXTERNAL storage. Only the read-data-package can be stored or dumped into the storage.

The read-data-package is saved into the storage using:

ServiceStorage set up a service that listens on a MqContextC_ServiceApi_Identifer and saves every read-data-package into the STORAGE
StorageExport export the read-data-package into the STORAGE

The read-data-package is restored from the storage using:

StorageImport import the storage-package into the read-data-package
ProxyForward send the entire read-data-package-data to the link-target
All of this usually happens in an Event Handler.

Some important facts of the storage-feature:

  • The internal-storage depends on SQLite.
  • The package-storage can be used to save all kind of package-data.
  • The longterm-transaction-package-data is always saved into storage.
  • By default only an in-memory storage is in use, but this can be changed with MqStorageOpen or Storage.
  • An entire service-call can be saved with MqServiceStorage.

The following internal storages are supported:

default
The default-storage is set with the configuration parameter --storage fileName and defaults to "#memdb#". If a package has to be saved into the storage and the storage is not open, the default-storage is used and is opened automatically. If an explicit storage is required, the default can be changed or a storage can be opened explicitly with MqStorageOpen. Keep in mind that the default-storage is a per-context configuration, but only one storage per process or thread is currently supported.
memdb
This is the default storage and can be set explicitly with MqStorageOpen using the "#memdb#" parameter.
tmpdb
This storage behaves like an in-memory storage but exports data to the TEMPORARY filesystem if the application runs out of memory. It can be set explicitly with MqStorageOpen using the "#tmpdb#" parameter.
filedb
This storage always works on files. It is the only persistent storage and can be set explicitly with MqStorageOpen using the storageFile parameter.
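
Because the internal storage depends on SQLite, the three storage kinds can be approximated with Python's standard sqlite3 module. This is a sketch under stated assumptions, not the library's storage code: open_storage, storage_export and storage_import are hypothetical helpers, the table layout is invented for illustration, and mapping "#memdb#" to SQLite's ":memory:" and "#tmpdb#" to SQLite's empty file name (a private temporary database) is an assumption about how the names could be translated.

```python
import sqlite3

# Illustrative sketch: helper names, table layout and name mapping are assumptions.
def open_storage(name="#memdb#"):
    if name == "#memdb#":
        target = ":memory:"    # pure in-memory database, lost when closed
    elif name == "#tmpdb#":
        target = ""            # SQLite private temporary database
    else:
        target = name          # a regular file: the only persistent choice
    db = sqlite3.connect(target)
    db.execute("CREATE TABLE IF NOT EXISTS packages (id INTEGER PRIMARY KEY, data BLOB)")
    return db

def storage_export(db, package: bytes) -> int:
    # save one package and return its entry-id
    cur = db.execute("INSERT INTO packages (data) VALUES (?)", (package,))
    db.commit()
    return cur.lastrowid

def storage_import(db, entry_id: int) -> bytes:
    # load a package back by its entry-id
    row = db.execute("SELECT data FROM packages WHERE id = ?", (entry_id,)).fetchone()
    if row is None:
        raise KeyError(entry_id)
    return row[0]


db = open_storage("#memdb#")
entry = storage_export(db, b"\x00\x01package-data")
print(entry, storage_import(db, entry))
db.close()
```

Passing a file name instead of "#memdb#" gives the persistent variant; the data then survives closing and reopening the database.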

Performance analysis:

  • The performance is tested with: Nhi1Exec perfclient.c --all --storage VALUE @ perfserver.c.
  • The parameter VALUE is set to #memdb#, #tmpdb# or a filedb file.
  • If --storage is not set the filedb database is used together with an internal file-name.

database       performance  host crash  application crash  info
memdb          30.000       data lost   data lost          non persistent
tmpdb          < 30.000     data lost   data lost          uses memory and/or temporary file
filedb (mem)   10.000       data lost   data safe          in memory filesystem
filedb (disc)  50           data safe   data safe          disc-speed is the key factor

external storage

C-API: MqDumpC_C_API - MqDumpC - the class known as dmp or dump is used to export a pymqmsgque data package as binary

The dump is used to exchange data with external counterparts such as a database or a user-specific infrastructure.

There are 3 different functions that deal with a dump:

  1. MqDumpExport exports the read-data-package from the calling context to a dump.
  2. MqDumpImport imports the dump into the read-data-package of the calling context. All MqContextC READ API functions can be used to read the data from the read-data-package.
  3. MqProxyForward imports the dump into the send-data-package of the calling context. All MqContextC SEND API functions can be used to write the data into the send-data-package.
See also
MqContextC PROXY API

LONG TERM TRANSACTION

C-API: MqSendEND_AND_TRANSACTION_RT - finish the send-data-block, call the remote service to do a longterm-transaction-call

A longterm-transaction is a service-call with guaranteed delivery. Guaranteed means that the transaction uses the MqContextC STORAGE API to keep every step persistent, so that a step can be recovered if something unexpected happens.

To link the result with the calling-environment a private transaction-item is used to save the calling-environment to the local-storage. If the local-storage is persistent, the calling-environment will even survive an application restart.
If the result from the public service-call arrives, the transaction-item will be extracted from the result and the private calling-environment will be initialized from the local-storage.

To create a persistent-transaction, the MqContextC STORAGE API has to be set up as persistent. By default, an in-memory MqContextC STORAGE API is used.

The longterm-transaction-call has TWO results…

  • the FIRST result is an acknowledgement that the longterm-transaction was stored in the remote database
  • the SECOND result is the result of the service-call

Unlike MqSendEND_AND_WAIT and MqSendEND_AND_CALLBACK, a longterm-transaction-call has to survive an application restart. To achieve this goal, two features have to be available to process the results:

transaction-item

The transaction-item is the entry-id from the local internal database and is the public handle of the private data. If the transaction-item-private-data should be persistent (survive an application restart), the internal database has to be persistent using the Storage option.
The transaction-item-private-data requires a MqSendT_START ... MqSendT_END block at the beginning of the send-data-package.
The list of data-items in the transaction-item-private-data has to be provided by the programmer and is used to initialise the environment in the callback (for example an external database-entry-id).
In the transaction-callback, the transaction-item-private-data has to be extracted with MqReadT_START ... MqReadT_END at the beginning of the read-data-package.
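
The role of the transaction-item can be modelled in plain Python. This is a simplified sketch and not the library's protocol: ToyTransactionClient and ToyTransactionServer are hypothetical classes, the local storage is a dictionary instead of a database, and the "service" simply adds one to the value it receives.

```python
import itertools

# Illustrative model only: both classes are hypothetical, not part of pymqmsgque.
class ToyTransactionClient:
    def __init__(self):
        self._local_storage = {}        # entry-id -> private data (here: not persistent)
        self._ids = itertools.count(1)
        self.log = []

    def send_transaction(self, server, private_data, value):
        item = next(self._ids)                      # the transaction-item (public handle)
        self._local_storage[item] = private_data    # save the calling-environment locally
        ack = server.store(item, value)             # FIRST result: stored on the remote side
        self.log.append(("ack", ack))
        return item

    def on_result(self, item, result):
        # SECOND result: restore the private data through the transaction-item
        private_data = self._local_storage.pop(item)
        self.log.append(("result", private_data, result))


class ToyTransactionServer:
    def __init__(self):
        self._queue = []                # stands in for the remote database

    def store(self, item, value):
        self._queue.append((item, value))
        return True

    def work(self, client):
        # process the stored transactions later and send the results back
        while self._queue:
            item, value = self._queue.pop(0)
            client.on_result(item, value + 1)


client = ToyTransactionClient()
server = ToyTransactionServer()
client.send_transaction(server, "Privat_Data_1", 11111)
server.work(client)
print(client.log)
# [('ack', True), ('result', 'Privat_Data_1', 11112)]
```

Only the transaction-item travels with the request; the private data never leaves the client. If the dictionary were replaced by a persistent storage, the second step would still work after a restart.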

transaction-callback

The transaction-callback has to be a MqContextC_ServiceApi_Identifer defined with MqServiceCreate in the application setup code (like IServerSetup) and has to be available after an application restart.

If an error is raised on the server during the database-insert, the function returns this error immediately. While waiting for the return, the event-loop is used to process other events. Use IEvent to add your tasks to the event loop.

Parameters
  [in] mkrt      the MkRuntimeS instance to work on - the runtime argument, used by MK_RT_CALL (C-only)
  [in] ctx       the MqContextS instance to work on
  [in] token     the MqContextC SERVICE API to identify the service
  [in] callback  the MqContextC_ServiceApi_Identifer of the MqContextC_ServiceApi_Callback
  [in] timeout   in seconds until a timeout-error is raised (possible values like ProcessEvent) (MK_TIMEOUT_DEFAULT=MK_TIMEOUT_USER)
Exceptions
  MkExceptionC   → The default-exception from the Programming-Language-Micro-Kernel (PLMK)

Example from MyTransaction.py: make a longterm-transaction-call using the LOW and the HIGH API

import sys
from pymkkernel import *
from pymqmsgque import *

def callback(ctx):
  ctx.ReadT_START()
  myPrivateHandle = ctx.ReadSTR()
  ctx.ReadT_END()
  myServiceResult = ctx.ReadI32()

  print(f"myPrivateHandle={myPrivateHandle}, myServiceResult={myServiceResult}")

ctx = MqContextC()
try:
  ctx.ConfigSetName("MyTransaction")

  # setup commandline arguments used for parsing
  args = MkBufferListC.CreateLA(sys.argv)

  # check if the '--token' option is available, default "SRVC"
  token = args.CheckOptionSTR("--token", "SRVC")

  # connect to the server
  ctx.LinkCreate(args)

  # register callback
  ctx.ServiceCreate("CLB1", callback)
   
  # send block using the LOW-Api
  ctx.SendSTART()
  ctx.SendT_START()
  ctx.SendSTR("Privat_Data_1")
  ctx.SendT_END()
  ctx.SendI32(11111)
  ctx.SendEND_AND_TRANSACTION(token, "CLB1")
   
  # send block using the HIGH-Api -> same as above, but shorter
  ctx.Send("T", "CLB1", f"{token}:(C)I", "Privat_Data_2", 22222)

  # now wait for exactly ONE result on ctx
  ctx.ProcessEvent(MqWaitOnEventE.OWN)
except Exception as ex:
  ctx.ErrorCatch(ex);
finally:
  ctx.Exit()

SERVER

A pymqmsgque server requires the following setup:

  1. file: example/py/MyServer.py
  2. an instance of the abstract class ContextCreate
  3. the interface MqFactoryC to create a new application instance
  4. the interface IServerSetup and or IServerCleanup

Example from MyServer.py The minimal server looks like:

import sys
from pymqmsgque import *

# package-item
class MyServer(MqContextC):

  # factory startup
  def __init__(self, tmpl=None):
    self.ConfigSetServerSetup(self.ServerSetup)
    super().__init__(tmpl)

  # service to serve all incoming requests for token "HLWO"
  def MyFirstService(self):
    self.SendSTART()
    self.SendSTR(self.ReadSTR() + " World")
    self.SendRETURN()

  # define a service as link between the token "HLWO" and the callback "MyFirstService"
  def ServerSetup(self):
    self.ServiceCreate("HLWO",self.MyFirstService)

# package-main
if __name__ == "__main__":

  # create the "MyServer" factory… and the object
  srv = MqFactoryC.Add(MyServer).New()

  try:
    srv.LinkCreate(sys.argv)
    srv.ProcessEvent(MqWaitOnEventE.FOREVER)
  except Exception as ex:
    srv.ErrorCatch(ex)
  finally:
    srv.Exit()

The server is started as a network-visible TCP server listening on PORT 2345, using a THREAD for every new connection request:

  • python MyServer.py --tcp --port 2345 --thread

If you are using UNIX and want to set up a high-performance local server, use the built-in UDS (Unix-Domain-Sockets) capability to listen on the FILE /path/to/any/file.uds instead of on a network port:

  • python MyServer.py --uds --file /path/to/any/file.uds --thread

Three things are important:

  1. the send style of functions
  2. the ctx.ServiceCreate(token:string[4], callback:callable) function
  3. a connected context of type [constructor,static] MqContextC MqContextC.Create(?tmpl:MqContextC=None?)

Sending data is done using a minimum of 2 steps:

  1. First: start a data package with ctx.SendSTART()
  2. Last: submit the data package to the link target using one of:

The first three ctx.SendEND(token:string[4], ?timeout:MkTimeoutE|int32=DEFAULT?)... functions are used to call a remote service and the last one is used to answer an incoming service call. In-between ctx.SendSTART() and ctx.SendEND(token:string[4], ?timeout:MkTimeoutE|int32=DEFAULT?) ... other MqContextC SEND API style commands are available to fill the data package with data.

Services are created with the ctx.ServiceCreate(token:string[4], callback:callable) function. The first parameter is a 4-byte token used as the public name. 4 bytes are required because this string is mapped to a 4-byte integer for speed. The second parameter is an object providing the MqContextC_ServiceApi_Callback interface.

The MqContextC_ServiceApi_Callback is called for every incoming service-request which belongs to token.

CLIENT

A pymqmsgque client requires the following setup:

Example from MyClient.py The minimal client looks like:

import sys
from pymqmsgque import *

ctx = MqContextC()
try:
  ctx.ConfigSetName("MyClient")
  ctx.LinkCreate(sys.argv)
  ctx.SendSTART()
  ctx.SendSTR("Hello")
  ctx.SendEND_AND_WAIT("HLWO")
  print(ctx.ReadSTR(), file=sys.stdout)
except Exception as ex:
  ctx.ErrorCatch(ex);
finally:
  ctx.Exit()

Example: to call a network-visible TCP server listening on PORT 2345 use:

  • python MyClient.py --tcp --port 2345
  • Hello World

Example: to call a UDS server listening on FILE /path/to/any/file.uds use:

  • python MyClient.py --uds --file /path/to/any/file.uds
  • Hello World

Example: To call a local server started by the client using PIPE communication use:

  • python MyClient.py @ python MyServer.py
  • Hello World
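
In the command line above, the @ separates the client command from the command of the server that the client starts. How the library parses this internally is not described here; the following plain-Python sketch only illustrates the splitting idea, and split_stages is a hypothetical helper.

```python
# Illustrative sketch: split_stages is a hypothetical helper, not a pymqmsgque function.
def split_stages(argv, separator="@"):
    stages = [[]]
    for arg in argv:
        if arg == separator:
            stages.append([])          # a new package-item command starts here
        else:
            stages[-1].append(arg)
    return stages


print(split_stages(["python", "MyClient.py", "@", "python", "MyServer.py"]))
# [['python', 'MyClient.py'], ['python', 'MyServer.py']]
```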

FILTER

The FILTER MODE is used to define a command pipeline.

Example from manfilter.py A minimal filter looks like:

import sys
from pymqmsgque import *

class ManFilter(MqContextC):
  def __init__(self, tmpl=None):
    MqContextC.__init__(self)
  def FTRcmd(ctx):
    ftr = ctx.SlaveGetFilter()
    ftr.SendSTART()
    while ctx.ReadItemExists():
      ftr.SendSTR("<" + ctx.ReadSTR() + ">")
    ftr.SendEND_AND_WAIT("+FTR")
    ctx.SendRETURN()

if __name__ == "__main__":
  srv = ManFilter()
  try:
    srv.ConfigSetIsServer(True)
    srv.LinkCreate(sys.argv)
    srv.ServiceCreate("+FTR", srv.FTRcmd)
    srv.ServiceProxy("+EOF")
    srv.ProcessEvent(MqWaitOnEventE.FOREVER)
  except Exception as ex:
    srv.ErrorCatch(ex);
  finally:
    srv.Exit()

Use manfilter.py in a LibMqMsgque command pipeline:

  • echo -e "1:2:3\na:b:c" | atool split -d : @ python manfilter.py @ atool join -d :
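
What the three stages of this pipeline do to the data can be imitated with plain Python generators. This is a sketch of the data flow only, without any link or process handling: split_stage, filter_stage and join_stage are hypothetical stand-ins for atool split, manfilter.py and atool join, and the behaviour of atool is assumed from its options.

```python
# Illustrative sketch: the three functions are hypothetical stand-ins for the pipeline stages.
def split_stage(lines, delimiter=":"):
    # assumed behaviour of "atool split -d :": one list of items per input line
    for line in lines:
        yield line.split(delimiter)

def filter_stage(rows):
    # same transformation as FTRcmd in manfilter.py: wrap every item in "<" and ">"
    for row in rows:
        yield ["<" + item + ">" for item in row]

def join_stage(rows, delimiter=":"):
    # assumed behaviour of "atool join -d :": glue the items back together
    for row in rows:
        yield delimiter.join(row)


for line in join_stage(filter_stage(split_stage(["1:2:3", "a:b:c"]))):
    print(line)
# <1>:<2>:<3>
# <a>:<b>:<c>
```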