22 importlib.reload(test_objects)
23 from test_objects
import *
33 atlas.util.debug.debug_flag = 0
36 import atlas.transport.TCP.server
37 import atlas.transport.TCP.client
38 from atlas.transport.connection
import args2address
40 class TestServer(atlas.transport.TCP.server.SocketServer):
43 while self.
waiting or self.clients2send:
44 self.process_communication()
47 print(self.
waiting, self.clients2send)
50 def talk_op(self, op):
53 self.server.str_op = str(op)
60 self.reply_operation(op, reply)
61 self.server.waiting = 0
65 def sound_op(self, op):
75 self.send_operation(op)
79 self.process_communication()
82 tserver =
TestServer(
"test server", args2address(sys.argv), TestConnection)
86 tclient =
TestClient(
"test client", args2address(sys.argv))
87 tclient.connect_and_negotiate()
89 assert(tclient.str_op==
'{\012\011arg: {\012\011\011arg: {\012\011\011\011say: "Hello Joe!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["sound"]\012}\012')
94 assert(tserver.str_op==
'{\012\011arg: {\012\011\011say: "Hello world!"\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["talk"]\012}\012')