#!/usr/bin/env python3
import asyncio
import can
import sys
# Name of the CAN network interface the simulator attaches to.
car_canbus_if = 'can0'
# Bus handle shared by the whole script. It is opened as a side effect of
# loading this module, using the SocketCAN backend on the channel above.
# NOTE(review): newer python-can releases prefer the keyword `interface=` over
# `bustype=` -- confirm against the installed version before changing it.
can_if = can.interface.Bus(bustype='socketcan', channel=car_canbus_if, receive_own_messages=False)
def print_pkg(prefix, msg):
    """Print one CAN frame as '<prefix> 0xID [dlc] B0 B1 ...' on a single line.

    ``msg`` only needs ``arbitration_id``, ``dlc`` and an iterable ``data``
    of byte values.
    """
    header = '%s 0x%03X [%d]' % (prefix, msg.arbitration_id, msg.dlc)
    payload = ''.join(' %02X' % byte for byte in msg.data)
    print(header + payload)
async def can_rx(can_if, loop, verbose=False):
    """Receive CAN frames forever and answer the requests this simulator knows.

    Frames addressed to 0x7DF/0x7E0 are handed to ``xc60_7e0`` and frames
    addressed to 0x726 to ``xc60_726``; every message those handlers return
    is sent back on ``can_if``. Error frames and all other IDs are ignored.

    Args:
        can_if: the python-can bus to listen on and reply through.
        loop: the asyncio event loop the notifier delivers messages on.
        verbose: when true, dump every received and sent frame to stdout.

    The coroutine only ends by cancellation or an exception; in both cases
    the notifier is stopped and the bus is shut down.
    """
    reader = can.AsyncBufferedReader()
    notifier = can.Notifier(can_if, [reader], loop=loop)
    try:
        while True:
            msg = await reader.get_message()
            if msg.is_error_frame:
                continue
            if verbose:
                print_pkg('\nR:', msg)

            # Reset for every frame: otherwise an unrelated arbitration ID
            # would re-send the previous reply (or hit an unbound name).
            response = None
            arb_id = msg.arbitration_id
            if arb_id in (0x7DF, 0x7E0):
                response = xc60_7e0(msg.data)
            elif arb_id == 0x726:
                response = xc60_726(msg.data)

            if not response:
                continue
            for reply in response:
                if verbose:
                    print_pkg('S:', reply)
                can_if.send(reply)
    finally:
        # Previously placed after the endless loop and therefore unreachable.
        notifier.stop()
        can_if.shutdown()
# Simulated vehicle state served by the request handlers in this file.
# 'speed', 'rpm' and 'coolant' can be changed at runtime from stdin (car_ui).
# 'bat_*' and 'alter_charge_req' hold raw byte values that xc60_726 advances
# after every query; the trailing comment gives the decoding formula, where
# A and B are the first and second data byte of the reply.
car_stat = {
    'rpm': 1400,  # human format
    'speed': 10,  # human format
    'coolant': 40,  # human format
    'air': 500,  # human format
    'bat_volt': 0x0,  # 6v + A*0.05
    'bat_soc': 0x0,  # A: 0-100
    'bat_curr': 0x0,  # (256*A + B)/16.0 - 512
    'alter_charge_req': 0x0,  # A/10.0
}
def form_response(ecu_id, raw_data):
    """Wrap raw payloads into CAN messages sent from ``ecu_id + 8``.

    Args:
        ecu_id: arbitration ID the request was addressed to; each reply uses
            ``ecu_id + 8`` (e.g. 0x7E0 -> 0x7E8) with a standard 11-bit ID.
        raw_data: iterable of payloads (sequences of byte values). Payloads
            shorter than 8 bytes are zero-padded to 8; longer ones are
            passed through unchanged.

    Returns:
        A list with one ``can.Message`` per payload, in input order.
    """
    ret = []
    for data in raw_data:
        # Copy first so the caller's list is not padded behind its back.
        payload = list(data)
        payload.extend([0] * (8 - len(payload)))
        # `is_extended_id` replaces the `extended_id` keyword, which newer
        # python-can releases no longer accept.
        ret.append(can.Message(arbitration_id=ecu_id + 8, data=payload, is_extended_id=False))
    return ret
def xc60_7e0_s01_response(pid, data):
    """Build the single-frame positive reply to a service 0x01 request.

    Args:
        pid: the PID being answered; echoed back after the service byte.
        data: the value bytes, either as a sequence of ints or as one bare
            int (treated as a one-byte value).

    Returns:
        The list of messages produced by ``form_response`` for ECU 0x7E0.
    """
    # Callers sometimes pass a plain int such as `(value)`; len() and extend()
    # would raise TypeError on it, so normalise to a list first.
    payload = [data] if isinstance(data, int) else list(data)
    # Byte 0 counts the bytes that follow: service + PID + value bytes.
    raw_data = [2 + len(payload), 0x01 + 0x40, pid]
    raw_data.extend(payload)
    return form_response(0x7E0, [raw_data])
def xc60_726_s22_response(pid0, pid1, data):
    """Build the single-frame positive reply to a service 0x22 request.

    The frame is: length byte (3 + number of value bytes), the service byte
    0x22 + 0x40, the two identifier bytes ``pid0``/``pid1``, then ``data``.
    It is handed to ``form_response`` for ECU 0x726.
    """
    header = [3 + len(data), 0x22 + 0x40, pid0, pid1]
    frame = header + list(data)
    return form_response(0x726, [frame])
def _clamp(value, upper):
    """Limit ``value`` to 0..upper so it fits the wire field it is encoded in."""
    return max(0, min(value, upper))


def xc60_7e0(data):
    """Answer a request frame addressed to the engine ECU (0x7DF / 0x7E0).

    Frames whose first byte has a zero high nibble are treated as
    single-frame requests for service 0x01 (live data) or 0x09 (vehicle
    information). Multi-frame answers return only their first frame and park
    the remaining frames in ``xc60_7e0.pending_response``; a flow-control
    frame (first byte 0x30) releases them.

    Args:
        data: payload bytes of the received CAN frame.

    Returns:
        A list of ``can.Message`` objects to send, or ``None`` when the frame
        is not understood or is too short to carry a request.
    """
    if not data:
        return None

    response = None
    frame_header = data[0]
    if frame_header & 0xF0 == 0:
        # A new request discards any multi-frame reply still waiting.
        xc60_7e0.pending_response = None
        if len(data) < 3:
            # Too short to hold service + PID; ignore instead of raising.
            return None
        service = data[1]
        pid = data[2]
        if service == 0x01:
            # NOTE(review): PIDs 0x00/0x20/0x40/0x60 are presumably the
            # standard "supported PIDs" bitmasks -- confirm before editing.
            if pid == 0x00:
                response = xc60_7e0_s01_response(pid, (0x98, 0x3B, 0x80, 0x13))
            elif pid == 0x05:  # Engine coolant, A-40 (Celsius)
                coolant = car_stat['coolant']
                # One-element tuple: `(x)` without the comma is just an int.
                response = xc60_7e0_s01_response(pid, (_clamp(coolant + 40, 0xFF),))
            elif pid == 0x0C:  # RPM, (256A + B)/4
                a, b = divmod(_clamp(4 * car_stat['rpm'], 0xFFFF), 256)
                response = xc60_7e0_s01_response(pid, (a, b))
            elif pid == 0x0D:  # Speed
                speed = car_stat['speed']
                response = xc60_7e0_s01_response(pid, (_clamp(speed, 0xFF),))
            elif pid == 0x10:  # MAF air flow rate, (256A + B)/100
                a, b = divmod(_clamp(car_stat['air'] * 100, 0xFFFF), 256)
                response = xc60_7e0_s01_response(pid, (a, b))
            elif pid == 0x20:
                response = xc60_7e0_s01_response(pid, (0x80, 0x1B, 0xA0, 0x01))
            elif pid == 0x40:
                response = xc60_7e0_s01_response(pid, (0x6C, 0xD0, 0x00, 0x8D))
            elif pid == 0x60:
                response = xc60_7e0_s01_response(pid, (0x00, 0x00, 0x00, 0x00))
        elif service == 0x09:
            if pid == 0x02:
                # Payload after the 0x49 0x02 0x01 header is 17 ASCII bytes.
                response = form_response(0x7E0, [[0x10, 0x14, 0x49, 0x02, 0x01, 0x59, 0x56, 0x31]])
                xc60_7e0.pending_response = form_response(0x7E0, [
                    [0x21, 0x44, 0x5A, 0x41, 0x38, 0x42, 0x44, 0x48],
                    [0x22, 0x32, 0x30, 0x37, 0x36, 0x36, 0x38, 0x32],
                ])
            elif pid == 0x04:
                # ASCII text padded with zero bytes.
                response = form_response(0x7E0, [[0x10, 0x13, 0x49, 0x04, 0x01, 0x33, 0x32, 0x32]])
                xc60_7e0.pending_response = form_response(0x7E0, [
                    [0x21, 0x37, 0x38, 0x37, 0x37, 0x35, 0x41, 0x41],
                    [0x22, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
                ])
            elif pid == 0x0B:  # In-use performance tracking for compression ignition vehicles
                response = form_response(0x7E0, [[0x10, 0x27, 0x49, 0x0B, 0x12, 0x00, 0x58, 0x01]])
                xc60_7e0.pending_response = form_response(0x7E0, [
                    [0x21, 0xFA, 0x00, 0x06, 0x00, 0x05, 0x00, 0x06],
                    [0x22, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00],
                    [0x23, 0x71, 0x00, 0x05, 0x00, 0xD2, 0x00, 0x58],
                    [0x24, 0x01, 0x74, 0x00, 0x58, 0x01, 0x8D, 0x00],
                    [0x25, 0x58, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
                ])
    elif frame_header == 0x30:  # flow control
        # Hand out the parked frames exactly once.
        response = xc60_7e0.pending_response
        xc60_7e0.pending_response = None
    return response


# Remaining frames of a multi-frame reply, waiting for the tester's flow control.
xc60_7e0.pending_response = None
def xc60_726(data):
    """Answer a service 0x22 single-frame request addressed to ECU 0x726.

    Each known two-byte identifier maps to one ``car_stat`` entry. The reply
    carries the entry's current raw value, and the stored value is then
    advanced by a fixed step so that successive queries return changing data.

    Args:
        data: payload bytes of the received CAN frame.

    Returns:
        A list of ``can.Message`` objects to send, or ``None`` when the frame
        is too short, is not a single frame, uses another service, or asks
        for an unknown identifier.
    """
    # A request needs length byte + service + two identifier bytes; shorter
    # frames are ignored instead of raising IndexError.
    if len(data) < 4:
        return None

    frame_header = data[0]
    service = data[1]
    if frame_header & 0xF0 != 0 or service != 0x22:
        return None

    response = None
    pid0 = data[2]
    pid1 = data[3]
    if (pid0, pid1) == (0x41, 0x1D):
        alter_charge_req = car_stat['alter_charge_req']
        car_stat['alter_charge_req'] = (alter_charge_req + 29) & 0xFF
        response = xc60_726_s22_response(pid0, pid1, [alter_charge_req])
    elif (pid0, pid1) == (0x40, 0x90):
        bat_curr = car_stat['bat_curr']
        car_stat['bat_curr'] = (bat_curr + 4095) % 65536
        # 16-bit value, high byte first.
        response = xc60_726_s22_response(pid0, pid1, [bat_curr >> 8, bat_curr & 0xFF])
    elif (pid0, pid1) == (0x40, 0x28):
        bat_soc = car_stat['bat_soc']
        car_stat['bat_soc'] = (bat_soc + 17) % 100
        response = xc60_726_s22_response(pid0, pid1, [bat_soc])
    elif (pid0, pid1) == (0x40, 0x2A):
        bat_volt = car_stat['bat_volt']
        car_stat['bat_volt'] = (bat_volt + 8) & 0xFF
        response = xc60_726_s22_response(pid0, pid1, [bat_volt])
    return response
async def car_ui(loop):
    """Read ``name=value`` commands from stdin and update ``car_stat``.

    Recognised names are ``speed``, ``rpm`` and ``coolant``; the value must
    be an integer. Blank lines, unknown names and lines without a value are
    ignored, and a malformed value is reported on stderr and skipped.

    Args:
        loop: event loop whose default executor runs the blocking readline.

    Returns:
        ``None`` once stdin reaches end of file.
    """
    while True:
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if line == '':
            # readline() yields '' only at EOF (a blank line is '\n'); without
            # this check the loop would spin forever on a closed stdin.
            return

        tokens = [part.strip() for part in line.split('=')]
        cmd = tokens[0]
        if cmd not in ('speed', 'rpm', 'coolant') or len(tokens) < 2:
            continue

        try:
            value = int(tokens[1])
        except ValueError:
            # A typo at the console must not take the whole simulator down.
            print('Ignoring invalid value for %s: %r' % (cmd, tokens[1]), file=sys.stderr)
            continue
        car_stat[cmd] = value
async def main():
    """Run the CAN receiver and the stdin console side by side until both end."""
    loop = asyncio.get_event_loop()
    receiver = asyncio.ensure_future(can_rx(can_if, loop, verbose=True))
    console = asyncio.ensure_future(car_ui(loop))
    await asyncio.gather(receiver, console)
# Start the asyncio event loop. The loop is created explicitly because calling
# asyncio.get_event_loop() while no loop is running is deprecated.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even when main() raises or the user presses Ctrl-C.
    loop.close()