Atlas  0.7.0
Networking protocol for the Worldforge system.
test_server.py
1 #test TCP/IP server
2 
3 #Copyright 2002 by AIR-IX SUUNNITTELU/Ahiplan Oy
4 
5 #This library is free software; you can redistribute it and/or
6 #modify it under the terms of the GNU Lesser General Public
7 #License as published by the Free Software Foundation; either
8 #version 2.1 of the License, or (at your option) any later version.
9 
10 #This library is distributed in the hope that it will be useful,
11 #but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 #Lesser General Public License for more details.
14 
15 #You should have received a copy of the GNU Lesser General Public
16 #License along with this library; if not, write to the Free Software
17 #Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 
19 
20 import test_objects
21 import importlib
22 importlib.reload(test_objects)
23 from test_objects import *
24 
25 import os, time
26 import pdb
27 #pdb.set_trace()
28 print_debug = 0
29 if print_debug:
30  print("test_server:")
31 
32 import atlas.util.debug
33 atlas.util.debug.debug_flag = 0
34 
35 import atlas
36 import atlas.transport.TCP.server
37 import atlas.transport.TCP.client
38 from atlas.transport.connection import args2address
39 
40 class TestServer(atlas.transport.TCP.server.SocketServer):
41  def loop(self):
42  self.waiting = 1
43  while self.waiting or self.clients2send:
44  self.process_communication()
45  self.idle()
46  if print_debug:
47  print(self.waiting, self.clients2send)
48 
49 class TestConnection(atlas.transport.TCP.server.TcpClient):
50  def talk_op(self, op):
51  if print_debug:
52  print(repr(str(op)))
53  self.server.str_op = str(op)
54  reply = atlas.Operation("sound",
55  atlas.Operation("talk",
56  atlas.Object(say="Hello %s!" % op.from_),
57  from_=op.from_),
58  from_=op.from_
59  )
60  self.reply_operation(op, reply)
61  self.server.waiting = 0
62 
63 
64 class TestClient(atlas.transport.TCP.client.TcpClient):
65  def sound_op(self, op):
66  self.waiting = 0
67  self.str_op = str(op)
68  if print_debug:
69  print(repr(str(op)))
70 
71  def loop(self):
72  op = atlas.Operation("talk",
73  atlas.Object(say="Hello world!"),
74  from_="Joe")
75  self.send_operation(op)
76  self.waiting = 1
77  while self.waiting:
78  time.sleep(0.1)
79  self.process_communication()
80 
81 
82 tserver = TestServer("test server", args2address(sys.argv), TestConnection)
83 
84 res = os.fork()
85 if res==0:
86  tclient = TestClient("test client", args2address(sys.argv))
87  tclient.connect_and_negotiate()
88  tclient.loop()
89  assert(tclient.str_op=='{\012\011arg: {\012\011\011arg: {\012\011\011\011say: "Hello Joe!"\012\011\011},\012\011\011from: "Joe",\012\011\011objtype: "op",\012\011\011parents: ["talk"]\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["sound"]\012}\012')
90  if print_debug:
91  print("client exits")
92 else:
93  tserver.loop()
94  assert(tserver.str_op=='{\012\011arg: {\012\011\011say: "Hello world!"\012\011},\012\011from: "Joe",\012\011objtype: "op",\012\011parents: ["talk"]\012}\012')
95  if print_debug:
96  print("server exits")
97  os.wait()
test_server.TestServer
Definition: test_server.py:40
test_server.TestClient.str_op
str_op
Definition: test_server.py:67
test_server.TestClient
Definition: test_server.py:64
test_server.TestServer.waiting
waiting
Definition: test_server.py:42
atlas.Operation
def Operation(parent, arg=Object(), **kw)
Definition: __init__.py:196
atlas.util.debug
Definition: debug.py:1
test_server.TestClient.waiting
waiting
Definition: test_server.py:66
test_server.TestConnection
Definition: test_server.py:49
atlas.Object
Definition: __init__.py:36