-
Notifications
You must be signed in to change notification settings - Fork 0
/
server
executable file
·104 lines (89 loc) · 3.19 KB
/
server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import collections
import random
import sys

from common import *
class MessageStore:
    """In-memory FIFO message queues keyed by address.

    An address only has an entry in ``self.messages`` while at least one
    message is queued for it; the entry is removed as soon as its queue
    becomes empty.
    """

    def __init__(self):
        # Maps address -> deque of messages waiting to be handed out.
        self.messages = {}

    def add(self, address, message):
        """Append *message* to the queue for *address*, creating it if needed."""
        queue = self.messages.get(address)
        if queue is None:
            queue = collections.deque()
            self.messages[address] = queue
        queue.append(message)

    def get(self, address):
        """Remove and return the oldest message queued for *address*.

        Returns None when nothing is queued for *address*.
        """
        queue = self.messages.get(address)
        if queue is None:
            return None
        # popleft() is O(1); list.pop(0) would shift every remaining element.
        msg = queue.popleft()
        if not queue:
            # Drop empty queues so the dict only tracks addresses with backlog.
            del self.messages[address]
        return msg
class Server(Handler):
    """Event handler that stores incoming messages and forwards them by address.

    Messages read from receiver links are kept in a MessageStore under the
    link's target address, and are pushed out on sender links for the same
    address whenever such a link can take more.
    """

    def __init__(self, router, quiet):
        """Create the handler.

        router -- object whose ``outgoing(address)`` result offers ``choose()``
                  to pick a sender link for an address.
        quiet  -- when true, suppress the per-message log lines.
        """
        self.router = router
        self.quiet = quiet
        # Counter used to generate a distinct tag for every outgoing delivery.
        self.tag = 0
        self.messages = MessageStore()

    def next_tag(self):
        """Return the next delivery tag ("tag0", "tag1", ...) and advance the counter."""
        result = "tag%s" % self.tag
        self.tag += 1
        return result

    def send(self, address, snd=None):
        """Send stored messages for *address* and return how many were sent.

        When *snd* is None a sender link is chosen through the router; if no
        link is available nothing is sent and 0 is returned.
        """
        if snd is None:
            links = self.router.outgoing(address)
            snd = links.choose()
            if snd is None:
                return 0

        count = 0
        # Keep sending while the link has credit and fewer than 1024
        # deliveries are already queued on it.
        while snd.credit and snd.queued < 1024:
            msg = self.messages.get(address)
            # NOTE(review): any falsy message (e.g. an empty string) is treated
            # the same as "no message stored" here, not only None.
            if not msg:
                snd.drained()
                return count
            dlv = snd.delivery(self.next_tag())
            snd.send(msg)
            # Settled immediately after sending; no acknowledgement is awaited.
            dlv.settle()
            count += 1
            if not self.quiet:
                # Parenthesised single-argument form prints identically on
                # Python 2 and is also valid syntax on Python 3.
                print("Sent message(%s): %s" % (address, msg))

        return count

    def on_link_flow(self, event):
        """On a flow event for a sender link, push stored messages to it."""
        if event.link.is_sender:
            snd = event.link
            # Use the source address when set, otherwise the target address.
            self.send(snd.source.address or snd.target.address, snd)

    def on_delivery(self, event):
        """Store a completed incoming delivery, accept it, and try to forward it."""
        dlv = event.delivery
        if dlv.link.is_receiver and not dlv.partial:
            address = dlv.link.target.address
            msg = dlv.link.recv(dlv.pending)
            self.messages.add(address, msg)
            dlv.update(Delivery.ACCEPTED)
            dlv.settle()
            if not self.quiet:
                print("Got message(%s): %s" % (address, msg))
            # Forward right away if a sender link for this address is available.
            self.send(address)
# Partition the command line into switches (leading "-") and positional args.
switches = []
args = []
for a in sys.argv[1:]:
    if a.startswith("-"):
        switches.append(a)
    else:
        args.append(a)

quiet = "-q" in switches

# Optional first positional is the host, unless it is all digits, in which
# case it is taken to be the port and the host defaults to "localhost".
if args and not args[0].isdigit():
    host = args.pop(0)
else:
    host = "localhost"

# Next positional (if any) is the port; otherwise use 5672.
if args:
    port = int(args.pop(0))
else:
    port = 5672

# Wire the handlers together and run the driver loop.
coll = Collector()
router = Router()
drv = Driver(
    coll,
    FlowController(1024),
    Handshaker(),
    router,
    Server(router, quiet),
)
acc = Acceptor(drv, host, port)
drv.run()