Entry 5064

deszka

   

Submitted by anonymous on July 1, 2010 at 3:37 p.m.
Language: Python. Code size: 6.6 KB.

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor, task
import datetime
import time
import re
from socket import SOL_SOCKET, SO_BROADCAST
import signal, sys

#### configuration settings ####

# node's own IP address
my_ip = ''

# port, which the service will operate on, must be the same for every node
port = 8991

# broadcast address of the subnet
broadcast_ip = '192.168.5.255'

# list of nodes
nodes = ['192.168.5.1', '192.168.5.2', '192.168.5.3']

# priority list of resources, every resource is a node too (must be subset of
# the list above)
resource_order = ['192.168.5.2', '192.168.5.3']

# heartbeat frequency in seconds, how often should send a heartbeat
heartbeat_freq = 0.1

# heartbeat treshold in seconds , the node will be treated as passive after this
# amount of time without heartbeat received
heartbeat_tres = 0.3

# Debug mode
DEBUG = False

def log(msg):
    if DEBUG:
        print str(datetime.datetime.now()) + " " + msg

def printout(msg):
    print str(datetime.datetime.now()) + " " + msg
    
# handles sigint
def handler(signum, frame):
    reactor.stop()
    print
    printout("Killed")
    
class Node(DatagramProtocol):
    """Main class for protocol, descended from twsited's DatagramProtocol"""
    
    def __init__(self):
        log("Creating Node object")
        
        # from configuration settings
        self.my_ip = my_ip
        self.port = port
        self.broadcast_ip = broadcast_ip
        self.nodes = dict(zip(nodes, [0.0] * len(nodes)))
        self.resource_order = resource_order
        self.heartbeat_tres = heartbeat_tres
        self.heartbeat_freq = heartbeat_freq
        
        # dynamic properties
        self.active_nodes = [my_ip]
        self.nodes[my_ip] = time.time()
        self.votes = {}
        self.voted_nodes = []
        self.master = None
        self.election_phase = False
        self.majority = len(nodes) / 2 + 1

        # recurring method-calls
        self.heartbeat_loop = task.LoopingCall(self.sendHeartbeat)
        self.check_nodes_loop = task.LoopingCall(self.checkNodes)
        self.election_loop = task.LoopingCall(self.checkElection)
        self.announce_loop = task.LoopingCall(self.sendAnnounce)
        self.check_master_loop = task.LoopingCall(self.checkMaster)
    
    def startProtocol(self):
        """Socket init, starts recurring method-calls"""
        log("Starting protocol")
        self.transport.socket.setsockopt(SOL_SOCKET, SO_BROADCAST, True)
        self.heartbeat_loop.start(self.heartbeat_freq)
        self.check_nodes_loop.start(self.heartbeat_freq)
        self.election_loop.start(self.heartbeat_freq)
        self.check_master_loop.start(self.heartbeat_freq)
        self.announce_loop.start(self.heartbeat_freq)
        
    def sendHeartbeat(self):
        """Broadcasts a heartbeat"""
        log("Sending heartbeat from %s" % my_ip)
        self.transport.write("hb %s" % my_ip, (self.broadcast_ip, self.port))

    def heartbeatReceived(self, node):
        """Handles heartbeat messages"""
        if node in nodes:
            log("Heartbeat received from: %s" % node)
            self.nodes[node] = time.time()
    
    def checkNodes(self):
        """Checks nodes, active or dead decision"""
        log("Checking nodes")
        now = time.time()
        
        for node in self.nodes:
            node_active = node in self.active_nodes
            if now - self.nodes[node] > self.heartbeat_tres:
                if node_active:
                    self.active_nodes.remove(node)
            else:
                if not node_active:
                    self.active_nodes.append(node)
        log("-> Active nodes: %s" % str(self.active_nodes))
        log("-> Current master: %s" % str(self.master))
    
    def checkElection(self):
        """Switches into election phase, if neccessary"""
        log("Election check")
        if len(self.active_nodes) < self.majority:
            log("-> This node in the minority, dropping master")
            self.master = None
            self.election_phase = False
        elif self.master not in self.active_nodes:
            log("-> Election phase")
            self.master = None
            self.election_phase = True
            self.sendVote()
        
    def sendVote(self):
        """Broadcasts a vote for the prior active node"""
        if self.election_phase:
            for resource in self.resource_order:
                if resource in self.active_nodes:
                    self.transport.write("vote %s" % resource,
                        (self.broadcast_ip, self.port))
                    log("Voting for %s" % resource)
                    break
    
    def voteReceived(self, node, src):
        """Handles vote messages"""
        if self.election_phase:
            if node in self.voted_nodes:
                return
            
            log("Vote accepted from: %s to %s" % (src, node))
            if node not in self.votes:
                self.votes[node] = 1
            else:
                self.votes[node] += 1
            
    def checkMaster(self):
        """Checks majority of votes"""
        if self.election_phase:
            log("Checking master, votes: %s" % str(self.votes))
            if self.votes.has_key(my_ip) and self.votes[my_ip] >= self.majority:
                self.master = my_ip
                self.election_phase = False
                self.votes = {}
                self.voted_nodes = []
                printout("Election ended, I'm master: %s" % self.master)

    def sendAnnounce(self):
        """Announces myself as master"""
        if not self.election_phase and self.master == self.my_ip:
            log("-> Announce me as master: %s" % self.master)
            self.transport.write("master %s" % self.master,
                (self.broadcast_ip, self.port))
    
    def announceReceived(self, node):
        """Handles master announce messages"""
        if self.election_phase and node in self.active_nodes:
            self.election_phase = False
            self.master = node
            log("Master elected: %s" % str(self.master))
  
    def datagramReceived(self, datagram, host):
        """Handles incoming datagrams"""
        msg = str(datagram)
        log("Incoming datagram: %s" % msg)
        if re.match("hb", msg):
            self.heartbeatReceived(msg.split()[-1])
        if re.match("vote", msg):
            self.voteReceived(msg.split()[-1], host)
        if re.match("master", msg):
            self.announceReceived(msg.split()[-1])
                            
def main():
    signal.signal(signal.SIGINT, handler)
    protocol = Node()
    t = reactor.listenUDP(port, protocol)
    reactor.run()

if __name__ == '__main__':
    main()
    

This snippet took 0.03 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).