Files
Labview-Program/Acoustics Test/raspberry Pi Pico/main.py
2026-05-22 19:08:33 +08:00

432 lines
17 KiB
Python

import sys, machine, time, json
import binascii
config = {'type': 'undefined', 'no': 'undefined', 'count': 0}
config_file = "config.json"
##########################################################
command_info = [
{ "cmd": "ping", "descript": "return OK if mico controller board works normal" },
{ "cmd": "reset", "descript": "reboot mico controller board" },
{ "cmd": "get_config", "descript": "read configuration from mico controller board",
"parameters": [
{ "config_item_name": "config item to read, if not provide will return all config values" }
],
"example": "> get_config count\n < 0\n > get_config\n < {'no': 'undefined', 'type': 'undefined', 'count': 0}"
},
{ "cmd": "set_config", "descript": "set configuration to mico controller board",
"parameters": [
{ "config_item_name": "config item name" },
{ "config_item_value": "config item value " }
],
"example": ">set_config sn 'CONTROLLER-001'"
},
{ "cmd": "gpio_init", "descript": "initial gpio pin",
"parameters": [
{ "pin_num": "gpio pin number" },
{ "gpio direction": "0 - input, 1 - output" },
{ "initial status": "initial status of gpio pin, 0 - low, 1 - high" },
{ "pull control": "UP - enable pull up, DOWN - enable pull down, others - disable pull function"}
],
"example": ">gpio_init 1 1 1 UP"
},
{ "cmd": "gpio_set", "descript": "set gpio pin status",
"parameters": [
{ "pin_num": "gpio pin number" },
{ "pin status": "initial status of gpio pin, 0 - low, 1 - high" }
],
"example": ">gpio_set 1 0"
},
{ "cmd": "gpio_get", "descript": "get gpio pin status",
"parameters": [
{ "pin_num": "gpio pin number" }
],
"example": ">gpio_get 1"
},
{ "cmd": "i2c_cleanup", "descript": "release i2c bus" },
{ "cmd": "i2c_scan", "descript": "scan i2c devices" },
{ "cmd": "i2c_read", "descript": "perform read operation from i2c device",
"parameters": [
{ "device": "i2c slave device address" },
{ "length": "bytes to read" }
],
"example": ">i2c_read 0x20 1\n <succeed: 0xFF"
},
{ "cmd": "i2c_read_add", "descript": "perform read operation from i2c device with register address",
"parameters": [
{ "device": "i2c slave device address" },
{ "addr": "register address" },
{ "length": "bytes to read" }
],
"example": ">i2c_read_add 0x20 0 2\n <succeed: 0xFF 0x01"
},
{ "cmd": "i2c_write", "descript": "perform write operation to i2c device ",
"parameters": [
{ "device": "i2c slave device address" },
{ "data": "bytes to write" }
],
"example": ">i2c_write 0x20 0x01 0x02 0x03 ... 0x10"
},
{ "cmd": "i2c_write_add", "descript": "perform write operation to i2c device with register address",
"parameters": [
{ "device": "i2c slave device address" },
{ "addr": "register address" },
{ "data": "bytes to write" }
],
"example": ">i2c_write 0x20 0x12 0x01 0x02 0x03 ... 0x10"
},
{ "cmd": "spi_setup", "descript": "initial spi bus",
"parameters": [
{ "clk_freq": "ispi clock frequence" },
{ "sck_pin": "sck pin io number" },
{ "mosi_pin": "mosi pin io number" },
{ "miso_pin": "miso pin io number" },
{ "cs_pin": "cs pin io number, -1 if don't want spi controller toggle cs pin" },
{ "polarity": "polarity setting for spi bus" },
{ "phase": "phase setting for spi bus" }
],
"example": ">spi_setup 100000 2 3 0 1 0 1"
},
{ "cmd": "spi_cleanup", "descript": "release spi bus" },
{ "cmd": "spi_read", "descript": "perform read operation from spi device",
"parameters": [
{ "length": "bytes to read" }
],
"example": ">spi_read 3\n <succeed: 0xFF 0xAA 0xBB"
},
{ "cmd": "spi_write", "descript": "perform write operation to spi device ",
"parameters": [
{ "data": "bytes to write" }
],
"example": ">spi_write 0x01 0x02 0x03 ... 0x10"
},
{ "cmd": "spi_write_read", "descript": "perform write and then read to spi device ",
"parameters": [
{ "length": "bytes to read" },
{ "data": "bytes to write" }
],
"example": ">spi_write_read 3 0x01 0x02 0x03 ... 0x10\n <succeed: 0xFF 0xAA 0xBB"
},
]
##########################################################
def load_config():
global config
try:
f = open(config_file,'r')
config = json.load(f)
f.close()
except:
print('config file not exist, load default settings!')
save_config()
def save_config():
f = open(config_file,'w')
json.dump(config, f)
f.close()
def bytes_to_str(bytes):
return ",".join([f"0x{b:02x}" for b in bytes])
def str_to_int(str):
if str.startswith("0x"):
return int(str, 16)
else:
return int(str, 10)
def print_help():
for command in command_info:
print(f"{command['cmd']} - {command['descript']}")
if 'parameters' in command.keys():
print(' parameters:')
for parameter in command['parameters']:
parameter_name = list(parameter.keys())[0]
print(f" {parameter_name}: {parameter[parameter_name]}")
if 'example' in command.keys():
print(' example:')
print(f' {command['example']}')
if 'parameters' in command.keys() or 'example' in command.keys():
print('')
##########################################################
load_config()
i2c_controller = None
spi_controller = None
spi_cs_pin = None
gpio_dict = {}
print('Sonos Micro Controller V1.0')
print("PyLang", sys.version_info, ";", sys.implementation)
print('=======================================')
while True:
input_string = str(input())
args = input_string.split(" ")
if len(args) >= 1:
if args[0] == 'help' or args[0] == '?':
print_help()
elif args[0] == 'ping': # check if command line is available
print('OK')
elif args[0] == 'reset':
machine.reset()
elif args[0] == 'get_config':
print(config)
elif args[0] == 'set_config':
if args[1] != None and args[2] != None:
config[args[1]] = args[2]
save_config()
else:
print('invalid arguments for set command: ', input_string)
######################################################################
## GPIO control commands
######################################################################
elif args[0] == 'gpio_init':
# gpio_init 1 1 1 UP
try:
pin_num = str_to_int(args[1])
pin_direction = machine.Pin.OUT if str_to_int(args[2]) == 1 else machine.Pin.IN
init_status = str_to_int(args[3])
pull_control = None
if len(args) >= 5:
if args[4].upper().strip() == 'UP':
pull_control = machine.Pin.PULL_UP
elif args[4].upper().strip() == 'DOWN':
pull_control = machine.Pin.PULL_DOWN
else:
print('failed: invalid pull control ', args[4])
gpio_dict[pin_num] = machine.Pin(pin_num, pin_direction, pull_control)
if pin_direction == machine.Pin.OUT:
gpio_dict[pin_num].value(init_status)
print('succeed: ', gpio_dict[pin_num])
except Exception as e:
print('failed: ', e)
elif args[0] == 'gpio_set':
try:
pin_num = str_to_int(args[1])
pin_status = str_to_int(args[2])
if pin_num in gpio_dict:
gpio_dict[pin_num].value(pin_status)
print('succeed: ')
else:
print(f'failed: pin not initialized - {pin_num}')
except Exception as e:
print('failed: ', e)
elif args[0] == 'gpio_get':
try:
pin_num = str_to_int(args[1])
if pin_num in gpio_dict:
print('succeed: ', gpio_dict[pin_num].value())
else:
print(f'failed: pin not initialized - {pin_num}')
except Exception as e:
print('failed: ', e)
######################################################################
## I2C controller commands
######################################################################
elif args[0] == 'i2c_setup':
# ====== i2c controller setup ======
# PiPico: i2c_setup 100000 0 1 # 100000 - i2c clock frequence, 0 - sda pin io number, 1 - scl pin io number
# ESP32: i2c_setup 100000 8 9 # 100000 - i2c clock frequence, 8 - sda pin io number, 9 - scl pin io number
try:
if i2c_controller is not None:
i2c_controller = None
clk_freq = str_to_int(args[1])
sda_pin = str_to_int(args[2])
scl_pin = str_to_int(args[3])
i2c_controller = machine.I2C(0, sda=machine.Pin(sda_pin, machine.Pin.OPEN_DRAIN, machine.Pin.PULL_UP), scl=machine.Pin(scl_pin, machine.Pin.OUT, machine.Pin.PULL_UP), freq=clk_freq) # Frequency in Hz
print('succeed: ', i2c_controller)
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_cleanup':
# ====== i2c controller setup ======
try:
if i2c_controller is not None:
i2c_controller = None
print('succeed: ')
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_scan':
# ====== scan I2C devices ======
try:
devices = i2c_controller.scan()
if len(devices) == 0:
print('failed: ', "no I2C devices found")
else:
print('succeed: ', bytes_to_str(devices))
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_read':
# ====== i2c data read ======
# i2c_read 0x20 1 # 0x20 - device_address, 1 - read length
try:
device = str_to_int(args[1])
length = str_to_int(args[2])
data = i2c_controller.readfrom(device, length)
print('succeed: ', data.hex(','))
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_read_add':
# ====== i2c data read address ======
# i2c_read_add 0x20 0 2 # 0x20 - device_address, 0 - register address, 2 - read length
try:
device = str_to_int(args[1])
addr = str_to_int(args[2])
length = str_to_int(args[3])
data = i2c_controller.readfrom_mem(device, addr, length)
print('succeed: ', bytes_to_str(data))
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_write':
# ====== i2c data write ======
# i2c_write 0x20 0x01 0x02 0x03 ... 0x10 # 0x20 - device_address, 0x01 ~ 0x10 - data to write to device
try:
device = str_to_int(args[1])
data = bytearray()
for i in range(2, len(args)):
data.append(str_to_int(args[i]))
i2c_controller.writeto(device, data)
print('succeed: ')
except Exception as e:
print('failed: ', e)
elif args[0] == 'i2c_write_add':
# ====== i2c data write address ======
# i2c_write_add 0x20 0x12 0x01 0x02 0x03 ... 0x10 # 0x20 - device_address, 0x12 - register address, 0x01 ~ 0x10 - data to write to device
try:
device = str_to_int(args[1])
addr = str_to_int(args[2])
data = bytearray()
for i in range(3, len(args)):
data.append(str_to_int(args[i]))
i2c_controller.writeto_mem(device, addr, data)
print('succeed: ')
except Exception as e:
print('failed: ', e)
######################################################################
## SPI controller commands
######################################################################
elif args[0] == 'spi_setup':
# ====== spi controller setup ======
# PiPico: spi_setup 100000 2 3 0 1 0 1 # param#1: 100000 - spi clock frequence
# # param#2: 2 - sck pin io number
# # param#3: 3 - mosi pin io number
# # param#4: 0 - miso pin io number
# # param#5: 1 - cs pin io number, -1 if don't want spi controller toggle cs pin
# # param#6: 0 - polarity setting
# # param#7: 1 - phase setting
#
try:
if spi_controller is not None:
spi_controller = None
if spi_cs_pin is not None:
spi_cs_pin = None
clk_freq = str_to_int(args[1])
spi_sck_pin = machine.Pin(str_to_int(args[2]))
spi_mosi_pin = machine.Pin(str_to_int(args[3]))
spi_miso_pin = machine.Pin(str_to_int(args[4]))
spi_cs_pin = machine.Pin(str_to_int(args[5]), machine.Pin.OUT)
polarity = str_to_int(args[6])
phase = str_to_int(args[7])
spi_controller = machine.SPI(0, baudrate=clk_freq, polarity=polarity, phase=phase, sck=spi_sck_pin, mosi=spi_mosi_pin, miso=spi_miso_pin)
print('succeed: ', spi_controller)
except Exception as e:
print('failed: ', e)
elif args[0] == 'spi_cleanup':
# ====== spi controller setup ======
try:
if spi_controller is not None:
spi_controller = None
if spi_cs_pin is not None:
spi_cs_pin = None
print('succeed: ')
except Exception as e:
print('failed: ', e)
elif args[0] == 'spi_read':
# ====== spi data read ======
# spi_read 3 # 3 - read length
try:
length = str_to_int(args[1])
# active cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(0)
data = spi_controller.read(length)
# deactive cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(1)
print('succeed: ', data.hex(','))
except Exception as e:
print('failed: ', e)
elif args[0] == 'spi_write':
# ====== spi data write ======
# spi_write 0x01 0x02 0x03 ... 0x10 # 0x01 ~ 0x10 - data to write to device
try:
data = bytearray()
for i in range(1, len(args)):
data.append(str_to_int(args[i]))
# active cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(0)
spi_controller.write(data)
# deactive cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(1)
print('succeed: ')
except Exception as e:
print('failed: ', e)
elif args[0] == 'spi_write_read':
# ====== spi data read ======
# spi_write_read 3 0x01 0x02 0x03 ... 0x10 # 3 - read length, 0x01 ~ 0x10 - data to write to device
try:
read_length = str_to_int(args[1])
# active cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(0)
write_data = bytearray()
for i in range(2, len(args)):
write_data.append(str_to_int(args[i]))
spi_controller.write(write_data)
read_data = spi_controller.read(read_length)
# deactive cs pin if required
if spi_cs_pin is not None:
spi_cs_pin.value(1)
print('succeed: ', read_data.hex(','))
except Exception as e:
print('failed: ', e)
else:
print('unknow command: ', args[0])