Entry 5064
deszka
Submitted by anonymous
on July 1, 2010 at 3:37 p.m.
Language: Python. Code size: 6.6 KB.
from twisted.internet.protocol import DatagramProtocol from twisted.internet import reactor, task import datetime import time import re from socket import SOL_SOCKET, SO_BROADCAST import signal, sys #### configuration settings #### # node's own IP address my_ip = '' # port, which the service will operate on, must be the same for every node port = 8991 # broadcast address of the subnet broadcast_ip = '192.168.5.255' # list of nodes nodes = ['192.168.5.1', '192.168.5.2', '192.168.5.3'] # priority list of resources, every resource is a node too (must be subset of # the list above) resource_order = ['192.168.5.2', '192.168.5.3'] # heartbeat frequency in seconds, how often should send a heartbeat heartbeat_freq = 0.1 # heartbeat treshold in seconds , the node will be treated as passive after this # amount of time without heartbeat received heartbeat_tres = 0.3 # Debug mode DEBUG = False def log(msg): if DEBUG: print str(datetime.datetime.now()) + " " + msg def printout(msg): print str(datetime.datetime.now()) + " " + msg # handles sigint def handler(signum, frame): reactor.stop() print printout("Killed") class Node(DatagramProtocol): """Main class for protocol, descended from twsited's DatagramProtocol""" def __init__(self): log("Creating Node object") # from configuration settings self.my_ip = my_ip self.port = port self.broadcast_ip = broadcast_ip self.nodes = dict(zip(nodes, [0.0] * len(nodes))) self.resource_order = resource_order self.heartbeat_tres = heartbeat_tres self.heartbeat_freq = heartbeat_freq # dynamic properties self.active_nodes = [my_ip] self.nodes[my_ip] = time.time() self.votes = {} self.voted_nodes = [] self.master = None self.election_phase = False self.majority = len(nodes) / 2 + 1 # recurring method-calls self.heartbeat_loop = task.LoopingCall(self.sendHeartbeat) self.check_nodes_loop = task.LoopingCall(self.checkNodes) self.election_loop = task.LoopingCall(self.checkElection) self.announce_loop = task.LoopingCall(self.sendAnnounce) self.check_master_loop = task.LoopingCall(self.checkMaster) def startProtocol(self): """Socket init, starts recurring method-calls""" log("Starting protocol") self.transport.socket.setsockopt(SOL_SOCKET, SO_BROADCAST, True) self.heartbeat_loop.start(self.heartbeat_freq) self.check_nodes_loop.start(self.heartbeat_freq) self.election_loop.start(self.heartbeat_freq) self.check_master_loop.start(self.heartbeat_freq) self.announce_loop.start(self.heartbeat_freq) def sendHeartbeat(self): """Broadcasts a heartbeat""" log("Sending heartbeat from %s" % my_ip) self.transport.write("hb %s" % my_ip, (self.broadcast_ip, self.port)) def heartbeatReceived(self, node): """Handles heartbeat messages""" if node in nodes: log("Heartbeat received from: %s" % node) self.nodes[node] = time.time() def checkNodes(self): """Checks nodes, active or dead decision""" log("Checking nodes") now = time.time() for node in self.nodes: node_active = node in self.active_nodes if now - self.nodes[node] > self.heartbeat_tres: if node_active: self.active_nodes.remove(node) else: if not node_active: self.active_nodes.append(node) log("-> Active nodes: %s" % str(self.active_nodes)) log("-> Current master: %s" % str(self.master)) def checkElection(self): """Switches into election phase, if neccessary""" log("Election check") if len(self.active_nodes) < self.majority: log("-> This node in the minority, dropping master") self.master = None self.election_phase = False elif self.master not in self.active_nodes: log("-> Election phase") self.master = None self.election_phase = True self.sendVote() def sendVote(self): """Broadcasts a vote for the prior active node""" if self.election_phase: for resource in self.resource_order: if resource in self.active_nodes: self.transport.write("vote %s" % resource, (self.broadcast_ip, self.port)) log("Voting for %s" % resource) break def voteReceived(self, node, src): """Handles vote messages""" if self.election_phase: if node in self.voted_nodes: return log("Vote accepted from: %s to %s" % (src, node)) if node not in self.votes: self.votes[node] = 1 else: self.votes[node] += 1 def checkMaster(self): """Checks majority of votes""" if self.election_phase: log("Checking master, votes: %s" % str(self.votes)) if self.votes.has_key(my_ip) and self.votes[my_ip] >= self.majority: self.master = my_ip self.election_phase = False self.votes = {} self.voted_nodes = [] printout("Election ended, I'm master: %s" % self.master) def sendAnnounce(self): """Announces myself as master""" if not self.election_phase and self.master == self.my_ip: log("-> Announce me as master: %s" % self.master) self.transport.write("master %s" % self.master, (self.broadcast_ip, self.port)) def announceReceived(self, node): """Handles master announce messages""" if self.election_phase and node in self.active_nodes: self.election_phase = False self.master = node log("Master elected: %s" % str(self.master)) def datagramReceived(self, datagram, host): """Handles incoming datagrams""" msg = str(datagram) log("Incoming datagram: %s" % msg) if re.match("hb", msg): self.heartbeatReceived(msg.split()[-1]) if re.match("vote", msg): self.voteReceived(msg.split()[-1], host) if re.match("master", msg): self.announceReceived(msg.split()[-1]) def main(): signal.signal(signal.SIGINT, handler) protocol = Node() t = reactor.listenUDP(port, protocol) reactor.run() if __name__ == '__main__': main()
This snippet took 0.03 seconds to highlight.
Back to the Entry List or Home.