Documentation of the Test-Client-Server tool used for example.test
.
The Naming-Test-Tool is used to test the name-resolution feature of pymqmsgque.
To perform the test, two tools are used: the testclient and the testserver.
testclient | This tool creates multiple parent and child client-context, all using a |
testserver | This tool listens on the service GTCX and returns the unique identification pattern of the server-context on duty. |
All contexts created, 8x client and 8x server, are connected using the pymqmsgque protocol and together build a flat structure.
The GOAL for this setup is:
#+
#:  -*- coding: utf8 -*-
#:
#:  @file         NHI1/example/py/testclient.py
#:  @brief        testclient.py - 26 Jun 2024 - aotto1968
#:  @copyright    (C) NHI - #1 - Project - Group
#:                This software has NO permissions to copy,
#:                please contact AUTHOR for additional information
#:  @version      08a242faddae5101924d8fe811888e78892a82d9
#:  @date         Wed Jun 26 14:26:21 2024 +0200
#:  @author       aotto1968 <aotto1968@t-online.de>
#:

import sys
import os

from pymqmsgque import *

## setup the clients
# the server script is expected next to this client script
server = os.path.join(os.path.dirname(sys.argv[0]), "testserver.py")

## create object
# eight contexts, created in the same order as they are named
c0, c00, c01, c000, c1, c10, c100, c101 = (MqContextC() for _ in range(8))

## setup object link
# first parent: everything after "@" is the command line of the server
c0.LinkCreate(
    "c0",
    "--postfix", "0",
    "--prefix", "c",
    "--debug", os.environ.get("TS_DEBUG"),
    "@",
    os.environ.get("PYTHON"), server,
    "--prefix", "s",
)
c00.LinkCreateChild(c0, "c00", "--postfix", "00")
c01.LinkCreateChild(c0, "c01", "--postfix", "01")
c000.LinkCreateChild(c00, "c000", "--postfix", "000")

# second parent: the remaining link options come from the command line
c1.LinkCreate(
    "c1",
    "--postfix", "1",
    "--prefix", "c",
    *sys.argv[1:],
)
c10.LinkCreateChild(c1, "c10", "--postfix", "10")
c100.LinkCreateChild(c10, "c100", "--postfix", "100")
c101.LinkCreateChild(c10, "c101", "--postfix", "101")


## my helper
def Get(ctx):
    """Call the GTCX service on *ctx* and return the answer as one string.

    The result is the value of ``ctx.ConfigGetName()``, followed by "+",
    followed by the six items of the service reply, each read with
    ``ReadSTR`` and appended in the order they are read.
    """
    ctx.SendSTART()
    ctx.SendEND_AND_WAIT("GTCX")
    pieces = [ctx.ConfigGetName(), "+"]
    pieces.extend(ctx.ReadSTR() for _ in range(6))
    return "".join(pieces)


## do the tests
for context in (c0, c00, c01, c000, c1, c10, c100, c101):
    print(Get(context))

## do the cleanup
c1.LinkDelete()
c0.Exit()
#+
#:  -*- coding: utf8 -*-
#:
#:  @file         NHI1/example/py/testserver.py
#:  @brief        testserver.py - 26 Jun 2024 - aotto1968
#:  @copyright    (C) NHI - #1 - Project - Group
#:                This software has NO permissions to copy,
#:                please contact AUTHOR for additional information
#:  @version      08a242faddae5101924d8fe811888e78892a82d9
#:  @date         Wed Jun 26 14:26:21 2024 +0200
#:  @author       aotto1968 <aotto1968@t-online.de>
#:

import sys

from pymqmsgque import *


class testserver(MqContextC):
    """Server context that provides the GTCX service."""

    def __init__(self, tmpl=None):
        """Register the server-setup callback, then initialize the base class.

        *tmpl* is accepted but not used by this constructor.
        """
        self.ConfigSetServerSetup(self.ServerSetup)
        MqContextC.__init__(self)

    def ServerSetup(ctx):
        """Server-setup callback: create the GTCX service on this context."""
        ctx.ServiceCreate("GTCX", ctx.GTCX)

    def GTCX(ctx):
        """Service handler: send back the identification of this context.

        Reply items, in order: the own ctx-id, "+", the ctx-id of the
        parent (or -1 when this context is itself a parent), "+", the
        value of ``ConfigGetName()`` and ":".
        """
        ctx.SendSTART()
        ctx.SendI32(ctx.LinkGetCtxId())
        ctx.SendSTR("+")
        ctx.SendI32(
            -1 if ctx.LinkIsParent() else ctx.LinkGetParent().LinkGetCtxId()
        )
        ctx.SendSTR("+")
        ctx.SendSTR(ctx.ConfigGetName())
        ctx.SendSTR(":")
        ctx.SendRETURN()


if __name__ == "__main__":
    srv = MqFactoryC.Add(testserver).New()
    try:
        srv.LinkCreate(sys.argv)
        srv.ProcessEvent(MqWaitOnEventE.FOREVER)
    except Exception as err:
        # hand the exception over to the context's error handling
        srv.ErrorCatch(err)
    finally:
        srv.Exit()