short introduction into pymqmsgque programming
C-API: MQ_C_API - theLink - a API to link multiple package-items together to act as one application …
The theLink project is an infrastructure that allows multiple package-items to be linked together to act as one application.
To link, you need to distribute the work from one package-item to another package-item and wait for an answer or not.
The package-item can be a thread, a separate local process that is started by fork or spawn, or even a network of multiple services on multiple hosts.
The package-item can be written in any language that is supported by the Programming-Language-Micro-Kernel (PLMK) support.
It even supports running multiple programming languages in a single piece of software.
Supported Languages are: (C,C++,C#,VB.NET,Java,Python,Ruby,Perl,PHP,Tcl or GO)
The package-item is connected to one another via a pipe or a socket and is based on packages which are sent from one package-item to another package-item and back.
theLink is responsible for:
The LibMsgque library is separted into three programming-layers:
C-API: MqContextC_ServiceApi_C_API - MqContextC - create and manage a service …
To provide a service is the main purpose of a server and the main-purpose of a client/server connection is to call a service and to process the result.
A service can be defined on the server or on the client. On the server a service can be initial setup with IServerSetup method and finally cleanup with IServerCleanup.
A service is created with the MqServiceCreate and deleted with the MqServiceDelete.
A service can be created and deleted during the entire life-cycle of the server or the client. If the server/client-context is deleted all services of the are deleted also.
A MqServiceDelete is not required.
Creating or deleting a service is like granting or revoking the right to access a single feature.
eventloop
callback
Example from MyServer.py
→ define the service SRV1 on the server-link-setup
import sys from pymqmsgque import * # package-item class MyServer(MqContextC): # factory startup def __init__(self, tmpl=None): self.ConfigSetServerSetup(self.ServerSetup) super().__init__(tmpl) # service to serve all incoming requests for token "HLWO" def MyFirstService(self): self.SendSTART() self.SendSTR(self.ReadSTR() + " World") self.SendRETURN() # define a service as link between the token "HLWO" and the callback "MyFirstService" def ServerSetup(self): self.ServiceCreate("HLWO",self.MyFirstService) # package-main if __name__ == "__main__": # create the "MyServer" factory… and the object srv = MqFactoryC.Add(MyServer).New() try: srv.LinkCreate(sys.argv) srv.ProcessEvent(MqWaitOnEventE.FOREVER) except Exception as ex: srv.ErrorCatch(ex) finally: srv.Exit()
A asynchronous-service-call always return immediately and the possible result is received on a callback.
SendEND | MqContextC - finish the send-data-block and call synchronous/asynchronous a remote-service … |
SendEND_AND_CALLBACK | finish the send-data-block, call the remote service, do not-wait for the result but expect the result on a callback … |
SendEND_AND_TRANSACTION | finish the send-data-block, call the remote service to do a longterm-transaction-call … |
A synchronous-service-call always block and enter the event-loop to wait for an answer.
SendEND_AND_WAIT | finish the send-data-block, call the remote service and wait for result… |
SendEND_AND_SUB | finish the send-data-block, call the remote service, do wait for the result and expect multiple sub-results on a callback … |
C-API: MqContextC_SendApi_C_API - MqContextC - construct an outgoing send-data-package …
A data-package is send in two different scenarios:
Sending data is an active task and the opposite of reading data, which is a passive task. Active means that the send process is triggered by the software workflow or by the user.
For each basic type defined in MkBufferC, there is a send function and some help functions.
If timeout != 0 is used, the application enters the event loop and waits in the current process or thread for timeout seconds until the service call is finished.
While waiting for a result, the application can continue to work on other events that are in the same or in a different process or thread.
Example-1: a service call, send and read a data-package
On a client: perform a service call
send the service-call | MqSendSTART → SendTT... → MqSendEND_AND_WAIT |
read the result package | ReadTT... → ... |
on a server: answer a service call
read the service-call | ReadTT... → ... |
send the result package | MqSendSTART → SendTT... → MqSendRETURN |
Important in the code from above is the last command MqSendEND_AND_WAIT because this is just one of five possibilities:
command | synchron | database | result |
---|---|---|---|
MqSendEND | no | no | no |
MqSendEND_AND_WAIT | yes | no | single return data |
MqSendEND_AND_SUB | yes | no | multiple return data |
MqSendEND_AND_CALLBACK | no | no | single return data |
MqSendEND_AND_TRANSACTION | no | yes | two return data |
To send a data-package is one task, to send it to the right receiver is an other one. The right receiver is identified using the token parameter argument. This parameter have to be a 4 character string. You'll probably ask "why 4?" the answer is that this string should be "human" readable and easy to "compare". As solution this string is mapped to a 4 byte integer used to find the proper key/value entry in the service-hash-table on the server. (in short: to search an integer is much faster as to search a string)
Example-2: (in C) At the client, calling the service and wait for an answer
... or using the MqContextC HIGH API
Example-3: (in C) At the server, answer the service call
... or using the MqContextC HIGH API
C-API: MqContextC_ReadApi_C_API - MqContextC - extract data from an incoming read-data-package …
A data-package is read in two different scenarios:
Reading data is a passive-task and the opposite of sending data, which is an active-task. Passive means that the reading process is triggered by an incoming-data-package and not by the software workflow or by the user.
There is a read function and some help functions for each basic type defined in MkBufferC .
Example from server.py
→ read-safety: Make a nested service-call
def CSV1(self): # read the input-data from the CSV1-service-call # client → server inI = self.ReadI32() + 1 # call the CSV2-service at the client, wait 10sec for timeout # server → client → server retI = self.Send("Wt", 10, "CSV2:I@I", inI) + 1 # answer the CSV1-service-call with the result from the CSV2-service-call # server → client self.Send("R", "I", retI)
C-API: MqContextC_StorageApi_C_API - MqContextC - setup and manage a storage used to persist data-packages …
The storage is divided into: INTERNAL and EXTERNAL storage. Only the read-data-package can be stored or dumped into the storage.
The read-data-package is saved into the storage using:
ServiceStorage | setup a service listen on a MqContextC_ServiceApi_Identifer and save all read-data-package into the STORAGE … |
StorageExport | export the read-data-package into the STORAGE … |
The read-data-package is restored from the storage using:
StorageImport | import the storage-package into the read-data-package … |
ProxyForward | send the entire read-data-package-data to the link-target … |
All this usually happen in an Event Handler |
Some important facts of the storage-feature:
The following internal storages are supported:
Performance analyse:
Nhi1Exec perfclient.c --all --storage VALUE @ perfserver.c
.database | performance | host crash | application crash | info |
---|---|---|---|---|
memdb | 30.000 | data lost | data lost | non persistent |
tempdb | < 30.000 | data lost | data lost | uses memory and/or temporary file |
filedb (mem) | 10.000 | data lost | data safe | in memory filesystem |
filedb (disc) | 50 | data safe | data safe | disc-speed is the key factor |
C-API: MqDumpC_C_API - MqDumpC - the class known as dmp or dump is used to export a pymqmsgque data package as binary …
The dump is used to exchange data with external counterparts such as a database or a user-specific infrastructure.
There are 3 different function that deal with a dump:
C-API: MqSendEND_AND_TRANSACTION_RT - finish the send-data-block, call the remote service to do a longterm-transaction-call …
A longterm-transaction is a service-call with guaranteed delivery. guaranteed mean that the transaction is using the MqContextC STORAGE API to keep every step persistent being able to recover a step if something unexpected happen.
To link the result with the calling-environment a private transaction-item is used to save the calling-environment to the local-storage. If the local-storage is persistent, the calling-environment will even survive an application restart.
If the result from the public service-call arrives, the transaction-item will be extracted from the result and the private calling-environment will be initialized from the local-storage.
To create a persistent-transaction the MqContextC STORAGE API have to be setup as persistent. By default, an in-memory MqContextC STORAGE API is used.
The longterm-transaction-call has TWO results…
In difference to MqSendEND_AND_WAIT and MqSendEND_AND_CALLBACK a longterm-transaction-call have to survive an application restart. To achieve this goal… two features have to be available to process the results:
The transaction-item is the entry-id from the local internal database and is the public handle of the private data. If the transaction-item-private-data should be persistent (survive an application restart) the internal database have to be persistent using the Storage option.
The transaction-item-private-data requires a MqSendT_START … MqSendT_END at the beginning of the send-data-package.
The list of data-items in the transaction-item-private-data have to be provided by the programmer and is used to initialise the environment in the callback (for example an external database-entry-id).
In the transaction-callback the transaction-item-private-data have to be extracted with MqReadT_START … MqReadT_END at the beginning of the read-data-package.
The transaction-callback have to be a MqContextC_ServiceApi_Identifer defined with MqServiceCreate in the application setup code (like IServerSetup) and have to be available after an application restart.
If an error is raised on the server during the database-insert the function will return this error immediately. During waiting for the return the event-loop is used to process other events. Use IEvent to add your tasks into the event loop.
[in] | mkrt | the MkRuntimeS instance to work on - the runtime argument, used by MK_RT_CALL (C-only) |
[in] | ctx | the MqContextS instance to work on |
[in] | token | the MqContextC SERVICE API to identify the service |
[in] | callback | the MqContextC_ServiceApi_Identifer of the MqContextC_ServiceApi_Callback |
[in] | timeout | in seconds until a timeout-error is raised (possible values like ProcessEvent) (MK_TIMEOUT_DEFAULT=MK_TIMEOUT_USER ) |
MkExceptionC | → The default-exception from the Programming-Language-Micro-Kernel (PLMK) |
Example from MyTransaction.py
→ make a logterm-transaction-call using the LOW and the HIGH api
import sys from pymkkernel import * from pymqmsgque import * def callback(ctx): ctx.ReadT_START() myPrivateHandle = ctx.ReadSTR() ctx.ReadT_END() myServiceResult = ctx.ReadI32() print(f"myPrivateHandle={myPrivateHandle}, myServiceResult={myServiceResult}") ctx = MqContextC() try: ctx.ConfigSetName("MyTransaction") # setup commandline arguments used for parsing args = MkBufferListC.CreateLA(sys.argv) # check if the '--token' option is available, default "SRVC" token = args.CheckOptionSTR("--token", "SRVC") # connect to the server ctx.LinkCreate(args) # register callback ctx.ServiceCreate("CLB1", callback) # send block using the LOW-Api ctx.SendSTART() ctx.SendT_START() ctx.SendSTR("Privat_Data_1") ctx.SendT_END() ctx.SendI32(11111) ctx.SendEND_AND_TRANSACTION(token, "CLB1") # send block using the HIGN-Api -> same as above, but shorter ctx.Send("T", "CLB1", f"{token}:(C)I", "Privat_Data_2", 22222) # now we wait for exact ONE result of the $ctx ctx.ProcessEvent(MqWaitOnEventE.OWN) except Exception as ex: ctx.ErrorCatch(ex); finally: ctx.Exit()
A pymqmsgque server requires the following setup:
example/py/MyServer.py
Example from MyServer.py
→ The minimal server looks like:
import sys from pymqmsgque import * # package-item class MyServer(MqContextC): # factory startup def __init__(self, tmpl=None): self.ConfigSetServerSetup(self.ServerSetup) super().__init__(tmpl) # service to serve all incoming requests for token "HLWO" def MyFirstService(self): self.SendSTART() self.SendSTR(self.ReadSTR() + " World") self.SendRETURN() # define a service as link between the token "HLWO" and the callback "MyFirstService" def ServerSetup(self): self.ServiceCreate("HLWO",self.MyFirstService) # package-main if __name__ == "__main__": # create the "MyServer" factory… and the object srv = MqFactoryC.Add(MyServer).New() try: srv.LinkCreate(sys.argv) srv.ProcessEvent(MqWaitOnEventE.FOREVER) except Exception as ex: srv.ErrorCatch(ex) finally: srv.Exit()
The server is started as network visible TCP server listen on PORT 2345
using a THREAD for every new connection request:
If you are using UNIX and if you want to setup a high-performance local server then use the build-in UDS (Unix-Domain-Sockets) capability to listen on the FILE /path/to/any/file.uds
instead on a network port:
Three things are important:
ctx.ServiceCreate(token:string[4], callback:callable)
function[constructor,static] MqContextC MqContextC.Create(?tmpl:MqContextC=None?)
Sending data is done using a minimum of 2 steps:
ctx.SendSTART()
The first three ctx.SendEND(token:string[4], ?timeout:MkTimeoutE|int32=DEFAULT?)... functions are used to call a remote service and the last one is used to answer an incoming service call. In-between ctx.SendSTART() and ctx.SendEND(token:string[4], ?timeout:MkTimeoutE|int32=DEFAULT?) ... other MqContextC SEND API style commands are available to fill the data package with data.
Services are created with the ctx.ServiceCreate(token:string[4], callback:callable)
function. The first parameter is a 4 byte Token as public name. 4 byte is required because this string is mapped to a 4 byte integer for speed reason. The second parameter is an object providing the MqContextC_ServiceApi_Callback interface.
The MqContextC_ServiceApi_Callback is called for every incoming service-request which belongs to token.
A pymqmsgque client requires the following setup:
Example from MyClient.py
→ The minimal client looks like:
import sys from pymqmsgque import * ctx = MqContextC() try: ctx.ConfigSetName("MyClient") ctx.LinkCreate(sys.argv) ctx.SendSTART() ctx.SendSTR("Hello") ctx.SendEND_AND_WAIT("HLWO") print(ctx.ReadSTR(), file=sys.stdout) except Exception as ex: ctx.ErrorCatch(ex); finally: ctx.Exit()
Example: To call a network visible TCP server listen on PORT 2345
use:
Example: To call a network visible UDP server listen on FILE /path/to/any/file.uds
use:
Example: To call a local server started by the client using PIPE communication use:
The FILTER MODE is used to define a command pipeline.
Example from manfilter.py
→ A minimal filter looks like:
import sys from pymqmsgque import * class ManFilter(MqContextC): def __init__(self, tmpl=None): MqContextC.__init__(self) def FTRcmd(ctx): ftr = ctx.SlaveGetFilter() ftr.SendSTART() while ctx.ReadItemExists(): ftr.SendSTR("<" + ctx.ReadSTR() + ">") ftr.SendEND_AND_WAIT("+FTR") ctx.SendRETURN() if __name__ == "__main__": srv = ManFilter() try: srv.ConfigSetIsServer(True) srv.LinkCreate(sys.argv) srv.ServiceCreate("+FTR", srv.FTRcmd) srv.ServiceProxy("+EOF") srv.ProcessEvent(MqWaitOnEventE.FOREVER) except Exception as ex: srv.ErrorCatch(ex); finally: srv.Exit()
Use manfilter.py in a LibMqMsgque command pipeline: