Demo entry 6771809

sheng

   

Submitted by anonymous on Nov 20, 2018 at 22:14
Language: Python. Code size: 13.2 kB.

import pika
from multiprocessing import Process, Lock, Manager
import json
import uuid
import sys
import time


class Node(object):
    """One participant in a token-passing mutual-exclusion demo over RabbitMQ.

    Every node owns a durable queue ``queue_<id>`` and a direct exchange
    ``exchange_<id>``.  ``_holder`` names the neighbour through which the
    token can be reached; the string ``'0'`` means this node holds the token
    itself.  Requests travel towards the holder as ``REQUEST``/``TRANSFER``
    messages and the token travels back as a ``Token`` message.

    NOTE(review): the layout resembles Raymond's tree-based algorithm, but
    this class has not been checked against it -- confirm before relying on
    fairness or liveness guarantees.

    NOTE(review): ``_holder``, ``_using`` and ``_asked`` are ordinary
    attributes.  Only ``_fil_attente`` (a ``Manager`` list) and ``lock`` are
    multiprocessing-aware, so if the consumer loop runs in a separate process
    each process updates its own copy of those three flags.
    """

    # Seconds make_request() sleeps (while holding the lock) before acting.
    REQUEST_DELAY = 2
    # Seconds enter_CS() spends inside the critical section.
    CS_DURATION = 2

    def __init__(self, node_id, holder, node_total):
        """Store identity and protocol state; no network I/O happens here.

        Args:
            node_id: This node's number (anything ``int()`` accepts).
            holder: Number of the neighbour on the path to the token, or
                ``0``/``'0'`` when this node starts out holding it.
            node_total: How many nodes (numbered from 1) take part.
        """
        super(Node, self).__init__()

        self._node_id = int(node_id)
        self._queue_id = 'queue_' + str(self._node_id)
        self._exchange_id = 'exchange_' + str(self._node_id)
        # Always a string: it is compared against '0' and concatenated into
        # queue names and routing keys throughout the class.
        self._holder = str(holder)
        self._using = False
        self._asked = False
        self._corr_id = str(uuid.uuid4())

        self._node_total = int(node_total)
        # Waiting queue of node ids, shared between processes via a Manager.
        self._fil_attente = Manager().list([])
        self.lock = Lock()

    def run(self):
        """Connect to the broker and declare queues, exchange and callbacks."""
        self.connect()
        self.init_queue()
        self.init_exchange()
        self.init_callback_queue()

    def connect(self):
        """Open a blocking connection to RabbitMQ on localhost."""
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

    def init_queue(self):
        """Declare every node's queue and start consuming from our own."""
        # queue_bind() in init_exchange()/assign_token() targets the peers'
        # queues, so all of them are declared here, not only ours.
        queue_names = ['queue_' + str(number)
                       for number in range(1, self._node_total + 1)]
        if self._queue_id not in queue_names:
            queue_names.append(self._queue_id)
        for queue_name in queue_names:
            self.channel.queue_declare(queue_name, durable=True)
        self.channel.basic_consume(self.on_receive, queue=self._queue_id)

    def init_exchange(self):
        """Declare our exchange and bind the holder's queue to it.

        Nothing is bound when the holder number is outside ``1..node_total``
        (in particular when it is ``0``, i.e. we hold the token ourselves).
        """
        print("begin to the server RabbitMQ")
        self.channel.exchange_declare(exchange=self._exchange_id, exchange_type="direct")

        if 1 <= int(self._holder) <= self._node_total:
            print('finish to create the queue and the exchange')
            self._bind_holder_queue()

    def init_callback_queue(self):
        """Declare the exclusive queue on which replies to us arrive."""
        result = self.channel.queue_declare(exclusive=True, durable=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def _routing_key(self):
        """Return the ``<id>-><holder>`` key used towards the current holder."""
        return str(self._node_id) + '->' + str(self._holder)

    def _bind_holder_queue(self):
        """Bind the current holder's queue to our exchange."""
        self.channel.queue_bind(exchange=self._exchange_id,
                                queue='queue_' + self._holder,
                                routing_key=self._routing_key())

    def _publish_to_holder(self, message):
        """Send *message* (a dict) as persistent JSON to the current holder."""
        self.channel.basic_publish(
            exchange=self._exchange_id,
            routing_key=self._routing_key(),
            body=json.dumps(message),
            properties=pika.BasicProperties(reply_to=self.callback_queue,
                                            correlation_id=self._corr_id,
                                            delivery_mode=2))

    def _reply_and_ack(self, ch, method, properties):
        """Answer the sender with a ``response`` message, then ack the delivery."""
        # A sender that supplied no reply_to cannot be answered; still ack.
        if properties.reply_to:
            reply = json.dumps({'id': self._node_id, 'message_type': 'response'})
            self.channel.basic_publish(
                exchange="",
                routing_key=properties.reply_to,
                body=reply,
                properties=pika.BasicProperties(correlation_id=properties.correlation_id,
                                                delivery_mode=2))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def _should_ask_holder(self):
        """True when someone else holds the token, somebody waits, and we have not asked yet."""
        return (self._holder != '0'
                and len(self._fil_attente) != 0
                and not self._asked)

    def make_request(self, message):
        """Queue a local request and, if needed, forward *message* to the holder.

        Args:
            message: Dict with at least a ``message_type`` key; a ``REQUEST``
                puts this node's own id on the waiting queue.
        """
        with self.lock:
            time.sleep(self.REQUEST_DELAY)

            if message['message_type'] == 'REQUEST':
                self._fil_attente.append(self._node_id)

            if self._should_ask_holder():
                print(str(self._node_id) + " begins to make request and sends this request to my holder %s" % self._holder)
                self._publish_to_holder(message)
                self._asked = True

    def transfer_request(self, message):
        """Forward *message* towards the holder on behalf of a waiting node."""
        with self.lock:
            print(str(self._node_id) + " transfers request because I am not the root")
            if self._should_ask_holder():
                self._publish_to_holder(message)

    def on_response(self, ch, method, properties, body):
        """Print a reply whose correlation id is ours; ignore anything else."""
        if self._corr_id == properties.correlation_id and body:
            self.response = body

            print("get response contente : %s" % self.response)
            print("current node is : " + str(self._node_id) + "   my holder is : " + str(self._holder))
            print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

    def on_receive(self, ch, method, properties, body):
        """Handle a ``REQUEST``, ``TRANSFER`` or ``Token`` message from a peer.

        Requests are queued and either forwarded to the holder or, when this
        node holds the token, served through assign_token().  A ``Token``
        message always triggers assign_token().  Other message types are
        ignored (and, as before, not acknowledged).
        """
        message_received = json.loads(body)
        message_type = message_received['message_type']

        if message_type in ("REQUEST", "TRANSFER"):
            with self.lock:
                self._fil_attente.append(message_received['id'])
                self._reply_and_ack(ch, method, properties)

            if self._holder != '0':
                print(str(self._node_id) + " creates new message TRANSFERT")
                # Both kinds are forwarded through transfer_request() so the
                # next hop always receives a reply_to it can answer.
                self.transfer_request({'id': self._node_id, 'message_type': 'TRANSFER'})
            else:
                print(str(self._node_id) + " assign the token")
                self.assign_token()
        elif message_type == "Token":
            with self.lock:
                self._reply_and_ack(ch, method, properties)
            self.assign_token()

    def enter_CS(self):
        """Occupy the critical section for ``CS_DURATION`` seconds."""
        print("~~~~~~~~~~~~~~~get into critical section~~~~~~~~~~~~~~~~~~~")
        self._using = True
        try:
            time.sleep(self.CS_DURATION)
        finally:
            # Reset even if the sleep is interrupted so the token is not
            # reported as in use forever.
            self._using = False
            self._asked = False
        print("############# I have finished to use resource in the critical section #############")
        print("~~~~~~~~~~~~~release the critical section~~~~~~~~~~~~~~~~~~~~~~~~~")
        print("%s node begins to assign the token to another node" % str(self._node_id))

    def assign_token(self):
        """Give the token to the waiting nodes, oldest request first.

        If the head of the queue is this node it enters the critical section
        and then moves on to the next waiter; otherwise the token is sent to
        that node, which becomes the new holder, and the method returns.
        """
        with self.lock:
            while not self._using and len(self._fil_attente) != 0:
                next_node = self._fil_attente.pop(0)
                self._holder = str(next_node)
                self._asked = False
                print("pop my holder: " + str(next_node) + " (if holder is 0, it's myself)")

                if self._holder == '0':
                    self._using = True
                elif self._holder == str(self._node_id):
                    print("I have got the token !")
                    print("begins to enter the critical section !")
                    self._holder = '0'
                    print("current node is : " + str(self._node_id) + "   my holder is myself ")
                    self.enter_CS()
                else:
                    self._bind_holder_queue()
                    print(str(self._node_id) + " continue to transfer token")
                    self._publish_to_holder({'id': self._node_id, 'message_type': 'Token'})
                    # The token has left this node; remaining waiters stay queued.
                    return
        
	
def parse_args(argv):
    """Split the command line into ``(node_id, holder, total)``.

    Args:
        argv: Full argument vector including the program name.

    Returns:
        A 3-tuple of the raw argument strings, or ``None`` when *argv* does
        not hold exactly three arguments after the program name.
    """
    if len(argv) != 4:
        return None
    return tuple(argv[1:4])


def main(argv):
    """Start a node, run its consumer in a child process, and serve the menu.

    The menu accepts ``1`` (request the token) and ``2`` (quit); end of input
    is treated like ``2`` so the consumer process is always terminated.
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    args = parse_args(argv)
    if args is None:
        print("your input of parameters is wrong")
        sys.exit(1)
    node_id, holder, total = args

    node = Node(node_id, holder, total)
    node.run()

    p1 = Process(target=node.channel.start_consuming, args=())
    p1.start()
    while True:
        try:
            method = read_line("select your fonction:\n 1->REQUEST,2->QUIT:\n")
        except EOFError:
            method = "2"

        if method == "2":
            print("quit the connection")
            p1.terminate()
            node.connection.close()
            sys.exit(0)
        elif method == "1":
            message = {'id': node_id, 'message_type': 'REQUEST'}
            node.make_request(message)
        else:
            print("Enter error")


if __name__ == "__main__":
    main(sys.argv)
                 

This snippet took 0.02 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).