Demo entry 6208580

Test

   

Submitted by anonymous on Oct 12, 2016 at 08:36
Language: Python. Code size: 20.5 kB.

#! /usr/bin/python
# -*- coding: latin-1 -*-

import PyDev_CANdela
import ctypes, numpy
import time, struct, re

class Message_Struct(ctypes.Structure):
    """
    ctypes layout of a single CAN message record exchanged with the PassThru DLL.

    Six unsigned-long header fields are followed by a fixed 4128-byte data buffer.
    """
    _fields_ = [
        ("ProtocolID", ctypes.c_ulong),
        ("RxStatus", ctypes.c_ulong),
        ("TxFlags", ctypes.c_ulong),
        ("Timestamp", ctypes.c_ulong),
        ("DataSize", ctypes.c_ulong),         # number of used bytes in Data
        ("ExtraDataIndex", ctypes.c_ulong),
        ("Data", ctypes.c_ubyte * 4128),      # message ID bytes followed by the payload
    ]

class Flow_Control_Par_Struct(ctypes.Structure):
    """
    ctypes layout of one flow control parameter: a parameter ID and its value.
    """
    _fields_ = [
        ("Parameter", ctypes.c_ulong),
        ("Value", ctypes.c_ulong),
    ]

class Flow_Control_Config_Struct(ctypes.Structure):
    """
    ctypes layout of the flow control configuration: an entry count plus a
    pointer to the first Flow_Control_Par_Struct of the parameter list.
    """
    _fields_ = [
        ("NumOfParams", ctypes.c_ulong),
        ("ConfigPtr", ctypes.POINTER(Flow_Control_Par_Struct)),
    ]



class CAN_J2534_Diagnose:
    """
    Class to use the diagnostic Functions via J2534
    """

    def __init__ (self, debug = False):
        """
        Initialisation function for the Connection Parameters of J2534.

        Syntax:         CAN_J2534_Diagnose()
        Parameter:      -
        Return Value:    -
        Exception:      -
        Description:    Initialisation function for the Connection Parameters of J2534.
        """
        self.RetErrorCode = {0x00:"STATUS_NOERROR",0x01:"ERR_NOT_SUPPORTED",0x02:"ERR_INVALID_CHANNEL_ID",0x03:"ERR_INVALID_PROTOCOL_ID",
                             0x04:"ERR_NULL_PARAMETER",0x05:"ERR_INVALID_IOCTL_VALUE",0x06:"ERR_INVALID_FLAGS",0x07:"ERR_FAILED",
                             0x08:"ERR_DEVICE_NOT_CONNECTED",0x09:"ERR_TIMEOUT",0x0A:"ERR_INVALID_MSG",0x0B:"ERR_INVALID_TIME_INTERVAL",
                             0x0C:"ERR_EXCEEDED_LIMIT",0x0D:"ERR_INVALID_MSG_ID",0x0E:"ERR_DEVICE_IN_USE",0x0F:"ERR_INVALID_IOCTL_ID",
                             0x10:"ERR_BUFFER_EMPTY",0x11:"ERR_BUFFER_FULL",0x12:"ERR_BUFFER_OVERFLOW",0x13:"ERR_PIN_INVALID",
                             0x14:"ERR_CHANNEL_IN_USE",0x15:"ERR_MSG_PROTOCOL_ID",0x16:"ERR_INVALID_FILTER_ID",0x17:"ERR_NO_FLOW_CONTROL",
                             0x18:"ERR_NOT_UNIQUE",0x19:"ERR_INVALID_BAUDRATE",0x1A:"ERR_INVALID_DEVICE_ID"}

        self.NULL               = ctypes.c_void_p(None)
        self.pDeviceID          = ctypes.c_ulong(0x00)
        self.ProtocollID        = ctypes.c_ulong(0x06)
        self.pChannelID         = ctypes.c_ulong(0x00)
        self.TxFlags            = ctypes.c_ulong(0x100)
        self.RxStatus           = ctypes.c_ulong(0x100)
        self.Timestamp          = ctypes.c_ulong(0x00)
        self.FilterType         = ctypes.c_ulong(0x03)
        self.pFilterID          = ctypes.c_ulong(0x00)
        self.Flags              = ctypes.c_ulong(0x100)
        self.BaudRate           = ctypes.c_ulong(250000)
        self.IoctlID            = ctypes.c_ulong(0x08)
        self.ISO15765_BS        = ctypes.c_ulong(0x1E) #ISO15765 specific, the block size for segmented transfers.
        self.ISO15765_STMIN     = ctypes.c_ulong(0x1F) #ISO15765 specific, the separation time for segmented transfers
        self.ISO15765_WFT_MAX   = ctypes.c_ulong(0x25)
        self.SET_CONFIG         = ctypes.c_ulong(0x02)
        self.GET_CONFIG         = ctypes.c_ulong(0x01)

        self.Negativ_Respond    = "7F"
        self.sendID             = [0x0C,0xDA,0x03,0xF9]
        self.receiveID          = [0x0C,0xDA,0xF9,0x03]
        self.Timeout_Write      = ctypes.c_ulong(100)
        self.Timeout_Read       = ctypes.c_ulong(500)
        self.NumMsgs_Write      = ctypes.c_ulong(1)
        self.emptyData          = []



        self._initialize_J2534_Functions()


    def _initialize_J2534_Functions(self, dll_path=r"I:\cit-elektronik-w30\Niggemeyer\TN_Praxisarbeit\TN_Praxisarbeit_WS1617\pycanlibxl\PCANPT32.dll"):
        """
        Load the hardware specific PassThru DLL and declare the argument types of its functions.

        Syntax:         _initialize_J2534_Functions()
        Parameter:      dll_path - string - location of the PassThru DLL
                                            (defaults to the previously hard-coded path)
        Return Value:   -
        Exception:      Whatever ctypes.WinDLL raises when the DLL cannot be loaded
        Description:    Loads the DLL into self.candll, sets argtypes for every PassThru
                        function this class calls and continues with
                        _Build_Flow_Control_Config_Struct().
        """
        self.candll = ctypes.WinDLL(dll_path)

        ulong = ctypes.c_ulong
        ulong_ptr = ctypes.POINTER(ctypes.c_ulong)

        signatures = (
            ("PassThruOpen", (
                ctypes.c_void_p,        # NULL
                ulong_ptr,              # pDeviceID
            )),
            ("PassThruConnect", (
                ulong,                  # pDeviceID
                ulong,                  # ProtocollID
                ulong,                  # Flags
                ulong,                  # BaudRate
                ulong_ptr,              # pChannelID
            )),
            ("PassThruStartMsgFilter", (
                ulong,                  # pChannelID
                ulong,                  # FilterType
                ctypes.Array,           # CAN_MsgMask_Array
                ctypes.Array,           # CAN_MsgPattern_Array
                ctypes.Array,           # CAN_FlowControlMsg_Array
                ulong_ptr,              # pFilterID
            )),
            ("PassThruIoctl", (
                ulong,                  # pChannelID
                ulong,                  # SET_CONFIG
                ctypes.POINTER(Flow_Control_Config_Struct),
                ctypes.c_void_p,        # NULL
            )),
            ("PassThruWriteMsgs", (
                ulong,                  # pChannelID
                ctypes.Array,           # CAN_Msg_Array
                ulong_ptr,              # Number_of_Msg
                ulong,                  # Timeout
            )),
            ("PassThruReadMsgs", (
                ulong,                  # pChannelID
                ctypes.Array,           # CAN_Msg_Array_Read
                ulong_ptr,              # Number_of_Msg
                ulong,                  # Timeout
            )),
            # Close_CAN_Session() calls these two with a single c_ulong handle each.
            ("PassThruDisconnect", (
                ulong,                  # pChannelID
            )),
            ("PassThruClose", (
                ulong,                  # pDeviceID
            )),
        )
        for function_name, argument_types in signatures:
            getattr(self.candll, function_name).argtypes = argument_types

        self._Build_Flow_Control_Config_Struct()


    def _Build_Flow_Control_Config_Struct(self):
        """
        Build the Flow_Control_Config_Struct that carries the flow control parameters.

        Syntax:         _Build_Flow_Control_Config_Struct()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    Creates one Flow_Control_Par_Struct with value 0x00 for each of
                        ISO15765_BS:      Block size
                        ISO15765_STMIN:   Minimum separation time
                        ISO15765_WFT_MAX: Max wait flow control frames allowed
                        stores the resulting configuration in
                        self.Flow_Control_Config_Struct_0 and continues with
                        _Build_Filter_Config_Arrays().
        """
        parameter_ids = (self.ISO15765_BS, self.ISO15765_STMIN, self.ISO15765_WFT_MAX)
        parameter_entries = [Flow_Control_Par_Struct(parameter_id, 0x00)
                             for parameter_id in parameter_ids]
        parameter_array = (Flow_Control_Par_Struct * 3)(*parameter_entries)

        # NOTE(review): NumOfParams is 0x02 although three entries are built, so the
        # ISO15765_WFT_MAX entry is not covered by the count - confirm this is intended.
        self.Flow_Control_Config_Struct_0 = Flow_Control_Config_Struct(0x02, parameter_array)

        self._Build_Filter_Config_Arrays()


    def _Build_Filter_Config_Arrays(self):
        """
        Build the one-element message arrays that describe the CAN filter.

        Syntax:         _Build_Filter_Config_Arrays()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    Fills self.CAN_MsgPattern_Array, self.CAN_FlowControlMsg_Array and
                        self.CAN_MsgMask_Array with messages that carry only an ID (no
                        payload) and continues with _load_Candela_File().
        """
        single_message = Message_Struct * 1

        # Pattern: all messages on the bus whose ID does not match the one in this
        # array are rejected.
        self.CAN_MsgPattern_Array = single_message(self._Build_Message(self.receiveID, []))

        # Flow control: how to react to a message with the tester's ID.
        self.CAN_FlowControlMsg_Array = single_message(self._Build_Message(self.sendID, []))

        # Mask: all four ID bytes are set to 0xFF.
        self.CAN_MsgMask_Array = single_message(self._Build_Message([0xFF, 0xFF, 0xFF, 0xFF], []))

        self._load_Candela_File()

    def _load_Candela_File(self, cdd_path=r"I:\cit-elektronik-w30\Niggemeyer\TN_Praxisarbeit\TN_Praxisarbeit_WS1617\Tetris_Parametermigration\cdd\KwpTCU_CIT.cdd"):
        """
        Create the DevCANdela object and load the .cdd file.

        Syntax:         self._load_Candela_File()
        Parameter:      cdd_path - string - location of the .cdd file
                                            (defaults to the previously hard-coded path)
        Return Value:   -
        Exception:      -
        Description:    Stores a PyDev_CANdela.DevCANdela instance in self.candela and
                        opens the .cdd file with it.
        """
        # The default path is a raw string: the backslashes of the Windows path
        # must never be interpreted as escape sequences.
        self.candela = PyDev_CANdela.DevCANdela()
        self.candela.open(cdd_path)

    def Open_CAN_Session(self):
        """
        Open a CAN session and set the filter and flow control parameters.

        Syntax:         self.Open_CAN_Session()
        Parameter:      -
        Return Value:   -
        Exception:      KeyError if the DLL returns a code missing from self.RetErrorCode
        Description:    Calls PassThruOpen, PassThruConnect, PassThruStartMsgFilter and
                        PassThruIoctl in this order and prints the name of each return code.
        """
        steps = (
            ("Open\t\t\t\t", "PassThruOpen", (
                self.NULL,                          # spacer
                self.pDeviceID,                     # return Device ID
            )),
            ("Connect\t\t\t\t", "PassThruConnect", (
                self.pDeviceID,                     # use Device ID to connect
                self.ProtocollID,                   # ProtocollID = 0x06 = ISO15765
                self.Flags,                         # Flags => bit 8 high => 0x100 => Extended ID
                self.BaudRate,                      # Baudrate = 250000
                self.pChannelID,                    # return Channel ID
            )),
            ("Set Filter\t\t\t", "PassThruStartMsgFilter", (
                self.pChannelID,                    # use Channel ID to set Filter
                self.FilterType,                    # Filtertype = 0x03 = Flow Control Filter
                self.CAN_MsgMask_Array,             # set Mask to            0xFFFFFFFF
                self.CAN_MsgPattern_Array,          # set Pattern to         0x0CDAF903
                self.CAN_FlowControlMsg_Array,      # set FlowControl Mask   0x0CDA03F9
                self.pFilterID,                     # return Filter ID
            )),
            ("Set Ioctl Config\t", "PassThruIoctl", (
                self.pChannelID,                    # use Channel ID to set Config
                self.SET_CONFIG,                    # set IOCTL to input config
                self.Flow_Control_Config_Struct_0,  # set "Blocksize" and "St min" for Flow Control
                self.NULL,                          # spacer
            )),
        )
        for label, function_name, arguments in steps:
            return_code = getattr(self.candll, function_name)(*arguments)
            print(label + self.RetErrorCode[return_code])




    def Close_CAN_Session(self):
        """
        Close the CAN session.

        Syntax:         self.Close_CAN_Session()
        Parameter:      -
        Return Value:   -
        Exception:      KeyError if the DLL returns a code missing from self.RetErrorCode
        Description:    Calls PassThruDisconnect with the channel ID, then PassThruClose
                        with the device ID, and prints the name of each return code.
        """
        steps = (
            ("Disconnect\t\t\t", "PassThruDisconnect", self.pChannelID),
            ("Close\t\t\t\t", "PassThruClose", self.pDeviceID),
        )
        for label, function_name, handle in steps:
            return_code = getattr(self.candll, function_name)(handle)
            print(label + self.RetErrorCode[return_code])

    def Start_Diagnostic_Session(self):
        """
        This function starts a Diagnostic Session

        Syntax:         self.Start_Diagnostic_Session()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    This function starts a Diagnostic Session
                        Diagnostic Session: Read Identification e.g. SoftwareVersion
        """
        self._Start_Session(0)

    def Start_SetData_Session(self):
        """
        This function starts a SetData Session

        Syntax:         self.Start_SetData_Session()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    This function starts a SetData Session
                        SetData Session: Read Identification e.g. SoftwareVersion
                                         Write some Parametrisation
        """
        self._Start_Session(1)

    def Start_Developer_Session(self):
        """
        This function starts a Developer Session

        Syntax:         self.Start_Developer_Session()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    This function starts a Developer Session
                        Developer Session:  Read and Write everything
        """
        self._Start_Session(3)

    def Start_LowLevel_Session(self):
        """
        This function starts a LowLevel Session

        Syntax:         self.Start_LowLevel_Session()
        Parameter:      -
        Return Value:   -
        Exception:      -
        Description:    This function starts a LowLevel Session
                        LowLevel Session: trigger Hardware e.g. trigger digital Output
        """
        self._Start_Session(4)



    def _Start_Session(self,Prio):
        """
        This function starts a Session an handles the Authorisation

        Syntax:         self._Start_Session()
        Parameter:      Prio - int - Prioritization
        Return Value:   -
        Exception:      -
        Description:    This function starts a Session an handles the Authorisation
                        Prio 0: Diagnostic
                        Prio 1: SetData
                        Prio 3: Developer
                        Prio 4: LowLevel
        """
        self.sendRequest('Stop_Session_Stop', self.emptyData)

        if Prio == 4:
            self.sendRequest('StartLowLevelSession_Start', self.emptyData)

        else:
            self.sendRequest('StartDiagnosticSession_Start', self.emptyData)

        self.sendRequest('ECU_PIN_Read', self.emptyData)
        Seed_PrioX_RequestSeed = "Seed_Prio"+str(Prio)+"_RequestSeed"
        Seed_PrioX_RequestSeed_Response = self.sendRequest(Seed_PrioX_RequestSeed, self.emptyData)

        Encode_int = int(Seed_PrioX_RequestSeed_Response[2]+Seed_PrioX_RequestSeed_Response[3],16)
        key_int = Encode_int^self.candela.getKey(Prio)
        ##print "%.4X" %(key_int)
        key_int_list = [(key_int&0xFF00)/0xFF,key_int%0x100]
        ##print "%.2X %.2X" %(key_int_list[0], key_int_list[1])
        Key_PrioX_SendKey = "Key_Prio"+str(Prio)+"_SendKey"
        self.sendRequest(Key_PrioX_SendKey,key_int_list)

    def Stop_Session(self):
        self.sendRequest("Stop_Session_Stop",[])


    def sendRequest(self, DiagSession, data):
        """
        Send a request to the GCU and return the response.

        Syntax:         Response_List = sendRequest(Request_String, Data_List)
        Parameter:      DiagSession - string - Sessionname e.g. "Stop_Session_Stop"
                        data        - list   - data bytes appended to the request ID bytes
        Return Value:   List of response bytes as two-digit hex strings (the four
                        leading ID bytes removed), or None when there is no response
                        data or the response is a negative response
        Exception:      -
        Description:    Writes one message, reads three times into the same receive
                        buffer and evaluates the buffer content after the last read.
                        The return code of every DLL call is printed.
        """
        def status_text(return_code):
            # A code missing from the table must not abort the request with a KeyError.
            return self.RetErrorCode.get(return_code, "UNKNOWN_RETURN_CODE_0x%02X" % return_code)

        request_bytes = self.candela.getId(DiagSession) + data

        single_message = Message_Struct * 1
        self.CAN_Msg_Array = single_message(self._Build_Message(self.sendID, request_bytes))
        self.CAN_Msg_Array_Read = single_message(self._Build_Message([], []))

        print(DiagSession)

        # The message count is passed by pointer and updated by the DLL, so every call
        # gets a fresh counter; a shared one could be left at 0 after a failed write.
        write_count = ctypes.c_ulong(len(self.CAN_Msg_Array))
        print("Write\t\t\t\t" + status_text(self.candll.PassThruWriteMsgs(
            self.pChannelID, self.CAN_Msg_Array, write_count, self.Timeout_Write)))

        # NOTE(review): three reads into the same buffer; presumably the earlier reads
        # deliver indications that precede the actual response - confirm.
        for _ in range(3):
            print("Read\t\t\t\t" + status_text(self.candll.PassThruReadMsgs(
                self.pChannelID, self.CAN_Msg_Array_Read, ctypes.c_ulong(1), self.Timeout_Read)))
        print("\n")

        received = self.CAN_Msg_Array_Read[0]
        raw_bytes = self._int_to_hex(received.Data[:received.DataSize])
        response = raw_bytes[4:]        # skip the four ID bytes

        if not response:
            print("No Response Data")
            return None

        if response[0] == self.Negativ_Respond:
            # The bytes are hex strings, so the code has to be parsed with base 16;
            # base 10 misreads e.g. "33" and raises ValueError on e.g. "7F".
            Error_Num = int(response[-1], 16)
            print(self.candela.getNegResponseCode(Error_Num))
            return None

        return response


    def _Build_Message(self, messageId,data):
        """
        Create a CAN message structure.

        Syntax:         CAN_Msg = _Build_Message(messageId,data)
        Parameter:      messageId - hex_array*4 - ID GCU e.g. [0x0C,0xDA,0x03,0xF9]
                        data      - hex_array   - Data to send
        Return Value:   Message_Struct whose Data buffer holds messageId followed by data
        Exception:      -
        Description:    Copies the ID and data bytes into a zero-filled 4128 byte buffer
                        and sets DataSize and ExtraDataIndex to the number of copied bytes.
        """
        payload = list(messageId) + list(data)
        payload_length = ctypes.c_ulong(len(payload))

        buffer_array = numpy.zeros(4128, numpy.ubyte)
        buffer_array[:len(payload)] = numpy.array(payload)

        return Message_Struct(
            self.ProtocollID,
            self.RxStatus,
            self.TxFlags,
            self.Timestamp,
            payload_length,                             # DataSize
            payload_length,                             # ExtraDataIndex
            numpy.ctypeslib.as_ctypes(buffer_array),    # Data
        )


    def _int_to_hex(self, Int_Array):
        """
        This function converts an int array into a hex array

        Syntax:         Hex_Array = _int_to_hex(Int_List)
        Parameter:      Int_Array - Array - e.g. [12, 15, 16, 18]
        Return Value:   Hex Array - Array - e.g. [0xC, 0xF, 0x10, 0x12]
        Exception:      -
        Description:    This function converts an int array into a hex array
        """
        Hex_Array = []
        for x in Int_Array:
            Hex_Array.append("%.2X" %(x))
        return Hex_Array


if __name__ == '__main__':
    # Demo run: open the CAN session, read the application parameters inside a
    # developer session and close everything again.
    Session = CAN_J2534_Diagnose()
    Session.Open_CAN_Session()
    try:
        Session.Start_Developer_Session()
        # 0x01 / 0x02 instead of the octal style literals 01 / 02 (same values).
        Session.sendRequest("Set_Parameter_Start", [0x01])
        Adaptive_Kennlinie = Session.sendRequest("Parameter_Application_Read", [])
        Session.sendRequest("Set_Parameter_Stop", [0x02])
        Session.Stop_Session()
    finally:
        # Release the channel and the device even if a request above failed.
        Session.Close_CAN_Session()
    print("I am ok")

This snippet took 0.02 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).