From d27de43dc821699f9558652add29243608b457db Mon Sep 17 00:00:00 2001 From: rudi Date: Fri, 17 Jan 2025 21:09:24 +0100 Subject: [PATCH] Code optimization and cleanup --- Examples/Adafruit_Servokit_servo_driver.py | 2 +- control_functions.py | 335 +++++++++++++-------- picture_generator.py | 19 -- 3 files changed, 208 insertions(+), 148 deletions(-) diff --git a/Examples/Adafruit_Servokit_servo_driver.py b/Examples/Adafruit_Servokit_servo_driver.py index b4aa024..6e167b5 100644 --- a/Examples/Adafruit_Servokit_servo_driver.py +++ b/Examples/Adafruit_Servokit_servo_driver.py @@ -21,7 +21,7 @@ kit.servo[0].set_pulse_width_range(MIN_PULSE, MAX_PULSE) # The servo angle in degrees. Must be in the range 0 to actuation_range. # Is None when servo is disabled. -kit.servo[0].angle = 90 +kit.servo[0].angle = 88 diff --git a/control_functions.py b/control_functions.py index 952c155..abeadb5 100644 --- a/control_functions.py +++ b/control_functions.py @@ -1,45 +1,41 @@ -from adafruit_hcsr04 import HCSR04 as hcsr04 # Ultrasound sensor -import board # General board pin mapper -from adafruit_servokit import ServoKit # Servo libraries for PWM driver board -import adafruit_pcf8591.pcf8591 as PCF # AD/DA converter board for potentiometer -from adafruit_pcf8591.analog_in import AnalogIn # Analogue in pin library -from adafruit_pcf8591.analog_out import AnalogOut # Analogue out pin library -import statistics as st # Mean and median calculations -import csv # CSV handling -from time import sleep # Sleep/pause -import pandas as pd -from datetime import datetime -import busio -import adafruit_vl6180x -import math - -# laser sensor controls. -# i2c = busio.I2C(board.SCL, board.SDA) -# laser = adafruit_vl6180x.VL6180X(i2c) +from adafruit_servokit import ServoKit # Servo libraries for PWM driver board +import adafruit_pcf8591.pcf8591 as PCF # AD/DA converter board for potentiometer +from adafruit_pcf8591.analog_in import AnalogIn # Analogue in pin library +from adafruit_pcf8591.analog_out import AnalogOut # Analogue out pin library +from adafruit_hcsr04 import HCSR04 as hcsr04 # Ultrasound sensor +import board # General board pin mapper +import statistics as st # Mean and median calculations +import csv # CSV handling +from time import sleep # Sleep/pause +import pandas as pd # Pandas for data manipulation +from datetime import datetime # Datetime for timestamps +import math # Math for particular calculations +import matplotlib.pyplot as plt # Mathplotlib for graphs # Variables to control sensor -TRIGGER_PIN = board.D4 # GPIO pin xx -ECHO_PIN = board.D17 # GPIO pin xx -PIN_TIMEOUT: float = 0.1 # Timeout for echo wait -- don't change -RUN_TIMEOUT: float = 0.0 # Sleep time in function -MIN_DISTANCE: int = 6 # Minimum sensor distance to be considered valid (1 on bar) -MAX_DISTANCE: int = 40 # Maximum sensor distance to be considered valid (35 on bar) +TRIGGER_PIN = board.D4 # GPIO pin xx +ECHO_PIN = board.D17 # GPIO pin xx +PIN_TIMEOUT: float = 0.1 # Timeout for echo wait -- don't change +RUN_TIMEOUT: float = 0.0 # Sleep time in read_distance() function +MIN_DISTANCE: int = 2 # Minimum sensor distance to be considered valid (1 on bar) +MAX_DISTANCE: int = 36 # Maximum sensor distance to be considered valid (35 on bar) # Variables to control servo -KIT = ServoKit(channels=16) # Define the type of board (8, 16) -MIN_PULSE: int = 400 # Defines angle 80, for current PID setup -MAX_PULSE: int = 2500 # Defines angle 100, for current PID setup -OFFSET: int = -1 +KIT = ServoKit(channels=16) # Define the type of board (8, 16) +MIN_PULSE: int = 400 # Defines angle 80, for current PID setup +MAX_PULSE: int = 2500 # Defines angle 100, for current PID setup +OFFSET: int = -2 # Correction nominal angle versus physical angle of the arm KIT.servo[0].set_pulse_width_range(MIN_PULSE, MAX_PULSE) # Variables to control logging. -LOG: bool = True # Log data to files -SCREEN: bool = True # Log data to screen -DEBUG: bool = False # More data to display -TWIN_MODE: bool = True # Run in live or twin mode +LOG: bool = False # Log data to files +LOG_GRAPH: bool = True # Log graph creation +SCREEN: bool = True # Log data to screen +DEBUG: bool = False # More data to display +TWIN_MODE: bool = True # Run in live or twin mode -# Control the number of samples for single distance measurement (average from burst) -MAX_SAMPLES: int = 1 +# Control the number of samples for single distance measurement (average from sample burst) +MAX_SAMPLES: int = 8 # Control the potentiometer # Description: @@ -64,48 +60,78 @@ pcf_out = AnalogOut(pcf, PCF.OUT) pcf_out.value = PCF_VAL # Variables to control PID values (PID formula tweaks) -p_value: float = 0.5 -i_value: float = 0.01 -d_value: float = 0.0 +p_value: float = 1.0 +i_value: float = 0.0 +d_value: float = 0.1 # Initial variables, used in pid_calculations() i_result: float = 0.0 previous_time: float = 0.0 previous_error: float = 0.0 -# Error sum array +# Error sum array values error_sum_max: int = 10 -error_sum_array: list = [0]*error_sum_max +error_sum_array: list = [0] * error_sum_max error_sum_counter: int = 0 - -# Digital twin -previous_speed:float = 0.0 +# Digital twin parameters +previous_speed: float = 0.0 previous_position: float = 0.0 previous_angle: int = 90 +# a: acceleration +# g: gravity (9.81 m/s^2) +# theta: angle of the inclined plane +# u: coefficient of the friction between the cart and the inclined plane. +acceleration: float = 0.0 +gravity: float = 9.81 +friction: float = 0.05 +delta_t: float = 0.2 -#maximum angle the servo can move away from steady position. With 10 the range is between 80 and 100, with steady at 90 -max_angle = 10 +# Maximum angle the servo can move away from steady position. With 10 the range is between 80 (-10) and 100 (+10), +# with steady at 90 (0) +max_angle: int = 5 -# servo slower -current_angle:int = 90 +# Servo slower +current_angle: int = 90 -watch_variable: int = 0 +# Servo memory for boosting the cart if its stuck due to friction +servo_memory_1: int = 0 +servo_memory_2: int = 0 +memory_max: int = 5 -# base time of the system +# Current time of the system, used as base for file creation) base_time: float = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) +# Write base_time in file, to be used by other functions. +with open("pid-balancer_" + "time_file.txt", "w") as time_file: + time_file.write(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S.%f')[:-3]) + + # Write data to any of the logfiles -def log_data(data_file: str, data_line: str, remark: str|None): - log_stamp: str = datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')[:-3] +def log_data(data_file: str, data_line: str, remark: str | None): + log_stamp: str = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S.%f')[:-3] with open("pid-balancer_" + "time_file.txt", "r") as time_file: file_stamp: str = time_file.readline() with open("pid-balancer_" + data_file + "_data_" + file_stamp + ".csv", "a") as data_file: - data_writer = csv.writer(data_file) - data_writer.writerow([log_stamp,data_line, remark]) + data_writer = csv.writer(data_file, delimiter=';', quoting=csv.QUOTE_MINIMAL) + data_writer.writerow([log_stamp, data_line, remark]) + +# Write data to any of the logfiles. This is specifically for one type of logfile that uses multiple data columns +def log_data2(data_file: str, data_line: str, data_line2: str | None): + log_stamp: str = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S.%f')[:-3] + + with open("pid-balancer_" + "time_file.txt", "r") as time_file: + file_stamp: str = time_file.readline() + + with open("pid-balancer_" + data_file + "_data_" + file_stamp + ".csv", "a") as data_file: + data_writer = csv.writer(data_file, delimiter=';', quoting=csv.QUOTE_MINIMAL) + data_writer.writerow([log_stamp, data_line, data_line2]) + + +# Function to read the SR05 ultrasound sensor data def read_distance_sensor(): start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) @@ -117,78 +143,87 @@ def read_distance_sensor(): with hcsr04(trigger_pin=TRIGGER_PIN, echo_pin=ECHO_PIN, timeout=PIN_TIMEOUT) as sonar: samples: int = 0 max_samples: int = MAX_SAMPLES - timestamp_last: float = 0.0 + timestamp_last: float = 0.0 timestamp_first: float = 0.0 while samples != max_samples: - sleep(RUN_TIMEOUT) + sleep(RUN_TIMEOUT) # Fixes some sensor driver crashes try: - distance: float = sonar.distance # reading distance from the sonic sensor - # distance: float = laser.range * 10 # reading distance from the laser sensor + distance: float = sonar.distance # Reading distance from the sonic sensor - if MIN_DISTANCE < distance < MAX_DISTANCE: + if MIN_DISTANCE < distance < MAX_DISTANCE: # Only process distances within expected range. + # This drops erroneous readings. log_data(data_file="sensor", data_line=str(distance), remark="") if LOG else None - # print("Distance_in_range: ", distance) if SCREEN else None + print("Distance_in_range: ", distance) if SCREEN else None # For testing if max_samples == 1: - median_distance = distance - mean_timestamp = float(datetime.strftime(datetime.now(),'%Y%m%d%H%M%S.%f')[:-3]) + median_distance: float = distance + mean_timestamp = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')[:-3]) + samples: int = samples + 1 + print("Distance_in_range_rounded: ", round(distance, 4)) if SCREEN else None # For testing else: sample_array.append(distance) - if samples == 0: timestamp_first = float(datetime.strftime(datetime.now(), - '%Y%m%d%H%M%S.%f')[:-3]) + if samples == 0: timestamp_first = float(datetime.strftime(datetime.now(), + '%Y%m%d%H%M%S.%f')[:-3]) if samples == max_samples - 1: timestamp_last = float(datetime.strftime(datetime.now(), - '%Y%m%d%H%M%S.%f')[:-3]) + '%Y%m%d%H%M%S.%f')[:-3]) timestamp_first_float: float = float(timestamp_first) timestamp_last_float: float = float(timestamp_last) median_distance: float = st.median(sample_array) - mean_timestamp: float = st.mean([timestamp_first_float, timestamp_last_float]) - print("Distance_median: ", median_distance) if SCREEN else None - print("Timestamp_mean: ", mean_timestamp) if DEBUG else None - print("Distance_in_range: ", distance) if SCREEN else None + mean_timestamp: float = st.mean([timestamp_first_float, timestamp_last_float]) + if DEBUG: + print("Distance_median: ", median_distance) + print("Timestamp_mean: ", mean_timestamp) + print("Distance_in_range: ", distance) + data_line = str(sample_array) + ',' + str(median_distance) + log_data(data_file="sensor_array", data_line=data_line, remark="") - data_line = str(sample_array) + ',' + str(median_distance) - log_data(data_file="sensor_array", data_line= data_line, - remark="") if LOG else None print("Distance_in_range_rounded: ", round(distance, 4)) if SCREEN else None samples: int = samples + 1 - else: - log_data(data_file="sensor", data_line=str(distance), remark="Distance_out_of_range") if LOG else None + log_data(data_file="sensor", data_line=str(distance), + remark="Distance_out_of_range") if LOG else None print("Distance_out_of_range: ", round(distance, 4)) if SCREEN else None except RuntimeError: log_data(data_file="sensor", data_line="999.999", remark="Timeout") if LOG and DEBUG else None print("Distance_timed_out") if SCREEN else None + # Function process time recorder end_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) data_line = str(start_time - end_time) log_data(data_file="function", data_line=data_line, remark="read_distance_sensor") if LOG else None + # Median distance and Mean time to log writer + data_line = str(median_distance) + data_line2 = str(mean_timestamp) + log_data2(data_file="median_sensor", data_line=data_line, data_line2=data_line2) if LOG_GRAPH else None + return median_distance, mean_timestamp -def read_setpoint(): - start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) +def read_setpoint(): + # Read the resistance of the potentiometer and convert to centimeters for use with setpoint distance + start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) while True: raw_value: int = pcf_in_0.value scaled_value: float = (raw_value / PCF_VAL) * pcf_in_0.reference_voltage - log_line = str(scaled_value) + "," + str(raw_value) + "," + str("angle") + log_line = str(scaled_value) + ";" + str(raw_value) + ";" + str("angle") log_data(data_file="potmeter", data_line=log_line, remark="") if LOG else None cm_rounded: int = int(round(scaled_value * POT_PCM, 0)) if DEBUG: - print('Scaled_rounded = ' , round(scaled_value, 4), ' CM_rounded= ', cm_rounded) - print('Scaled_raw= ' , scaled_value, ' CM_raw= ', int(scaled_value * POT_PCM)) + print('Scaled_rounded = ', round(scaled_value, 4), ' CM_rounded= ', cm_rounded) + print('Scaled_raw= ', scaled_value, ' CM_raw= ', int(scaled_value * POT_PCM)) - print('setpoing in cm: ', cm_rounded) if SCREEN else None + print('Setpoint in cm: ', cm_rounded) if SCREEN else None - sleep(POT_INT) + sleep(POT_INT) # Fix for driver crashes end_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) data_line = str(start_time - end_time) @@ -196,67 +231,69 @@ def read_setpoint(): return cm_rounded + def digital_twin(): - # a: acceleration - # g: gravity (9.81 m/s^2) - # theta: angle of the inclined plane - # u: coefficient of the friction between the cart and the inclined plane. - acceleration: float = 0.0 - global previous_position, previous_speed, base_time, watch_variable - gravity: float = 9.81 - friction: float = 0.1 - delta_t: float = 0.1 + # Digital model of the physical model. + + global previous_position, previous_speed, base_time angle = (previous_angle - 90) acceleration = gravity * math.sin(math.radians(angle)) - friction_force = friction * gravity * math.cos(math.radians(angle)) * delta_t - - friction_force = abs(friction_force) + friction_force = abs(friction * gravity * math.cos(math.radians(angle)) * delta_t) work_speed = previous_speed + acceleration * delta_t - watch_variable = watch_variable + 1 - if watch_variable >= 150: - print("breakpoint") - - print("watch_variable", watch_variable) - if friction_force < work_speed: + # To avoid the friction setting the work_speed to a negative value, forced the friction to be lower than the speed. + if friction_force < work_speed * 0.8: if work_speed > 0: - work_speed = work_speed - friction_force + work_speed = work_speed - friction_force elif work_speed < 0: work_speed = work_speed + friction_force else: work_speed = work_speed - current_speed = work_speed - - current_position = previous_position + (current_speed * delta_t) - - - print("angle", angle) - print("friction", friction) - print("acceleration", acceleration) - print("current speed", current_speed) - print("current position", current_position) - print("") - print("----------------") - print("") + current_speed: float = work_speed + current_position: float = previous_position + (current_speed * delta_t) + if SCREEN: + print("Angle", angle) + print("Friction", friction) + print("Acceleration", acceleration) + print("Current speed", current_speed) + print("Current position", current_position) + print("") + print("----------------------------------------------") + print("") base_time = base_time + delta_t - - previous_speed = current_speed previous_position = current_position + if LOG_GRAPH: + # PID position logging + data_line = str(current_position) + log_data(data_file="twin_current_position", data_line=data_line, remark="") + + # PID acceleration logging + data_line = str(acceleration) + log_data(data_file="twin_acceleration", data_line=data_line, remark="") + + # PID speed logging + data_line = str(current_speed) + log_data(data_file="twin_current_speed", data_line=data_line, remark="") + return current_position, base_time + def pid_calculations(): + # Do all the PID calculations and return the new angle for the servo start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) - global i_result, previous_time, previous_error # Can not be annotated with :float, because variables are global. - global error_sum_counter, error_sum_array # counter for error_sum_array and error_sum_array itself + global i_result, previous_time, previous_error # Can not be annotated with :float, because variables are global. + global error_sum_counter, error_sum_array # counter for error_sum_array and error_sum_array itself global previous_angle + offset_value: int = 0 + if TWIN_MODE: measurement, measurement_time = digital_twin() else: @@ -272,24 +309,23 @@ def pid_calculations(): error_sum_array[error_sum_counter] = (error * (measurement_time - previous_time)) p_result = p_value * error - i_result = i_value * sum(error_sum_array) + i_result = i_value * sum(error_sum_array) d_result = d_value * ((error - previous_error) / (measurement_time - previous_time)) pid_result = offset_value + p_result + i_result + d_result previous_error = error previous_time = measurement_time - - #function to set the 2 max angles. Or set the angle to a specific number = pid_result * max movement + correction - if pid_result >= max_angle: # if PID result is greater than 1, set to 1. 1 = max upward angle + # Code to set the max angles. Or set the angle to a specific number = pid_result * max movement + correction + if pid_result >= max_angle: output_angle = (90 + max_angle) - elif pid_result <= -max_angle: # if PID result is greater than 1, set to 1. 1 = max downward angle - output_angle = (90-max_angle) + elif pid_result <= -max_angle: + output_angle = (90 - max_angle) elif -max_angle < pid_result < max_angle: output_angle = pid_result + 90 else: output_angle = 90 - log_line = str(p_result) + "," + str(i_result) + "," + str(d_result) + "," + str(pid_result) + log_line = str(p_result) + ";" + str(i_result) + ";" + str(d_result) + ";" + str(pid_result) log_data(data_file="pid", data_line=log_line, remark="") if LOG else None if DEBUG: @@ -298,12 +334,12 @@ def pid_calculations(): print("I_result: ", i_result) print("PID_result: ", pid_result) - if error_sum_counter <= error_sum_max-2: + if error_sum_counter <= error_sum_max - 2: # Correction tweak for error sum error_sum_counter = error_sum_counter + 1 else: error_sum_counter = 0 - print("error sum counter", error_sum_counter) if DEBUG else None + print("error sum counter", error_sum_counter) if DEBUG else None end_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) data_line = str(start_time - end_time) @@ -312,23 +348,37 @@ def pid_calculations(): output_angle = round(output_angle) previous_angle = output_angle + # PID angle logging + data_line = str(output_angle) + log_data(data_file="pid_output_angle", data_line=data_line, remark="") if LOG_GRAPH and TWIN_MODE == False else None + log_data(data_file="pid_output_angle_twin", data_line=data_line, + remark="") if LOG_GRAPH and TWIN_MODE == True else None + return output_angle + def control_server_angle(angle): + # Tell the servo to set its position start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) + print("Current angle: ", angle) if SCREEN else None + servo_angle = angle + OFFSET - KIT.servo[0].angle = servo_angle # Set angle + print("Offset angle: ", servo_angle) if SCREEN else None + + KIT.servo[0].angle = servo_angle # Send angle instruction to the servo + log_line = str(angle) log_data(data_file="servo", data_line=log_line, remark="") if LOG else None - print("angle: ", servo_angle) if SCREEN else None end_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) data_line = str(start_time - end_time) log_data(data_file="function", data_line=data_line, remark="control_server_angle") if LOG else None + def servo_slower(): + # This function restricts the servo to +/- 5 degrees in order to prevent launching the cart start_time = float(datetime.strftime(datetime.now(), '%Y%m%d%H%M%S.%f')) global current_angle @@ -348,10 +398,39 @@ def servo_slower(): return servo_angle + +def graph_plotter(file_name): + # Creates the graphs with Pandas and Mathplotlib using the logiles as input. It must be run manually. + + plt.rcParams['figure.figsize'] = [12, 8] # Set the size of the plot canvas + picture_name = file_name + '.png' # User the name of the logfile as input for the graphical image + file_name_plotter = file_name + ".csv" # Use the logfile as input + + # Run one set of the graph code. + + # df = pd.read_csv(file_name_plotter,delimiter=';', header=None, skiprows=0, decimal=".", names=['Timestamp', 'Distance', 'Timestamp2','Remarks']) + # df = df.drop(columns = ['Timestamp2']) + + df = pd.read_csv(file_name_plotter, delimiter=';', header=None, skiprows=0, decimal=".", + names=['Timestamp', 'Distance', 'Remarks']) + + df = df.drop(columns=['Remarks']) + + plt.figure(figsize=(30, 60)) + df.plot(x='Timestamp', y='Distance') + plt.savefig(picture_name) + plt.show() + + +# -------------------- Main ---------------------------------- try: KIT.servo[0].angle = 90 + # graph_plotter("pid-balancer_pid_output_angle_twin_data_2025-01-17 14:29:29.624") + # graph_plotter("pid-balancer_twin_acceleration_data_2025-01-17 14:29:29.624") + # graph_plotter("pid-balancer_twin_current_position_data_2025-01-17 14:29:29.624") + # graph_plotter("pid-balancer_twin_current_speed_data_2025-01-17 14:29:29.624") while True: - # digital_twin() control_server_angle(pid_calculations()) + print("------------------------------------------\n") except RuntimeError: - print("bbbb") \ No newline at end of file + print("What's up?!") diff --git a/picture_generator.py b/picture_generator.py index 9b0776c..4b9c898 100644 --- a/picture_generator.py +++ b/picture_generator.py @@ -1,22 +1,3 @@ import pandas as pd import matplotlib.pyplot as plt -plt.rcParams['figure.figsize'] = [12, 8] - -data_file = 'files_for_graph/digital_twin_measurments/pid-balancer_pid_data.csv' - -#df = pd.read_csv(data_file,delimiter=';', header=None, skiprows=0, decimal=".", names=['Timestamp', 'Distance', 'Timestamp2','Remarks']) - -df = pd.read_csv(data_file,delimiter=';', header=None, skiprows=0, decimal=".", names=['Timestamp', 'Distance','Remarks']) - -# df = df.loc(['Remarks'] == "") -df = df.drop(columns = ['Remarks']) -#df = df.drop(columns = ['Timestamp2']) - -# df.drop(df.columns[[2]], axis=1, inplace=True) -pd.set_option('display.max_rows', 100) - -plt.figure(figsize=(30,60)) -df.plot(x='Timestamp', y='Distance') -plt.savefig('files_for_graph/digital_twin_measurments/pid-balancer_pid_data.png') -plt.show() \ No newline at end of file