机器人是现在和未来最有发展前景的行业,这一点是毋庸置疑的。
先看一段21年的视频:
外置式L2级别自动驾驶辅助套件 Openpilot 深度体验分析
火热
作为热门专业的从业人员,能够感受到行业的关注热度和实际热度之间的差异。
原因
有很多,此处只说一个,不满足“降本增效”。
解析
机器人本身需要购买,这也是成本。
机器人餐厅的重点是餐厅而非机器人,饭菜价格是否实惠,口味是否好,才是核心要素。
便宜不好吃,没人买,好吃不便宜,买的人不会多。
机器人替代人工的前提是,总投入要比人工价格还要低才可以的。
很多选用机器人的行业是,增本增效,肯定是不断发展向前的,但是速度肯定不快。
还有些奇葩行业是增本降效,这种嘛,都是一地鸡毛式发展。
满足“降本增效”的机器人行业还是极少的,降本增效的一个典型优势就是利润提升。
需求
抛开成本和效率,机器人是否有大量的需求呢?
这里需求是指人的需求,人需要机器人的需求。
不清楚,不回答。
一个热爱自动驾驶但妥妥外行之人的思考-2023-CSDN博客
2024啦,自动驾驶如何?
(其实知道OpenPilot的人都很少哦……)
5年之前的2019年发布了一篇博文:
有趣免费的开源机器人课程实践指北-2019-_carla 0.8.2 apis-CSDN博客
里面提及的两篇路径,其实机器人专业很多很多很多学生都很难走完,并系统化研究机器人。
点赞数增加值也稳定了……
机器人(含自动驾驶汽车)成本和电脑手机相比有哪些差异化
一套自动驾驶方案成本能有多低?100-200美元能否搞定,其实是可以的,B站有分享。
十代雅阁夜间行车 - 100km Hands Free
OpenPilot
OpenPilot是一个开源的驾驶辅助系统。目前,OpenPilot可以执行自适应巡航控制(ACC)、自动车道居中(ALC)、前碰撞预警(FCW)和车道偏离预警(LDW)等辅助功能,并且支持的汽车品牌、型号和年份也在不断增加。此外,当OpenPilot处于工作状态时,基于摄像头的驾驶员监控(DM)功能可以提醒分心或昏昏欲睡的驾驶员。
车辆集成和限制:
OpenPilot是一个开源的驾驶辅助系统,旨在为驾驶员提供更安全、更舒适的驾驶体验。它通过集成各种传感器和控制系统来实现各种驾驶辅助功能,例如自适应巡航控制、自动车道居中、前碰撞预警和车道偏离预警等。
OpenPilot可以与多种汽车品牌、型号和年份的车辆进行集成,这使得它可以应用于各种不同的驾驶场景和需求。但是,OpenPilot的集成和限制取决于车辆的硬件配置和软件支持程度。例如,一些较老的车型可能无法支持某些高级功能,或者可能需要额外的硬件改装才能使用OpenPilot。
除了基本的驾驶辅助功能外,OpenPilot还具有基于摄像头的驾驶员监控功能。该功能可以检测驾驶员是否分心或昏昏欲睡,并在必要时发出警告,以提醒驾驶员保持警觉并专注于驾驶。
总的来说,OpenPilot是一个非常有前途的开源驾驶辅助系统,它可以通过集成各种传感器和控制系统来提高驾驶的安全性和舒适性。虽然它有一些限制,但随着技术的不断发展和改进,我们相信OpenPilot将会变得更加完善和可靠。
截止2023年12月支持274款车。
为啥发展了这么多年还是不火爆呢?
2000元升级L2辅助驾驶,干翻一众车厂?Openpilot轻体验
完全满足“降本增效”啊。
安全性、可靠性、稳定性的担忧是一个逃不开的问题啊。
OpenPilot的安装步骤如下:
- 首先,需要从GitHub上克隆OpenPilot的代码库。请确保使用以下地址进行克隆:https://github.com/commaai/openpilot.git。
- 克隆完成后,进入OpenPilot目录:
cd openpilot
。 - 接下来,需要初始化并更新所有的子模块:
git submodule update --init --recursive
。 - 然后,需要运行环境配置脚本。这个脚本会安装必要的依赖项,并设置环境变量。在OpenPilot目录下运行以下命令:
sudo sh tools/ubuntu_setup.sh
。 - 接下来,需要设置PYTHONPATH环境变量,以便Python可以找到OpenPilot的库。将以下行添加到你的bashrc文件中:
export PYTHONPATH=$HOME/openpilot
。 - 之后,需要安装NVIDIA驱动程序和CUDA工具包。请根据你的GPU型号选择相应的驱动程序和CUDA版本。
- 接下来,需要安装TensorFlow 2.2的GPU版本。
- 还需要安装OpenCL驱动程序。
- 最后,需要安装OpenCV 4,但请忽略Python部分。
- 在完成以上步骤后,使用以下命令编译OpenPilot:
scons -j$(nproc)
。 - 编译完成后,可以运行OpenPilot:
cd ~/openpilot/selfdrive/manage
。
请注意,这些步骤可能会根据你的操作系统和硬件配置有所不同。此外,OpenPilot可能需要最新的依赖项和库才能正常工作,因此请确保你的系统已更新到最新版本。
OpenPilot是一个开源的驾驶辅助系统,可以在汽车上使用。以下是使用OpenPilot的说明:
- 硬件设备:使用OpenPilot需要一些硬件设备,包括EON、Harness等官方设备。这些设备可以与汽车进行对接,完成设备的安装与使用。
- 启动与关闭:OpenPilot硬件设备可以启动车辆时自动开机,熄火时自动关机。
- 功能支持:OpenPilot主要支持车道保持、ACC巡航、自动辅助变道这三个功能。
- 使用注意事项:在使用OpenPilot时,需要注意不要在阳光下长时间照射,以避免锂电池爆炸风险。如果使用的是没有电池的Comma 2,那么可以将EON一直挂在前挡风玻璃上,不用取下来。但如果使用的是带有锂电池的手机(乐视3Pro 或 一加3T),那么建议每次停车从前挡风玻璃上取下来。
- 系统集成:OpenPilot可以与多种汽车品牌、型号和年份的车辆进行集成,但具体集成程度和功能限制可能因车辆型号和配置而有所不同。
- 限制与注意事项:虽然OpenPilot是一个开源的驾驶辅助系统,但在使用时仍需注意安全问题。驾驶员应始终保持对车辆的控制权,并随时准备在必要时接管驾驶。此外,OpenPilot的集成和限制取决于车辆的硬件配置和软件支持程度,因此在使用前应充分了解系统的功能和限制。
- 系统升级:如果需要升级OpenPilot系统,可以通过克隆OpenPilot的代码库并按照安装说明进行操作来完成。升级后应重新配置环境变量和依赖项,并重新编译系统。
总之,在使用OpenPilot时,需要注意安全问题,并了解系统的功能和限制。同时,应保持对车辆的控制权,并随时准备在必要时接管驾驶。
OpenPilot的代码案例可以在GitHub上找到,具体地址为https://github.com/commaai/openpilot。
OpenPilot的代码库包含多个文件和文件夹,其中一些重要的文件和文件夹包括:
selfdrive/manager/manager.py
:这是项目启动入口,管理系统中各个组件运行状态。selfdrive/manager/controller_manager.py
:控制器管理器,用于管理车辆控制器的生命周期。selfdrive/car/XXXX/carcontroller.py
:每个车型的独立控制器实现代码。
在这些文件和文件夹中,可以找到车道保持、辅助变道、车道偏离报警等功能的实现代码。例如,车道保持功能的实现代码可以在selfdrive/car/XXXX/carcontroller.py
中找到,该代码会计算出车辆行驶在车道线内的期望位置,并通过PID控制器控制车辆的横向位置,使其保持在车道线内。
此外,OpenPilot还使用了路径规划和预测算法来预测车辆的未来轨迹和周围障碍物的运动轨迹。这些算法可以在selfdrive/controlsd/prediction.py
和selfdrive/controlsd/longitudinal_controller.py
等文件中找到。
总之,OpenPilot的代码库包含了多个文件和文件夹,其中包含车道保持、辅助变道、车道偏离报警等功能的实现代码。通过了解这些代码的实现原理,可以深入了解OpenPilot的工作原理和实现细节。
#!/usr/bin/env python3
import datetime
import os
import signal
import subprocess
import sys
import traceback
from typing import List, Tuple, Unionfrom cereal import log
import cereal.messaging as messaging
import openpilot.selfdrive.sentry as sentry
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params, ParamKeyType
from openpilot.common.text_window import TextWindow
from openpilot.selfdrive.boardd.set_time import set_time
from openpilot.system.hardware import HARDWARE, PC
from openpilot.selfdrive.manager.helpers import unblock_stdout, write_onroad_params
from openpilot.selfdrive.manager.process import ensure_running
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.selfdrive.athena.registration import register, UNREGISTERED_DONGLE_ID
from openpilot.common.swaglog import cloudlog, add_file_handler
from openpilot.system.version import is_dirty, get_commit, get_version, get_origin, get_short_branch, \get_normalized_origin, terms_version, training_version, \is_tested_branch, is_release_branchdef manager_init() -> None:# update system time from pandaset_time(cloudlog)# save boot logsubprocess.call("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"))params = Params()params.clear_all(ParamKeyType.CLEAR_ON_MANAGER_START)params.clear_all(ParamKeyType.CLEAR_ON_ONROAD_TRANSITION)params.clear_all(ParamKeyType.CLEAR_ON_OFFROAD_TRANSITION)if is_release_branch():params.clear_all(ParamKeyType.DEVELOPMENT_ONLY)default_params: List[Tuple[str, Union[str, bytes]]] = [("CompletedTrainingVersion", "0"),("DisengageOnAccelerator", "0"),("GsmMetered", "1"),("HasAcceptedTerms", "0"),("LanguageSetting", "main_en"),("OpenpilotEnabledToggle", "1"),("LongitudinalPersonality", str(log.LongitudinalPersonality.standard)),]if not PC:default_params.append(("LastUpdateTime", datetime.datetime.utcnow().isoformat().encode('utf8')))if params.get_bool("RecordFrontLock"):params.put_bool("RecordFront", True)# set unset paramsfor k, v in default_params:if params.get(k) is None:params.put(k, v)# is this dashcam?if os.getenv("PASSIVE") is not None:params.put_bool("Passive", bool(int(os.getenv("PASSIVE", "0"))))if params.get("Passive") is None:raise Exception("Passive must be set to continue")# Create folders needed for msgqtry:os.mkdir("/dev/shm")except FileExistsError:passexcept PermissionError:print("WARNING: failed to make /dev/shm")# set version paramsparams.put("Version", get_version())params.put("TermsVersion", terms_version)params.put("TrainingVersion", training_version)params.put("GitCommit", get_commit(default=""))params.put("GitBranch", get_short_branch(default=""))params.put("GitRemote", get_origin(default=""))params.put_bool("IsTestedBranch", is_tested_branch())params.put_bool("IsReleaseBranch", is_release_branch())# set dongle idreg_res = register(show_spinner=True)if reg_res:dongle_id = reg_reselse:serial = params.get("HardwareSerial")raise Exception(f"Registration failed for device {serial}")os.environ['DONGLE_ID'] = dongle_id # Needed for swaglogif not is_dirty():os.environ['CLEAN'] = '1'# init loggingsentry.init(sentry.SentryProject.SELFDRIVE)cloudlog.bind_global(dongle_id=dongle_id,version=get_version(),origin=get_normalized_origin(),branch=get_short_branch(),commit=get_commit(),dirty=is_dirty(),device=HARDWARE.get_device_type())def manager_prepare() -> None:for p in managed_processes.values():p.prepare()def manager_cleanup() -> None:# send signals to kill all procsfor p in managed_processes.values():p.stop(block=False)# ensure all are killedfor p in managed_processes.values():p.stop(block=True)cloudlog.info("everything is dead")def manager_thread() -> None:cloudlog.bind(daemon="manager")cloudlog.info("manager start")cloudlog.info({"environ": os.environ})params = Params()ignore: List[str] = []if params.get("DongleId", encoding='utf8') in (None, UNREGISTERED_DONGLE_ID):ignore += ["manage_athenad", "uploader"]if os.getenv("NOBOARD") is not None:ignore.append("pandad")ignore += [x for x in os.getenv("BLOCK", "").split(",") if len(x) > 0]sm = messaging.SubMaster(['deviceState', 'carParams'], poll=['deviceState'])pm = messaging.PubMaster(['managerState'])write_onroad_params(False, params)ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore)started_prev = Falsewhile True:sm.update()started = sm['deviceState'].startedif started and not started_prev:params.clear_all(ParamKeyType.CLEAR_ON_ONROAD_TRANSITION)elif not started and started_prev:params.clear_all(ParamKeyType.CLEAR_ON_OFFROAD_TRANSITION)# update onroad params, which drives boardd's safety setter threadif started != started_prev:write_onroad_params(started, params)started_prev = startedensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore)running = ' '.join("%s%s\u001b[0m" % ("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name)for p in managed_processes.values() if p.proc)print(running)cloudlog.debug(running)# send managerStatemsg = messaging.new_message('managerState', valid=True)msg.managerState.processes = [p.get_process_state_msg() for p in managed_processes.values()]pm.send('managerState', msg)# Exit main loop when uninstall/shutdown/reboot is neededshutdown = Falsefor param in ("DoUninstall", "DoShutdown", "DoReboot"):if params.get_bool(param):shutdown = Trueparams.put("LastManagerExitReason", f"{param} {datetime.datetime.now()}")cloudlog.warning(f"Shutting down manager - {param} set")if shutdown:breakdef main() -> None:prepare_only = os.getenv("PREPAREONLY") is not Nonemanager_init()# Start UI early so prepare can happen in the backgroundif not prepare_only:managed_processes['ui'].start()manager_prepare()if prepare_only:return# SystemExit on sigtermsignal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(1))try:manager_thread()except Exception:traceback.print_exc()sentry.capture_exception()finally:manager_cleanup()params = Params()if params.get_bool("DoUninstall"):cloudlog.warning("uninstalling")HARDWARE.uninstall()elif params.get_bool("DoReboot"):cloudlog.warning("reboot")HARDWARE.reboot()elif params.get_bool("DoShutdown"):cloudlog.warning("shutdown")HARDWARE.shutdown()if __name__ == "__main__":unblock_stdout()try:main()except Exception:add_file_handler(cloudlog)cloudlog.exception("Manager failed to start")try:managed_processes['ui'].stop()except Exception:pass# Show last 3 lines of tracebackerror = traceback.format_exc(-3)error = "Manager failed to start\n\n" + errorwith TextWindow(error) as t:t.wait_for_exit()raise# manual exit because we are forkedsys.exit(0)
这段代码是一个Python脚本,主要用于初始化OpenPilot系统中的一些组件。它导入了多个Python标准库和OpenPilot自定义模块。在导入完成后,执行了几个关键的初始化步骤。首先,它设置了系统的时间,然后初始化了日志记录和Sentry错误跟踪系统。之后,它启动了确保运行中的进程,并配置了被管理的进程。最后,它注册了Dongle设备并初始化了环境变量。
这个脚本的主要目的是确保OpenPilot系统的正常运行,包括日志记录、错误跟踪、进程管理和设备注册等关键功能的初始化。这些初始化步骤对于系统的稳定性和可靠性至关重要,因此这个脚本是OpenPilot系统启动过程中不可或缺的一部分。
这个脚本也展示了Python在系统自动化和配置管理方面的强大功能,特别是通过使用标准库和自定义模块来实现这些功能。这使得OpenPilot系统能够灵活地适应不同的硬件和软件环境,并且易于维护和扩展。
#!/usr/bin/env python3
import os
import math
import time
import threading
from typing import SupportsFloatfrom cereal import car, log
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.profiler import Profiler
from openpilot.common.params import Params
import cereal.messaging as messaging
from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
from panda import ALTERNATIVE_EXPERIENCE
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_short_branch
from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
from openpilot.selfdrive.car.car_helpers import get_car, get_startup_event, get_one_can
from openpilot.selfdrive.controls.lib.lateral_planner import CAMERA_OFFSET
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, get_lag_adjusted_curvature
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.system.hardware import HARDWARESOFT_DISABLE_TIME = 3 # seconds
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}ThermalStatus = log.DeviceState.ThermalStatus
State = log.ControlsState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.LateralPlan.Desire
LaneChangeState = log.LateralPlan.LaneChangeState
LaneChangeDirection = log.LateralPlan.LaneChangeDirection
EventName = car.CarEvent.EventName
ButtonType = car.CarState.ButtonEvent.Type
SafetyModel = car.CarParams.SafetyModelIGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError}
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)class Controls:def __init__(self, CI=None):config_realtime_process(4, Priority.CTRL_HIGH)# Ensure the current branch is cached, otherwise the first iteration of controlsd lagsself.branch = get_short_branch("")# Setup socketsself.pm = messaging.PubMaster(['sendcan', 'controlsState', 'carState','carControl', 'onroadEvents', 'carParams'])self.sensor_packets = ["accelerometer", "gyroscope"]self.camera_packets = ["roadCameraState", "driverCameraState", "wideRoadCameraState"]self.log_sock = messaging.sub_sock('androidLog')self.can_sock = messaging.sub_sock('can', timeout=20)self.params = Params()ignore = self.sensor_packets + ['testJoystick']if SIMULATION:ignore += ['driverCameraState', 'managerState']self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration','driverMonitoringState', 'longitudinalPlan', 'lateralPlan', 'liveLocationKalman','managerState', 'liveParameters', 'radarState', 'liveTorqueParameters','testJoystick'] + self.camera_packets + self.sensor_packets,ignore_alive=ignore, ignore_avg_freq=['radarState', 'testJoystick'], ignore_valid=['testJoystick', ])if CI is None:# wait for one pandaState and one CAN packetprint("Waiting for CAN messages...")get_one_can(self.can_sock)num_pandas = len(messaging.recv_one_retry(self.sm.sock['pandaStates']).pandaStates)experimental_long_allowed = self.params.get_bool("ExperimentalLongitudinalEnabled")self.CI, self.CP = get_car(self.can_sock, self.pm.sock['sendcan'], experimental_long_allowed, num_pandas)else:self.CI, self.CP = CI, CI.CPself.joystick_mode = self.params.get_bool("JoystickDebugMode")# set alternative experiences from parametersself.disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator")self.CP.alternativeExperience = 0if not self.disengage_on_accelerator:self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS# read paramsself.is_metric = self.params.get_bool("IsMetric")self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled")openpilot_enabled_toggle = self.params.get_bool("OpenpilotEnabledToggle")passive = self.params.get_bool("Passive") or not openpilot_enabled_toggle# detect sound card presence and ensure successful initsounds_available = HARDWARE.get_sound_card_online()car_recognized = self.CP.carName != 'mock'controller_available = self.CI.CC is not None and not passive and not self.CP.dashcamOnlyself.CP.passive = not car_recognized or not controller_available or self.CP.dashcamOnlyif self.CP.passive:safety_config = car.CarParams.SafetyConfig.new_message()safety_config.safetyModel = car.CarParams.SafetyModel.noOutputself.CP.safetyConfigs = [safety_config]# Write previous route's CarParamsprev_cp = self.params.get("CarParamsPersistent")if prev_cp is not None:self.params.put("CarParamsPrevRoute", prev_cp)# Write CarParams for radardcp_bytes = self.CP.to_bytes()self.params.put("CarParams", cp_bytes)self.params.put_nonblocking("CarParamsCache", cp_bytes)self.params.put_nonblocking("CarParamsPersistent", cp_bytes)# cleanup old paramsif not self.CP.experimentalLongitudinalAvailable:self.params.remove("ExperimentalLongitudinalEnabled")if not self.CP.openpilotLongitudinalControl:self.params.remove("ExperimentalMode")self.CC = car.CarControl.new_message()self.CS_prev = car.CarState.new_message()self.AM = AlertManager()self.events = Events()self.LoC = LongControl(self.CP)self.VM = VehicleModel(self.CP)self.LaC: LatControlif self.CP.steerControlType == car.CarParams.SteerControlType.angle:self.LaC = LatControlAngle(self.CP, self.CI)elif self.CP.lateralTuning.which() == 'pid':self.LaC = LatControlPID(self.CP, self.CI)elif self.CP.lateralTuning.which() == 'torque':self.LaC = LatControlTorque(self.CP, self.CI)self.initialized = Falseself.state = State.disabledself.enabled = Falseself.active = Falseself.soft_disable_timer = 0self.mismatch_counter = 0self.cruise_mismatch_counter = 0self.can_rcv_timeout_counter = 0 # conseuctive timeout countself.can_rcv_cum_timeout_counter = 0 # cumulative timeout countself.last_blinker_frame = 0self.last_steering_pressed_frame = 0self.distance_traveled = 0self.last_functional_fan_frame = 0self.events_prev = []self.current_alert_types = [ET.PERMANENT]self.logged_comm_issue = Noneself.not_running_prev = Noneself.last_actuators = car.CarControl.Actuators.new_message()self.steer_limited = Falseself.desired_curvature = 0.0self.desired_curvature_rate = 0.0self.experimental_mode = Falseself.v_cruise_helper = VCruiseHelper(self.CP)self.recalibrating_seen = Falseself.can_log_mono_time = 0self.startup_event = get_startup_event(car_recognized, controller_available, len(self.CP.carFw) > 0)if not sounds_available:self.events.add(EventName.soundsUnavailable, static=True)if not car_recognized:self.events.add(EventName.carUnrecognized, static=True)if len(self.CP.carFw) > 0:set_offroad_alert("Offroad_CarUnrecognized", True)else:set_offroad_alert("Offroad_NoFirmware", True)elif self.CP.passive:self.events.add(EventName.dashcamMode, static=True)# controlsd is driven by can recv, expected at 100Hzself.rk = Ratekeeper(100, print_delay_threshold=None)self.prof = Profiler(False) # off by defaultdef set_initial_state(self):if REPLAY:controls_state = Params().get("ReplayControlsState")if controls_state is not None:with log.ControlsState.from_bytes(controls_state) as controls_state:self.v_cruise_helper.v_cruise_kph = controls_state.vCruiseif any(ps.controlsAllowed for ps in self.sm['pandaStates']):self.state = State.enableddef update_events(self, CS):"""Compute onroadEvents from carState"""self.events.clear()# Add joystick event, static on cars, dynamic on nonCarsif self.joystick_mode:self.events.add(EventName.joystickDebug)self.startup_event = None# Add startup eventif self.startup_event is not None:self.events.add(self.startup_event)self.startup_event = None# Don't add any more events if not initializedif not self.initialized:self.events.add(EventName.controlsInitializing)return# no more events while in dashcam modeif self.CP.passive:return# Block resume if cruise never previously enabledresume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents)if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and resume_pressed:self.events.add(EventName.resumeBlocked)# Disable on rising edge of accelerator or brake. Also disable on brake when speed > 0if (CS.gasPressed and not self.CS_prev.gasPressed and self.disengage_on_accelerator) or \(CS.brakePressed and (not self.CS_prev.brakePressed or not CS.standstill)) or \(CS.regenBraking and (not self.CS_prev.regenBraking or not CS.standstill)):self.events.add(EventName.pedalPressed)if CS.brakePressed and CS.standstill:self.events.add(EventName.preEnableStandstill)if CS.gasPressed:self.events.add(EventName.gasPressedOverride)if not self.CP.notCar:self.events.add_from_msg(self.sm['driverMonitoringState'].events)# Add car events, ignore if CAN isn't validif CS.canValid:self.events.add_from_msg(CS.events)# Create events for temperature, disk space, and memoryif self.sm['deviceState'].thermalStatus >= ThermalStatus.red:self.events.add(EventName.overheat)if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION:# under 7% of space free no enable allowedself.events.add(EventName.outOfSpace)if self.sm['deviceState'].memoryUsagePercent > 90 and not SIMULATION:self.events.add(EventName.lowMemory)# TODO: enable this once loggerd CPU usage is more reasonable#cpus = list(self.sm['deviceState'].cpuUsagePercent)#if max(cpus, default=0) > 95 and not SIMULATION:# self.events.add(EventName.highCpuUsage)# Alert if fan isn't spinning for 5 secondsif self.sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:if self.sm['peripheralState'].fanSpeedRpm < 500 and self.sm['deviceState'].fanSpeedPercentDesired > 50:# allow enough time for the fan controller in the panda to recover from stallsif (self.sm.frame - self.last_functional_fan_frame) * DT_CTRL > 15.0:self.events.add(EventName.fanMalfunction)else:self.last_functional_fan_frame = self.sm.frame# Handle calibration statuscal_status = self.sm['liveCalibration'].calStatusif cal_status != log.LiveCalibrationData.Status.calibrated:if cal_status == log.LiveCalibrationData.Status.uncalibrated:self.events.add(EventName.calibrationIncomplete)elif cal_status == log.LiveCalibrationData.Status.recalibrating:if not self.recalibrating_seen:set_offroad_alert("Offroad_Recalibration", True)self.recalibrating_seen = Trueself.events.add(EventName.calibrationRecalibrating)else:self.events.add(EventName.calibrationInvalid)# Handle lane changeif self.sm['lateralPlan'].laneChangeState == LaneChangeState.preLaneChange:direction = self.sm['lateralPlan'].laneChangeDirectionif (CS.leftBlindspot and direction == LaneChangeDirection.left) or \(CS.rightBlindspot and direction == LaneChangeDirection.right):self.events.add(EventName.laneChangeBlocked)else:if direction == LaneChangeDirection.left:self.events.add(EventName.preLaneChangeLeft)else:self.events.add(EventName.preLaneChangeRight)elif self.sm['lateralPlan'].laneChangeState in (LaneChangeState.laneChangeStarting,LaneChangeState.laneChangeFinishing):self.events.add(EventName.laneChange)for i, pandaState in enumerate(self.sm['pandaStates']):# All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutputif i < len(self.CP.safetyConfigs):safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or \pandaState.safetyParam != self.CP.safetyConfigs[i].safetyParam or \pandaState.alternativeExperience != self.CP.alternativeExperienceelse:safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODESif safety_mismatch or pandaState.safetyRxChecksInvalid or self.mismatch_counter >= 200:self.events.add(EventName.controlsMismatch)if log.PandaState.FaultType.relayMalfunction in pandaState.faults:self.events.add(EventName.relayMalfunction)# Handle HW and system malfunctions# Order is very intentional here. Be careful when modifying this.# All events here should at least have NO_ENTRY and SOFT_DISABLE.num_events = len(self.events)not_running = {p.name for p in self.sm['managerState'].processes if not p.running and p.shouldBeRunning}if self.sm.rcv_frame['managerState'] and (not_running - IGNORE_PROCESSES):self.events.add(EventName.processNotRunning)if not_running != self.not_running_prev:cloudlog.event("process_not_running", not_running=not_running, error=True)self.not_running_prev = not_runningelse:if not SIMULATION and not self.rk.lagging:if not self.sm.all_alive(self.camera_packets):self.events.add(EventName.cameraMalfunction)elif not self.sm.all_freq_ok(self.camera_packets):self.events.add(EventName.cameraFrameRate)if not REPLAY and self.rk.lagging:self.events.add(EventName.controlsdLagging)if len(self.sm['radarState'].radarErrors) or (not self.rk.lagging and not self.sm.all_checks(['radarState'])):self.events.add(EventName.radarFault)if not self.sm.valid['pandaStates']:self.events.add(EventName.usbError)if CS.canTimeout:self.events.add(EventName.canBusMissing)elif not CS.canValid:self.events.add(EventName.canError)# generic catch-all. ideally, a more specific event should be added above insteadcan_rcv_timeout = self.can_rcv_timeout_counter >= 5has_disable_events = self.events.contains(ET.NO_ENTRY) and (self.events.contains(ET.SOFT_DISABLE) or self.events.contains(ET.IMMEDIATE_DISABLE))no_system_errors = (not has_disable_events) or (len(self.events) == num_events)if (not self.sm.all_checks() or can_rcv_timeout) and no_system_errors:if not self.sm.all_alive():self.events.add(EventName.commIssue)elif not self.sm.all_freq_ok():self.events.add(EventName.commIssueAvgFreq)else: # invalid or can_rcv_timeout.self.events.add(EventName.commIssue)logs = {'invalid': [s for s, valid in self.sm.valid.items() if not valid],'not_alive': [s for s, alive in self.sm.alive.items() if not alive],'not_freq_ok': [s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],'can_rcv_timeout': can_rcv_timeout,}if logs != self.logged_comm_issue:cloudlog.event("commIssue", error=True, **logs)self.logged_comm_issue = logselse:self.logged_comm_issue = Noneif not (self.CP.notCar and self.joystick_mode):if not self.sm['lateralPlan'].mpcSolutionValid:self.events.add(EventName.plannerError)if not self.sm['liveLocationKalman'].posenetOK:self.events.add(EventName.posenetInvalid)if not self.sm['liveLocationKalman'].deviceStable:self.events.add(EventName.deviceFalling)if not self.sm['liveLocationKalman'].inputsOK:self.events.add(EventName.locationdTemporaryError)if not self.sm['liveParameters'].valid and not TESTING_CLOSET and (not SIMULATION or REPLAY):self.events.add(EventName.paramsdTemporaryError)# conservative HW alert. if the data or frequency are off, locationd will throw an errorif any((self.sm.frame - self.sm.rcv_frame[s])*DT_CTRL > 10. for s in self.sensor_packets):self.events.add(EventName.sensorDataInvalid)if not REPLAY:# Check for mismatch between openpilot and car's PCMcruise_mismatch = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)self.cruise_mismatch_counter = self.cruise_mismatch_counter + 1 if cruise_mismatch else 0if self.cruise_mismatch_counter > int(6. / DT_CTRL):self.events.add(EventName.cruiseMismatch)# Check for FCWstock_long_is_braking = self.enabled and not self.CP.openpilotLongitudinalControl and CS.aEgo < -1.25model_fcw = self.sm['modelV2'].meta.hardBrakePredicted and not CS.brakePressed and not stock_long_is_brakingplanner_fcw = self.sm['longitudinalPlan'].fcw and self.enabledif planner_fcw or model_fcw:self.events.add(EventName.fcw)for m in messaging.drain_sock(self.log_sock, wait_for_one=False):try:msg = m.androidLog.messageif any(err in msg for err in ("ERROR_CRC", "ERROR_ECC", "ERROR_STREAM_UNDERFLOW", "APPLY FAILED")):csid = msg.split("CSID:")[-1].split(" ")[0]evt = CSID_MAP.get(csid, None)if evt is not None:self.events.add(evt)except UnicodeDecodeError:pass# TODO: fix simulatorif not SIMULATION or REPLAY:if not self.sm['liveLocationKalman'].gpsOK and self.sm['liveLocationKalman'].inputsOK and (self.distance_traveled > 1000):# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutesself.events.add(EventName.noGps)if self.sm['modelV2'].frameDropPerc > 20:self.events.add(EventName.modeldLagging)def data_sample(self):"""Receive data from sockets and update carState"""# Update carState from CANcan_strs = messaging.drain_sock_raw(self.can_sock, wait_for_one=True)CS = self.CI.update(self.CC, can_strs)if len(can_strs) and REPLAY:self.can_log_mono_time = messaging.log_from_bytes(can_strs[0]).logMonoTimeself.sm.update(0)if not self.initialized:all_valid = CS.canValid and self.sm.all_checks()timed_out = self.sm.frame * DT_CTRL > (6. if REPLAY else 3.5)if all_valid or timed_out or (SIMULATION and not REPLAY):available_streams = VisionIpcClient.available_streams("camerad", block=False)if VisionStreamType.VISION_STREAM_ROAD not in available_streams:self.sm.ignore_alive.append('roadCameraState')if VisionStreamType.VISION_STREAM_WIDE_ROAD not in available_streams:self.sm.ignore_alive.append('wideRoadCameraState')if not self.CP.passive:self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan'])self.initialized = Trueself.set_initial_state()self.params.put_bool_nonblocking("ControlsReady", True)# Check for CAN timeoutif not can_strs:self.can_rcv_timeout_counter += 1self.can_rcv_cum_timeout_counter += 1else:self.can_rcv_timeout_counter = 0# When the panda and controlsd do not agree on controls_allowed# we want to disengage openpilot. However the status from the panda goes through# another socket other than the CAN messages and one can arrive earlier than the other.# Therefore we allow a mismatch for two samples, then we trigger the disengagement.if not self.enabled:self.mismatch_counter = 0# All pandas not in silent mode must have controlsAllowed when openpilot is enabledif self.enabled and any(not ps.controlsAllowed for ps in self.sm['pandaStates']if ps.safetyModel not in IGNORED_SAFETY_MODES):self.mismatch_counter += 1self.distance_traveled += CS.vEgo * DT_CTRLreturn CSdef state_transition(self, CS):"""Compute conditional state transitions and execute actions on state transitions"""self.v_cruise_helper.update_v_cruise(CS, self.enabled, self.is_metric)# decrement the soft disable timer at every step, as it's reset on# entrance in SOFT_DISABLING stateself.soft_disable_timer = max(0, self.soft_disable_timer - 1)self.current_alert_types = [ET.PERMANENT]# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDINGif self.state != State.disabled:# user and immediate disable always have priority in a non-disabled stateif self.events.contains(ET.USER_DISABLE):self.state = State.disabledself.current_alert_types.append(ET.USER_DISABLE)elif self.events.contains(ET.IMMEDIATE_DISABLE):self.state = State.disabledself.current_alert_types.append(ET.IMMEDIATE_DISABLE)else:# ENABLEDif self.state == State.enabled:if self.events.contains(ET.SOFT_DISABLE):self.state = State.softDisablingself.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)self.current_alert_types.append(ET.SOFT_DISABLE)elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):self.state = State.overridingself.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]# SOFT DISABLINGelif self.state == State.softDisabling:if not self.events.contains(ET.SOFT_DISABLE):# no more soft disabling condition, so go back to ENABLEDself.state = State.enabledelif self.soft_disable_timer > 0:self.current_alert_types.append(ET.SOFT_DISABLE)elif self.soft_disable_timer <= 0:self.state = State.disabled# PRE ENABLINGelif self.state == State.preEnabled:if not self.events.contains(ET.PRE_ENABLE):self.state = State.enabledelse:self.current_alert_types.append(ET.PRE_ENABLE)# OVERRIDINGelif self.state == State.overriding:if self.events.contains(ET.SOFT_DISABLE):self.state = State.softDisablingself.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)self.current_alert_types.append(ET.SOFT_DISABLE)elif not (self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL)):self.state = State.enabledelse:self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]# DISABLEDelif self.state == State.disabled:if self.events.contains(ET.ENABLE):if self.events.contains(ET.NO_ENTRY):self.current_alert_types.append(ET.NO_ENTRY)else:if self.events.contains(ET.PRE_ENABLE):self.state = State.preEnabledelif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):self.state = State.overridingelse:self.state = State.enabledself.current_alert_types.append(ET.ENABLE)self.v_cruise_helper.initialize_v_cruise(CS, self.experimental_mode)# Check if openpilot is engaged and actuators are enabledself.enabled = self.state in ENABLED_STATESself.active = self.state in ACTIVE_STATESif self.active:self.current_alert_types.append(ET.WARNING)def state_control(self, CS):"""Given the state, this function returns a CarControl packet"""# Update VehicleModellp = self.sm['liveParameters']x = max(lp.stiffnessFactor, 0.1)sr = max(lp.steerRatio, 0.1)self.VM.update_params(x, sr)# Update Torque Paramsif self.CP.lateralTuning.which() == 'torque':torque_params = self.sm['liveTorqueParameters']if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,torque_params.frictionCoefficientFiltered)lat_plan = self.sm['lateralPlan']long_plan = self.sm['longitudinalPlan']CC = car.CarControl.new_message()CC.enabled = self.enabled# Check which actuators can be enabledstandstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstillCC.latActive = self.active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \(not standstill or self.joystick_mode)CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControlactuators = CC.actuatorsactuators.longControlState = self.LoC.long_control_state# Enable blinkers while lane changingif self.sm['lateralPlan'].laneChangeState != LaneChangeState.off:CC.leftBlinker = self.sm['lateralPlan'].laneChangeDirection == LaneChangeDirection.leftCC.rightBlinker = self.sm['lateralPlan'].laneChangeDirection == LaneChangeDirection.rightif CS.leftBlinker or CS.rightBlinker:self.last_blinker_frame = self.sm.frame# State specific actionsif not CC.latActive:self.LaC.reset()if not CC.longActive:self.LoC.reset(v_pid=CS.vEgo)if not self.joystick_mode:# accel PID looppid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, self.v_cruise_helper.v_cruise_kph * CV.KPH_TO_MS)t_since_plan = (self.sm.frame - self.sm.rcv_frame['longitudinalPlan']) * DT_CTRLactuators.accel = self.LoC.update(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan)# Steering PID loop and lateral MPCself.desired_curvature, self.desired_curvature_rate = get_lag_adjusted_curvature(self.CP, CS.vEgo,lat_plan.psis,lat_plan.curvatures,lat_plan.curvatureRates)actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,self.steer_limited, self.desired_curvature,self.desired_curvature_rate, self.sm['liveLocationKalman'])actuators.curvature = self.desired_curvatureelse:lac_log = log.ControlsState.LateralDebugState.new_message()if self.sm.rcv_frame['testJoystick'] > 0:# reset joystick if it hasn't been received in a whileshould_reset_joystick = (self.sm.frame - self.sm.rcv_frame['testJoystick'])*DT_CTRL > 0.2if not should_reset_joystick:joystick_axes = self.sm['testJoystick'].axeselse:joystick_axes = [0.0, 0.0]if CC.longActive:actuators.accel = 4.0*clip(joystick_axes[0], -1, 1)if CC.latActive:steer = clip(joystick_axes[1], -1, 1)# max angle is 45 for angle-based cars, max curvature is 0.02actuators.steer, actuators.steeringAngleDeg, actuators.curvature = steer, steer * 90., steer * -0.02lac_log.active = self.activelac_log.steeringAngleDeg = CS.steeringAngleDeglac_log.output = actuators.steerlac_log.saturated = abs(actuators.steer) >= 0.9if CS.steeringPressed:self.last_steering_pressed_frame = self.sm.framerecent_steer_pressed = (self.sm.frame - self.last_steering_pressed_frame)*DT_CTRL < 2.0# Send a "steering required alert" if saturation count has reached the limitif lac_log.active and not recent_steer_pressed and not self.CP.notCar:if self.CP.lateralTuning.which() == 'torque' and not self.joystick_mode:undershooting = abs(lac_log.desiredLateralAccel) / abs(1e-3 + lac_log.actualLateralAccel) > 1.2turning = abs(lac_log.desiredLateralAccel) > 1.0good_speed = CS.vEgo > 5max_torque = abs(self.last_actuators.steer) > 0.99if undershooting and turning and good_speed and max_torque:lac_log.active and self.events.add(EventName.steerSaturated)elif lac_log.saturated:dpath_points = lat_plan.dPathPointsif len(dpath_points):# Check if we deviated from the path# TODO use desired vs actual curvatureif self.CP.steerControlType == car.CarParams.SteerControlType.angle:steering_value = actuators.steeringAngleDegelse:steering_value = actuators.steerleft_deviation = steering_value > 0 and dpath_points[0] < -0.20right_deviation = steering_value < 0 and dpath_points[0] > 0.20if left_deviation or right_deviation:self.events.add(EventName.steerSaturated)# Ensure no NaNs/Infsfor p in ACTUATOR_FIELDS:attr = getattr(actuators, p)if not isinstance(attr, SupportsFloat):continueif not math.isfinite(attr):cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")setattr(actuators, p, 0.0)return CC, lac_logdef publish_logs(self, CS, start_time, CC, lac_log):"""Send actuators and hud commands to the car, send controlsstate and MPC logging"""# Orientation and angle rates can be useful for carcontroller# Only calibrated (car) frame is relevant for the carcontrollerorientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value)if len(orientation_value) > 2:CC.orientationNED = orientation_valueangular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value)if len(angular_rate_value) > 2:CC.angularVelocity = angular_rate_valueCC.cruiseControl.override = self.enabled and not CC.longActive and self.CP.openpilotLongitudinalControlCC.cruiseControl.cancel = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)if self.joystick_mode and self.sm.rcv_frame['testJoystick'] > 0 and self.sm['testJoystick'].buttons[0]:CC.cruiseControl.cancel = Truespeeds = self.sm['longitudinalPlan'].speedsif len(speeds):CC.cruiseControl.resume = self.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1hudControl = CC.hudControlhudControl.setSpeed = float(self.v_cruise_helper.v_cruise_cluster_kph * CV.KPH_TO_MS)hudControl.speedVisible = self.enabledhudControl.lanesVisible = self.enabledhudControl.leadVisible = self.sm['longitudinalPlan'].hasLeadhudControl.rightLaneVisible = TruehudControl.leftLaneVisible = Truerecent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0 # 5s blinker cooldownldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibratedmodel_v2 = self.sm['modelV2']desire_prediction = model_v2.meta.desirePredictionif len(desire_prediction) and ldw_allowed:right_lane_visible = model_v2.laneLineProbs[2] > 0.5left_lane_visible = model_v2.laneLineProbs[1] > 0.5l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]r_lane_change_prob = desire_prediction[Desire.laneChangeRight]lane_lines = model_v2.laneLinesl_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)if hudControl.rightLaneDepart or hudControl.leftLaneDepart:self.events.add(EventName.ldw)clear_event_types = set()if ET.WARNING not in self.current_alert_types:clear_event_types.add(ET.WARNING)if self.enabled:clear_event_types.add(ET.NO_ENTRY)alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer])self.AM.add_many(self.sm.frame, alerts)current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)if current_alert:hudControl.visualAlert = current_alert.visual_alertif not self.CP.passive and self.initialized:# send car controls over cannow_nanos = self.can_log_mono_time if REPLAY else int(time.monotonic() * 1e9)self.last_actuators, can_sends = self.CI.apply(CC, now_nanos)self.pm.send('sendcan', can_list_to_can_capnp(can_sends, msgtype='sendcan', valid=CS.canValid))CC.actuatorsOutput = self.last_actuatorsif self.CP.steerControlType == car.CarParams.SteerControlType.angle:self.steer_limited = abs(CC.actuators.steeringAngleDeg - CC.actuatorsOutput.steeringAngleDeg) > \STEER_ANGLE_SATURATION_THRESHOLDelse:self.steer_limited = abs(CC.actuators.steer - CC.actuatorsOutput.steer) > 1e-2force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \(self.state == State.softDisabling)# Curvature & Steering anglelp = self.sm['liveParameters']steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)# controlsStatedat = messaging.new_message('controlsState')dat.valid = CS.canValidcontrolsState = dat.controlsStateif current_alert:controlsState.alertText1 = current_alert.alert_text_1controlsState.alertText2 = current_alert.alert_text_2controlsState.alertSize = current_alert.alert_sizecontrolsState.alertStatus = current_alert.alert_statuscontrolsState.alertBlinkingRate = current_alert.alert_ratecontrolsState.alertType = current_alert.alert_typecontrolsState.alertSound = current_alert.audible_alertcontrolsState.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']controlsState.lateralPlanMonoTime = self.sm.logMonoTime['lateralPlan']controlsState.enabled = self.enabledcontrolsState.active = self.activecontrolsState.curvature = curvaturecontrolsState.desiredCurvature = self.desired_curvaturecontrolsState.desiredCurvatureRate = self.desired_curvature_ratecontrolsState.state = self.statecontrolsState.engageable = not self.events.contains(ET.NO_ENTRY)controlsState.longControlState = self.LoC.long_control_statecontrolsState.vPid = float(self.LoC.v_pid)controlsState.vCruise = float(self.v_cruise_helper.v_cruise_kph)controlsState.vCruiseCluster = float(self.v_cruise_helper.v_cruise_cluster_kph)controlsState.upAccelCmd = float(self.LoC.pid.p)controlsState.uiAccelCmd = float(self.LoC.pid.i)controlsState.ufAccelCmd = float(self.LoC.pid.f)controlsState.cumLagMs = -self.rk.remaining * 1000.controlsState.startMonoTime = int(start_time * 1e9)controlsState.forceDecel = bool(force_decel)controlsState.canErrorCounter = self.can_rcv_cum_timeout_countercontrolsState.experimentalMode = self.experimental_modelat_tuning = self.CP.lateralTuning.which()if self.joystick_mode:controlsState.lateralControlState.debugState = lac_logelif self.CP.steerControlType == car.CarParams.SteerControlType.angle:controlsState.lateralControlState.angleState = lac_logelif lat_tuning == 'pid':controlsState.lateralControlState.pidState = lac_logelif lat_tuning == 'torque':controlsState.lateralControlState.torqueState = lac_logself.pm.send('controlsState', dat)# carStatecar_events = self.events.to_msg()cs_send = messaging.new_message('carState')cs_send.valid = CS.canValidcs_send.carState = CScs_send.carState.events = car_eventsself.pm.send('carState', cs_send)# onroadEvents - logged every second or on changeif (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev):ce_send = messaging.new_message('onroadEvents', len(self.events))ce_send.valid = Truece_send.onroadEvents = car_eventsself.pm.send('onroadEvents', ce_send)self.events_prev = self.events.names.copy()# carParams - logged every 50 seconds (> 1 per segment)if (self.sm.frame % int(50. / DT_CTRL) == 0):cp_send = messaging.new_message('carParams')cp_send.valid = Truecp_send.carParams = self.CPself.pm.send('carParams', cp_send)# carControlcc_send = messaging.new_message('carControl')cc_send.valid = CS.canValidcc_send.carControl = CCself.pm.send('carControl', cc_send)# copy CarControl to pass to CarInterface on the next iterationself.CC = CCdef step(self):start_time = time.monotonic()self.prof.checkpoint("Ratekeeper", ignore=True)# Sample data from sockets and get a carStateCS = self.data_sample()cloudlog.timestamp("Data sampled")self.prof.checkpoint("Sample")self.update_events(CS)cloudlog.timestamp("Events updated")if not self.CP.passive and self.initialized:# Update control stateself.state_transition(CS)self.prof.checkpoint("State transition")# Compute actuators (runs PID loops and lateral MPC)CC, lac_log = self.state_control(CS)self.prof.checkpoint("State Control")# Publish dataself.publish_logs(CS, start_time, CC, lac_log)self.prof.checkpoint("Sent")self.CS_prev = CSdef params_thread(self, evt):while not evt.is_set():self.is_metric = self.params.get_bool("IsMetric")self.experimental_mode = self.params.get_bool("ExperimentalMode") and self.CP.openpilotLongitudinalControlif self.CP.notCar:self.joystick_mode = self.params.get_bool("JoystickDebugMode")time.sleep(0.1)def controlsd_thread(self):e = threading.Event()t = threading.Thread(target=self.params_thread, args=(e, ))try:t.start()while True:self.step()self.rk.monitor_time()self.prof.display()except SystemExit:e.set()t.join()def main():controls = Controls()controls.controlsd_thread()if __name__ == "__main__":main()
这段代码主要涉及到Python3的多个模块和库的导入,包括os、math、time、threading等基础库,以及一些特定于自动驾驶的库,如cereal、openpilot等。这些库被用于处理各种任务,如操作系统调用、数学计算、时间管理和线程处理等,以及一些与自动驾驶相关的功能,如车辆控制、实时处理、参数管理、视觉处理等。
具体来说,代码中导入了os模块用于进行操作系统相关的操作,如文件和目录管理;math模块用于进行数学运算;time模块用于进行时间相关的操作;threading模块用于进行多线程编程。同时,还导入了多个自定义库和模块,如cereal、openpilot等,这些库提供了更多的功能,如数据序列化、自动驾驶控制、实时处理等。
在自动驾驶中,这些库被用于处理各种任务,如车辆控制、传感器数据处理、路径规划等。通过这些库的组合使用,可以实现高度自动化的驾驶系统,从而提高驾驶的安全性和舒适性。
如果我没猜错的话,这东西你应该没见过