影响机器人行业爆炸式发展的原因有哪些(OpenPilot)

机器人是现在和未来最有发展前景的行业,这一点是毋庸置疑的。

先看一段21年的视频:

外置式L2级别自动驾驶辅助套件 Openpilot 深度体验分析

 


火热

作为热门专业的从业人员,能够感受到行业的关注热度和实际热度之间的差异。

原因

有很多,此处只说一个,不满足“降本增效”。

解析

机器人本身需要购买,这也是成本。

机器人餐厅的重点是餐厅而非机器人,饭菜价格是否实惠,口味是否好,才是核心要素。

便宜不好吃,没人买,好吃不便宜,买的人不会多。

机器人替代人工的前提是,总投入要比人工价格还要低才可以的。

很多选用机器人的行业是,增本增效,肯定是不断发展向前的,但是速度肯定不快。

还有些奇葩行业是增本降效,这种嘛,都是一地鸡毛式发展。

满足“降本增效”的机器人行业还是极少的,降本增效的一个典型优势就是利润提升。

需求

抛开成本和效率,机器人是否有大量的需求呢?

这里需求是指人的需求,人需要机器人的需求。

不清楚,不回答。


一个热爱自动驾驶但妥妥外行之人的思考-2023-CSDN博客

2024啦,自动驾驶如何?

(其实知道OpenPilot的人都很少哦……)

5年之前的2019年发布了一篇博文:

有趣免费的开源机器人课程实践指北-2019-_carla 0.8.2 apis-CSDN博客 

里面提及的两篇路径,其实机器人专业很多很多很多学生都很难走完,并系统化研究机器人。

点赞数增加值也稳定了……

机器人(含自动驾驶汽车)成本和电脑手机相比有哪些差异化 


一套自动驾驶方案成本能有多低?100-200美元能否搞定,其实是可以的,B站有分享。

十代雅阁夜间行车 - 100km Hands Free


OpenPilot

 

OpenPilot是一个开源的驾驶辅助系统。目前,OpenPilot可以执行自适应巡航控制(ACC)、自动车道居中(ALC)、前碰撞预警(FCW)和车道偏离预警(LDW)等辅助功能,并且支持的汽车品牌、型号和年份也在不断增加。此外,当OpenPilot处于工作状态时,基于摄像头的驾驶员监控(DM)功能可以提醒分心或昏昏欲睡的驾驶员。

车辆集成和限制:
OpenPilot是一个开源的驾驶辅助系统,旨在为驾驶员提供更安全、更舒适的驾驶体验。它通过集成各种传感器和控制系统来实现各种驾驶辅助功能,例如自适应巡航控制、自动车道居中、前碰撞预警和车道偏离预警等。

OpenPilot可以与多种汽车品牌、型号和年份的车辆进行集成,这使得它可以应用于各种不同的驾驶场景和需求。但是,OpenPilot的集成和限制取决于车辆的硬件配置和软件支持程度。例如,一些较老的车型可能无法支持某些高级功能,或者可能需要额外的硬件改装才能使用OpenPilot。

除了基本的驾驶辅助功能外,OpenPilot还具有基于摄像头的驾驶员监控功能。该功能可以检测驾驶员是否分心或昏昏欲睡,并在必要时发出警告,以提醒驾驶员保持警觉并专注于驾驶。

总的来说,OpenPilot是一个非常有前途的开源驾驶辅助系统,它可以通过集成各种传感器和控制系统来提高驾驶的安全性和舒适性。虽然它有一些限制,但随着技术的不断发展和改进,我们相信OpenPilot将会变得更加完善和可靠。

截止2023年12月支持274款车。

为啥发展了这么多年还是不火爆呢?

2000元升级L2辅助驾驶,干翻一众车厂?Openpilot轻体验

完全满足“降本增效”啊。

安全性、可靠性、稳定性的担忧是一个逃不开的问题啊。


OpenPilot的安装步骤如下:

  1. 首先,需要从GitHub上克隆OpenPilot的代码库。请确保使用以下地址进行克隆:https://github.com/commaai/openpilot.git。
  2. 克隆完成后,进入OpenPilot目录:cd openpilot
  3. 接下来,需要初始化并更新所有的子模块:git submodule update --init --recursive
  4. 然后,需要运行环境配置脚本。这个脚本会安装必要的依赖项,并设置环境变量。在OpenPilot目录下运行以下命令:sudo sh tools/ubuntu_setup.sh
  5. 接下来,需要设置PYTHONPATH环境变量,以便Python可以找到OpenPilot的库。将以下行添加到你的bashrc文件中:export PYTHONPATH=$HOME/openpilot
  6. 之后,需要安装NVIDIA驱动程序和CUDA工具包。请根据你的GPU型号选择相应的驱动程序和CUDA版本。
  7. 接下来,需要安装TensorFlow 2.2的GPU版本。
  8. 还需要安装OpenCL驱动程序。
  9. 最后,需要安装OpenCV 4,但请忽略Python部分。
  10. 在完成以上步骤后,使用以下命令编译OpenPilot:scons -j$(nproc)
  11. 编译完成后,可以运行OpenPilot:cd ~/openpilot/selfdrive/manage

请注意,这些步骤可能会根据你的操作系统和硬件配置有所不同。此外,OpenPilot可能需要最新的依赖项和库才能正常工作,因此请确保你的系统已更新到最新版本。

OpenPilot是一个开源的驾驶辅助系统,可以在汽车上使用。以下是使用OpenPilot的说明:

  1. 硬件设备:使用OpenPilot需要一些硬件设备,包括EON、Harness等官方设备。这些设备可以与汽车进行对接,完成设备的安装与使用。
  2. 启动与关闭:OpenPilot硬件设备可以启动车辆时自动开机,熄火时自动关机。
  3. 功能支持:OpenPilot主要支持车道保持、ACC巡航、自动辅助变道这三个功能。
  4. 使用注意事项:在使用OpenPilot时,需要注意不要在阳光下长时间照射,以避免锂电池爆炸风险。如果使用的是没有电池的Comma 2,那么可以将EON一直挂在前挡风玻璃上,不用取下来。但如果使用的是带有锂电池的手机(乐视3Pro 或 一加3T),那么建议每次停车从前挡风玻璃上取下来。
  5. 系统集成:OpenPilot可以与多种汽车品牌、型号和年份的车辆进行集成,但具体集成程度和功能限制可能因车辆型号和配置而有所不同。
  6. 限制与注意事项:虽然OpenPilot是一个开源的驾驶辅助系统,但在使用时仍需注意安全问题。驾驶员应始终保持对车辆的控制权,并随时准备在必要时接管驾驶。此外,OpenPilot的集成和限制取决于车辆的硬件配置和软件支持程度,因此在使用前应充分了解系统的功能和限制。
  7. 系统升级:如果需要升级OpenPilot系统,可以通过克隆OpenPilot的代码库并按照安装说明进行操作来完成。升级后应重新配置环境变量和依赖项,并重新编译系统。

总之,在使用OpenPilot时,需要注意安全问题,并了解系统的功能和限制。同时,应保持对车辆的控制权,并随时准备在必要时接管驾驶。


OpenPilot的代码案例可以在GitHub上找到,具体地址为https://github.com/commaai/openpilot。

OpenPilot的代码库包含多个文件和文件夹,其中一些重要的文件和文件夹包括:

  • selfdrive/manager/manager.py:这是项目启动入口,管理系统中各个组件运行状态。
  • selfdrive/manager/controller_manager.py:控制器管理器,用于管理车辆控制器的生命周期。
  • selfdrive/car/XXXX/carcontroller.py:每个车型的独立控制器实现代码。

在这些文件和文件夹中,可以找到车道保持、辅助变道、车道偏离报警等功能的实现代码。例如,车道保持功能的实现代码可以在selfdrive/car/XXXX/carcontroller.py中找到,该代码会计算出车辆行驶在车道线内的期望位置,并通过PID控制器控制车辆的横向位置,使其保持在车道线内。

此外,OpenPilot还使用了路径规划和预测算法来预测车辆的未来轨迹和周围障碍物的运动轨迹。这些算法可以在selfdrive/controlsd/prediction.pyselfdrive/controlsd/longitudinal_controller.py等文件中找到。

总之,OpenPilot的代码库包含了多个文件和文件夹,其中包含车道保持、辅助变道、车道偏离报警等功能的实现代码。通过了解这些代码的实现原理,可以深入了解OpenPilot的工作原理和实现细节。

#!/usr/bin/env python3
import datetime
import os
import signal
import subprocess
import sys
import traceback
from typing import List, Tuple, Unionfrom cereal import log
import cereal.messaging as messaging
import openpilot.selfdrive.sentry as sentry
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params, ParamKeyType
from openpilot.common.text_window import TextWindow
from openpilot.selfdrive.boardd.set_time import set_time
from openpilot.system.hardware import HARDWARE, PC
from openpilot.selfdrive.manager.helpers import unblock_stdout, write_onroad_params
from openpilot.selfdrive.manager.process import ensure_running
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.selfdrive.athena.registration import register, UNREGISTERED_DONGLE_ID
from openpilot.common.swaglog import cloudlog, add_file_handler
from openpilot.system.version import is_dirty, get_commit, get_version, get_origin, get_short_branch, \get_normalized_origin, terms_version, training_version, \is_tested_branch, is_release_branchdef manager_init() -> None:# update system time from pandaset_time(cloudlog)# save boot logsubprocess.call("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"))params = Params()params.clear_all(ParamKeyType.CLEAR_ON_MANAGER_START)params.clear_all(ParamKeyType.CLEAR_ON_ONROAD_TRANSITION)params.clear_all(ParamKeyType.CLEAR_ON_OFFROAD_TRANSITION)if is_release_branch():params.clear_all(ParamKeyType.DEVELOPMENT_ONLY)default_params: List[Tuple[str, Union[str, bytes]]] = [("CompletedTrainingVersion", "0"),("DisengageOnAccelerator", "0"),("GsmMetered", "1"),("HasAcceptedTerms", "0"),("LanguageSetting", "main_en"),("OpenpilotEnabledToggle", "1"),("LongitudinalPersonality", str(log.LongitudinalPersonality.standard)),]if not PC:default_params.append(("LastUpdateTime", datetime.datetime.utcnow().isoformat().encode('utf8')))if params.get_bool("RecordFrontLock"):params.put_bool("RecordFront", True)# set unset paramsfor k, v in default_params:if params.get(k) is None:params.put(k, v)# is this dashcam?if os.getenv("PASSIVE") is not None:params.put_bool("Passive", bool(int(os.getenv("PASSIVE", "0"))))if params.get("Passive") is None:raise Exception("Passive must be set to continue")# Create folders needed for msgqtry:os.mkdir("/dev/shm")except FileExistsError:passexcept PermissionError:print("WARNING: failed to make /dev/shm")# set version paramsparams.put("Version", get_version())params.put("TermsVersion", terms_version)params.put("TrainingVersion", training_version)params.put("GitCommit", get_commit(default=""))params.put("GitBranch", get_short_branch(default=""))params.put("GitRemote", get_origin(default=""))params.put_bool("IsTestedBranch", is_tested_branch())params.put_bool("IsReleaseBranch", is_release_branch())# set dongle idreg_res = register(show_spinner=True)if reg_res:dongle_id = reg_reselse:serial = params.get("HardwareSerial")raise Exception(f"Registration failed for device {serial}")os.environ['DONGLE_ID'] = dongle_id  # Needed for swaglogif not is_dirty():os.environ['CLEAN'] = '1'# init loggingsentry.init(sentry.SentryProject.SELFDRIVE)cloudlog.bind_global(dongle_id=dongle_id,version=get_version(),origin=get_normalized_origin(),branch=get_short_branch(),commit=get_commit(),dirty=is_dirty(),device=HARDWARE.get_device_type())def manager_prepare() -> None:for p in managed_processes.values():p.prepare()def manager_cleanup() -> None:# send signals to kill all procsfor p in managed_processes.values():p.stop(block=False)# ensure all are killedfor p in managed_processes.values():p.stop(block=True)cloudlog.info("everything is dead")def manager_thread() -> None:cloudlog.bind(daemon="manager")cloudlog.info("manager start")cloudlog.info({"environ": os.environ})params = Params()ignore: List[str] = []if params.get("DongleId", encoding='utf8') in (None, UNREGISTERED_DONGLE_ID):ignore += ["manage_athenad", "uploader"]if os.getenv("NOBOARD") is not None:ignore.append("pandad")ignore += [x for x in os.getenv("BLOCK", "").split(",") if len(x) > 0]sm = messaging.SubMaster(['deviceState', 'carParams'], poll=['deviceState'])pm = messaging.PubMaster(['managerState'])write_onroad_params(False, params)ensure_running(managed_processes.values(), False, params=params, CP=sm['carParams'], not_run=ignore)started_prev = Falsewhile True:sm.update()started = sm['deviceState'].startedif started and not started_prev:params.clear_all(ParamKeyType.CLEAR_ON_ONROAD_TRANSITION)elif not started and started_prev:params.clear_all(ParamKeyType.CLEAR_ON_OFFROAD_TRANSITION)# update onroad params, which drives boardd's safety setter threadif started != started_prev:write_onroad_params(started, params)started_prev = startedensure_running(managed_processes.values(), started, params=params, CP=sm['carParams'], not_run=ignore)running = ' '.join("%s%s\u001b[0m" % ("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name)for p in managed_processes.values() if p.proc)print(running)cloudlog.debug(running)# send managerStatemsg = messaging.new_message('managerState', valid=True)msg.managerState.processes = [p.get_process_state_msg() for p in managed_processes.values()]pm.send('managerState', msg)# Exit main loop when uninstall/shutdown/reboot is neededshutdown = Falsefor param in ("DoUninstall", "DoShutdown", "DoReboot"):if params.get_bool(param):shutdown = Trueparams.put("LastManagerExitReason", f"{param} {datetime.datetime.now()}")cloudlog.warning(f"Shutting down manager - {param} set")if shutdown:breakdef main() -> None:prepare_only = os.getenv("PREPAREONLY") is not Nonemanager_init()# Start UI early so prepare can happen in the backgroundif not prepare_only:managed_processes['ui'].start()manager_prepare()if prepare_only:return# SystemExit on sigtermsignal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(1))try:manager_thread()except Exception:traceback.print_exc()sentry.capture_exception()finally:manager_cleanup()params = Params()if params.get_bool("DoUninstall"):cloudlog.warning("uninstalling")HARDWARE.uninstall()elif params.get_bool("DoReboot"):cloudlog.warning("reboot")HARDWARE.reboot()elif params.get_bool("DoShutdown"):cloudlog.warning("shutdown")HARDWARE.shutdown()if __name__ == "__main__":unblock_stdout()try:main()except Exception:add_file_handler(cloudlog)cloudlog.exception("Manager failed to start")try:managed_processes['ui'].stop()except Exception:pass# Show last 3 lines of tracebackerror = traceback.format_exc(-3)error = "Manager failed to start\n\n" + errorwith TextWindow(error) as t:t.wait_for_exit()raise# manual exit because we are forkedsys.exit(0)

这段代码是一个Python脚本,主要用于初始化OpenPilot系统中的一些组件。它导入了多个Python标准库和OpenPilot自定义模块。在导入完成后,执行了几个关键的初始化步骤。首先,它设置了系统的时间,然后初始化了日志记录和Sentry错误跟踪系统。之后,它启动了确保运行中的进程,并配置了被管理的进程。最后,它注册了Dongle设备并初始化了环境变量。
这个脚本的主要目的是确保OpenPilot系统的正常运行,包括日志记录、错误跟踪、进程管理和设备注册等关键功能的初始化。这些初始化步骤对于系统的稳定性和可靠性至关重要,因此这个脚本是OpenPilot系统启动过程中不可或缺的一部分。
这个脚本也展示了Python在系统自动化和配置管理方面的强大功能,特别是通过使用标准库和自定义模块来实现这些功能。这使得OpenPilot系统能够灵活地适应不同的硬件和软件环境,并且易于维护和扩展。 

#!/usr/bin/env python3
import os
import math
import time
import threading
from typing import SupportsFloatfrom cereal import car, log
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.profiler import Profiler
from openpilot.common.params import Params
import cereal.messaging as messaging
from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV
from panda import ALTERNATIVE_EXPERIENCE
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_short_branch
from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
from openpilot.selfdrive.car.car_helpers import get_car, get_startup_event, get_one_can
from openpilot.selfdrive.controls.lib.lateral_planner import CAMERA_OFFSET
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, get_lag_adjusted_curvature
from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.events import Events, ET
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager, set_offroad_alert
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.system.hardware import HARDWARESOFT_DISABLE_TIME = 3  # seconds
LDW_MIN_SPEED = 31 * CV.MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}ThermalStatus = log.DeviceState.ThermalStatus
State = log.ControlsState.OpenpilotState
PandaType = log.PandaState.PandaType
Desire = log.LateralPlan.Desire
LaneChangeState = log.LateralPlan.LaneChangeState
LaneChangeDirection = log.LateralPlan.LaneChangeDirection
EventName = car.CarEvent.EventName
ButtonType = car.CarState.ButtonEvent.Type
SafetyModel = car.CarParams.SafetyModelIGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput)
CSID_MAP = {"1": EventName.roadCameraError, "2": EventName.wideRoadCameraError, "0": EventName.driverCameraError}
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)class Controls:def __init__(self, CI=None):config_realtime_process(4, Priority.CTRL_HIGH)# Ensure the current branch is cached, otherwise the first iteration of controlsd lagsself.branch = get_short_branch("")# Setup socketsself.pm = messaging.PubMaster(['sendcan', 'controlsState', 'carState','carControl', 'onroadEvents', 'carParams'])self.sensor_packets = ["accelerometer", "gyroscope"]self.camera_packets = ["roadCameraState", "driverCameraState", "wideRoadCameraState"]self.log_sock = messaging.sub_sock('androidLog')self.can_sock = messaging.sub_sock('can', timeout=20)self.params = Params()ignore = self.sensor_packets + ['testJoystick']if SIMULATION:ignore += ['driverCameraState', 'managerState']self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration','driverMonitoringState', 'longitudinalPlan', 'lateralPlan', 'liveLocationKalman','managerState', 'liveParameters', 'radarState', 'liveTorqueParameters','testJoystick'] + self.camera_packets + self.sensor_packets,ignore_alive=ignore, ignore_avg_freq=['radarState', 'testJoystick'], ignore_valid=['testJoystick', ])if CI is None:# wait for one pandaState and one CAN packetprint("Waiting for CAN messages...")get_one_can(self.can_sock)num_pandas = len(messaging.recv_one_retry(self.sm.sock['pandaStates']).pandaStates)experimental_long_allowed = self.params.get_bool("ExperimentalLongitudinalEnabled")self.CI, self.CP = get_car(self.can_sock, self.pm.sock['sendcan'], experimental_long_allowed, num_pandas)else:self.CI, self.CP = CI, CI.CPself.joystick_mode = self.params.get_bool("JoystickDebugMode")# set alternative experiences from parametersself.disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator")self.CP.alternativeExperience = 0if not self.disengage_on_accelerator:self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS# read paramsself.is_metric = self.params.get_bool("IsMetric")self.is_ldw_enabled = self.params.get_bool("IsLdwEnabled")openpilot_enabled_toggle = self.params.get_bool("OpenpilotEnabledToggle")passive = self.params.get_bool("Passive") or not openpilot_enabled_toggle# detect sound card presence and ensure successful initsounds_available = HARDWARE.get_sound_card_online()car_recognized = self.CP.carName != 'mock'controller_available = self.CI.CC is not None and not passive and not self.CP.dashcamOnlyself.CP.passive = not car_recognized or not controller_available or self.CP.dashcamOnlyif self.CP.passive:safety_config = car.CarParams.SafetyConfig.new_message()safety_config.safetyModel = car.CarParams.SafetyModel.noOutputself.CP.safetyConfigs = [safety_config]# Write previous route's CarParamsprev_cp = self.params.get("CarParamsPersistent")if prev_cp is not None:self.params.put("CarParamsPrevRoute", prev_cp)# Write CarParams for radardcp_bytes = self.CP.to_bytes()self.params.put("CarParams", cp_bytes)self.params.put_nonblocking("CarParamsCache", cp_bytes)self.params.put_nonblocking("CarParamsPersistent", cp_bytes)# cleanup old paramsif not self.CP.experimentalLongitudinalAvailable:self.params.remove("ExperimentalLongitudinalEnabled")if not self.CP.openpilotLongitudinalControl:self.params.remove("ExperimentalMode")self.CC = car.CarControl.new_message()self.CS_prev = car.CarState.new_message()self.AM = AlertManager()self.events = Events()self.LoC = LongControl(self.CP)self.VM = VehicleModel(self.CP)self.LaC: LatControlif self.CP.steerControlType == car.CarParams.SteerControlType.angle:self.LaC = LatControlAngle(self.CP, self.CI)elif self.CP.lateralTuning.which() == 'pid':self.LaC = LatControlPID(self.CP, self.CI)elif self.CP.lateralTuning.which() == 'torque':self.LaC = LatControlTorque(self.CP, self.CI)self.initialized = Falseself.state = State.disabledself.enabled = Falseself.active = Falseself.soft_disable_timer = 0self.mismatch_counter = 0self.cruise_mismatch_counter = 0self.can_rcv_timeout_counter = 0      # conseuctive timeout countself.can_rcv_cum_timeout_counter = 0  # cumulative timeout countself.last_blinker_frame = 0self.last_steering_pressed_frame = 0self.distance_traveled = 0self.last_functional_fan_frame = 0self.events_prev = []self.current_alert_types = [ET.PERMANENT]self.logged_comm_issue = Noneself.not_running_prev = Noneself.last_actuators = car.CarControl.Actuators.new_message()self.steer_limited = Falseself.desired_curvature = 0.0self.desired_curvature_rate = 0.0self.experimental_mode = Falseself.v_cruise_helper = VCruiseHelper(self.CP)self.recalibrating_seen = Falseself.can_log_mono_time = 0self.startup_event = get_startup_event(car_recognized, controller_available, len(self.CP.carFw) > 0)if not sounds_available:self.events.add(EventName.soundsUnavailable, static=True)if not car_recognized:self.events.add(EventName.carUnrecognized, static=True)if len(self.CP.carFw) > 0:set_offroad_alert("Offroad_CarUnrecognized", True)else:set_offroad_alert("Offroad_NoFirmware", True)elif self.CP.passive:self.events.add(EventName.dashcamMode, static=True)# controlsd is driven by can recv, expected at 100Hzself.rk = Ratekeeper(100, print_delay_threshold=None)self.prof = Profiler(False)  # off by defaultdef set_initial_state(self):if REPLAY:controls_state = Params().get("ReplayControlsState")if controls_state is not None:with log.ControlsState.from_bytes(controls_state) as controls_state:self.v_cruise_helper.v_cruise_kph = controls_state.vCruiseif any(ps.controlsAllowed for ps in self.sm['pandaStates']):self.state = State.enableddef update_events(self, CS):"""Compute onroadEvents from carState"""self.events.clear()# Add joystick event, static on cars, dynamic on nonCarsif self.joystick_mode:self.events.add(EventName.joystickDebug)self.startup_event = None# Add startup eventif self.startup_event is not None:self.events.add(self.startup_event)self.startup_event = None# Don't add any more events if not initializedif not self.initialized:self.events.add(EventName.controlsInitializing)return# no more events while in dashcam modeif self.CP.passive:return# Block resume if cruise never previously enabledresume_pressed = any(be.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for be in CS.buttonEvents)if not self.CP.pcmCruise and not self.v_cruise_helper.v_cruise_initialized and resume_pressed:self.events.add(EventName.resumeBlocked)# Disable on rising edge of accelerator or brake. Also disable on brake when speed > 0if (CS.gasPressed and not self.CS_prev.gasPressed and self.disengage_on_accelerator) or \(CS.brakePressed and (not self.CS_prev.brakePressed or not CS.standstill)) or \(CS.regenBraking and (not self.CS_prev.regenBraking or not CS.standstill)):self.events.add(EventName.pedalPressed)if CS.brakePressed and CS.standstill:self.events.add(EventName.preEnableStandstill)if CS.gasPressed:self.events.add(EventName.gasPressedOverride)if not self.CP.notCar:self.events.add_from_msg(self.sm['driverMonitoringState'].events)# Add car events, ignore if CAN isn't validif CS.canValid:self.events.add_from_msg(CS.events)# Create events for temperature, disk space, and memoryif self.sm['deviceState'].thermalStatus >= ThermalStatus.red:self.events.add(EventName.overheat)if self.sm['deviceState'].freeSpacePercent < 7 and not SIMULATION:# under 7% of space free no enable allowedself.events.add(EventName.outOfSpace)if self.sm['deviceState'].memoryUsagePercent > 90 and not SIMULATION:self.events.add(EventName.lowMemory)# TODO: enable this once loggerd CPU usage is more reasonable#cpus = list(self.sm['deviceState'].cpuUsagePercent)#if max(cpus, default=0) > 95 and not SIMULATION:#  self.events.add(EventName.highCpuUsage)# Alert if fan isn't spinning for 5 secondsif self.sm['peripheralState'].pandaType != log.PandaState.PandaType.unknown:if self.sm['peripheralState'].fanSpeedRpm < 500 and self.sm['deviceState'].fanSpeedPercentDesired > 50:# allow enough time for the fan controller in the panda to recover from stallsif (self.sm.frame - self.last_functional_fan_frame) * DT_CTRL > 15.0:self.events.add(EventName.fanMalfunction)else:self.last_functional_fan_frame = self.sm.frame# Handle calibration statuscal_status = self.sm['liveCalibration'].calStatusif cal_status != log.LiveCalibrationData.Status.calibrated:if cal_status == log.LiveCalibrationData.Status.uncalibrated:self.events.add(EventName.calibrationIncomplete)elif cal_status == log.LiveCalibrationData.Status.recalibrating:if not self.recalibrating_seen:set_offroad_alert("Offroad_Recalibration", True)self.recalibrating_seen = Trueself.events.add(EventName.calibrationRecalibrating)else:self.events.add(EventName.calibrationInvalid)# Handle lane changeif self.sm['lateralPlan'].laneChangeState == LaneChangeState.preLaneChange:direction = self.sm['lateralPlan'].laneChangeDirectionif (CS.leftBlindspot and direction == LaneChangeDirection.left) or \(CS.rightBlindspot and direction == LaneChangeDirection.right):self.events.add(EventName.laneChangeBlocked)else:if direction == LaneChangeDirection.left:self.events.add(EventName.preLaneChangeLeft)else:self.events.add(EventName.preLaneChangeRight)elif self.sm['lateralPlan'].laneChangeState in (LaneChangeState.laneChangeStarting,LaneChangeState.laneChangeFinishing):self.events.add(EventName.laneChange)for i, pandaState in enumerate(self.sm['pandaStates']):# All pandas must match the list of safetyConfigs, and if outside this list, must be silent or noOutputif i < len(self.CP.safetyConfigs):safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or \pandaState.safetyParam != self.CP.safetyConfigs[i].safetyParam or \pandaState.alternativeExperience != self.CP.alternativeExperienceelse:safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODESif safety_mismatch or pandaState.safetyRxChecksInvalid or self.mismatch_counter >= 200:self.events.add(EventName.controlsMismatch)if log.PandaState.FaultType.relayMalfunction in pandaState.faults:self.events.add(EventName.relayMalfunction)# Handle HW and system malfunctions# Order is very intentional here. Be careful when modifying this.# All events here should at least have NO_ENTRY and SOFT_DISABLE.num_events = len(self.events)not_running = {p.name for p in self.sm['managerState'].processes if not p.running and p.shouldBeRunning}if self.sm.rcv_frame['managerState'] and (not_running - IGNORE_PROCESSES):self.events.add(EventName.processNotRunning)if not_running != self.not_running_prev:cloudlog.event("process_not_running", not_running=not_running, error=True)self.not_running_prev = not_runningelse:if not SIMULATION and not self.rk.lagging:if not self.sm.all_alive(self.camera_packets):self.events.add(EventName.cameraMalfunction)elif not self.sm.all_freq_ok(self.camera_packets):self.events.add(EventName.cameraFrameRate)if not REPLAY and self.rk.lagging:self.events.add(EventName.controlsdLagging)if len(self.sm['radarState'].radarErrors) or (not self.rk.lagging and not self.sm.all_checks(['radarState'])):self.events.add(EventName.radarFault)if not self.sm.valid['pandaStates']:self.events.add(EventName.usbError)if CS.canTimeout:self.events.add(EventName.canBusMissing)elif not CS.canValid:self.events.add(EventName.canError)# generic catch-all. ideally, a more specific event should be added above insteadcan_rcv_timeout = self.can_rcv_timeout_counter >= 5has_disable_events = self.events.contains(ET.NO_ENTRY) and (self.events.contains(ET.SOFT_DISABLE) or self.events.contains(ET.IMMEDIATE_DISABLE))no_system_errors = (not has_disable_events) or (len(self.events) == num_events)if (not self.sm.all_checks() or can_rcv_timeout) and no_system_errors:if not self.sm.all_alive():self.events.add(EventName.commIssue)elif not self.sm.all_freq_ok():self.events.add(EventName.commIssueAvgFreq)else:  # invalid or can_rcv_timeout.self.events.add(EventName.commIssue)logs = {'invalid': [s for s, valid in self.sm.valid.items() if not valid],'not_alive': [s for s, alive in self.sm.alive.items() if not alive],'not_freq_ok': [s for s, freq_ok in self.sm.freq_ok.items() if not freq_ok],'can_rcv_timeout': can_rcv_timeout,}if logs != self.logged_comm_issue:cloudlog.event("commIssue", error=True, **logs)self.logged_comm_issue = logselse:self.logged_comm_issue = Noneif not (self.CP.notCar and self.joystick_mode):if not self.sm['lateralPlan'].mpcSolutionValid:self.events.add(EventName.plannerError)if not self.sm['liveLocationKalman'].posenetOK:self.events.add(EventName.posenetInvalid)if not self.sm['liveLocationKalman'].deviceStable:self.events.add(EventName.deviceFalling)if not self.sm['liveLocationKalman'].inputsOK:self.events.add(EventName.locationdTemporaryError)if not self.sm['liveParameters'].valid and not TESTING_CLOSET and (not SIMULATION or REPLAY):self.events.add(EventName.paramsdTemporaryError)# conservative HW alert. if the data or frequency are off, locationd will throw an errorif any((self.sm.frame - self.sm.rcv_frame[s])*DT_CTRL > 10. for s in self.sensor_packets):self.events.add(EventName.sensorDataInvalid)if not REPLAY:# Check for mismatch between openpilot and car's PCMcruise_mismatch = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)self.cruise_mismatch_counter = self.cruise_mismatch_counter + 1 if cruise_mismatch else 0if self.cruise_mismatch_counter > int(6. / DT_CTRL):self.events.add(EventName.cruiseMismatch)# Check for FCWstock_long_is_braking = self.enabled and not self.CP.openpilotLongitudinalControl and CS.aEgo < -1.25model_fcw = self.sm['modelV2'].meta.hardBrakePredicted and not CS.brakePressed and not stock_long_is_brakingplanner_fcw = self.sm['longitudinalPlan'].fcw and self.enabledif planner_fcw or model_fcw:self.events.add(EventName.fcw)for m in messaging.drain_sock(self.log_sock, wait_for_one=False):try:msg = m.androidLog.messageif any(err in msg for err in ("ERROR_CRC", "ERROR_ECC", "ERROR_STREAM_UNDERFLOW", "APPLY FAILED")):csid = msg.split("CSID:")[-1].split(" ")[0]evt = CSID_MAP.get(csid, None)if evt is not None:self.events.add(evt)except UnicodeDecodeError:pass# TODO: fix simulatorif not SIMULATION or REPLAY:if not self.sm['liveLocationKalman'].gpsOK and self.sm['liveLocationKalman'].inputsOK and (self.distance_traveled > 1000):# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutesself.events.add(EventName.noGps)if self.sm['modelV2'].frameDropPerc > 20:self.events.add(EventName.modeldLagging)def data_sample(self):"""Receive data from sockets and update carState"""# Update carState from CANcan_strs = messaging.drain_sock_raw(self.can_sock, wait_for_one=True)CS = self.CI.update(self.CC, can_strs)if len(can_strs) and REPLAY:self.can_log_mono_time = messaging.log_from_bytes(can_strs[0]).logMonoTimeself.sm.update(0)if not self.initialized:all_valid = CS.canValid and self.sm.all_checks()timed_out = self.sm.frame * DT_CTRL > (6. if REPLAY else 3.5)if all_valid or timed_out or (SIMULATION and not REPLAY):available_streams = VisionIpcClient.available_streams("camerad", block=False)if VisionStreamType.VISION_STREAM_ROAD not in available_streams:self.sm.ignore_alive.append('roadCameraState')if VisionStreamType.VISION_STREAM_WIDE_ROAD not in available_streams:self.sm.ignore_alive.append('wideRoadCameraState')if not self.CP.passive:self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan'])self.initialized = Trueself.set_initial_state()self.params.put_bool_nonblocking("ControlsReady", True)# Check for CAN timeoutif not can_strs:self.can_rcv_timeout_counter += 1self.can_rcv_cum_timeout_counter += 1else:self.can_rcv_timeout_counter = 0# When the panda and controlsd do not agree on controls_allowed# we want to disengage openpilot. However the status from the panda goes through# another socket other than the CAN messages and one can arrive earlier than the other.# Therefore we allow a mismatch for two samples, then we trigger the disengagement.if not self.enabled:self.mismatch_counter = 0# All pandas not in silent mode must have controlsAllowed when openpilot is enabledif self.enabled and any(not ps.controlsAllowed for ps in self.sm['pandaStates']if ps.safetyModel not in IGNORED_SAFETY_MODES):self.mismatch_counter += 1self.distance_traveled += CS.vEgo * DT_CTRLreturn CSdef state_transition(self, CS):"""Compute conditional state transitions and execute actions on state transitions"""self.v_cruise_helper.update_v_cruise(CS, self.enabled, self.is_metric)# decrement the soft disable timer at every step, as it's reset on# entrance in SOFT_DISABLING stateself.soft_disable_timer = max(0, self.soft_disable_timer - 1)self.current_alert_types = [ET.PERMANENT]# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDINGif self.state != State.disabled:# user and immediate disable always have priority in a non-disabled stateif self.events.contains(ET.USER_DISABLE):self.state = State.disabledself.current_alert_types.append(ET.USER_DISABLE)elif self.events.contains(ET.IMMEDIATE_DISABLE):self.state = State.disabledself.current_alert_types.append(ET.IMMEDIATE_DISABLE)else:# ENABLEDif self.state == State.enabled:if self.events.contains(ET.SOFT_DISABLE):self.state = State.softDisablingself.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)self.current_alert_types.append(ET.SOFT_DISABLE)elif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):self.state = State.overridingself.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]# SOFT DISABLINGelif self.state == State.softDisabling:if not self.events.contains(ET.SOFT_DISABLE):# no more soft disabling condition, so go back to ENABLEDself.state = State.enabledelif self.soft_disable_timer > 0:self.current_alert_types.append(ET.SOFT_DISABLE)elif self.soft_disable_timer <= 0:self.state = State.disabled# PRE ENABLINGelif self.state == State.preEnabled:if not self.events.contains(ET.PRE_ENABLE):self.state = State.enabledelse:self.current_alert_types.append(ET.PRE_ENABLE)# OVERRIDINGelif self.state == State.overriding:if self.events.contains(ET.SOFT_DISABLE):self.state = State.softDisablingself.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)self.current_alert_types.append(ET.SOFT_DISABLE)elif not (self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL)):self.state = State.enabledelse:self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]# DISABLEDelif self.state == State.disabled:if self.events.contains(ET.ENABLE):if self.events.contains(ET.NO_ENTRY):self.current_alert_types.append(ET.NO_ENTRY)else:if self.events.contains(ET.PRE_ENABLE):self.state = State.preEnabledelif self.events.contains(ET.OVERRIDE_LATERAL) or self.events.contains(ET.OVERRIDE_LONGITUDINAL):self.state = State.overridingelse:self.state = State.enabledself.current_alert_types.append(ET.ENABLE)self.v_cruise_helper.initialize_v_cruise(CS, self.experimental_mode)# Check if openpilot is engaged and actuators are enabledself.enabled = self.state in ENABLED_STATESself.active = self.state in ACTIVE_STATESif self.active:self.current_alert_types.append(ET.WARNING)def state_control(self, CS):"""Given the state, this function returns a CarControl packet"""# Update VehicleModellp = self.sm['liveParameters']x = max(lp.stiffnessFactor, 0.1)sr = max(lp.steerRatio, 0.1)self.VM.update_params(x, sr)# Update Torque Paramsif self.CP.lateralTuning.which() == 'torque':torque_params = self.sm['liveTorqueParameters']if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,torque_params.frictionCoefficientFiltered)lat_plan = self.sm['lateralPlan']long_plan = self.sm['longitudinalPlan']CC = car.CarControl.new_message()CC.enabled = self.enabled# Check which actuators can be enabledstandstill = CS.vEgo <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstillCC.latActive = self.active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \(not standstill or self.joystick_mode)CC.longActive = self.enabled and not self.events.contains(ET.OVERRIDE_LONGITUDINAL) and self.CP.openpilotLongitudinalControlactuators = CC.actuatorsactuators.longControlState = self.LoC.long_control_state# Enable blinkers while lane changingif self.sm['lateralPlan'].laneChangeState != LaneChangeState.off:CC.leftBlinker = self.sm['lateralPlan'].laneChangeDirection == LaneChangeDirection.leftCC.rightBlinker = self.sm['lateralPlan'].laneChangeDirection == LaneChangeDirection.rightif CS.leftBlinker or CS.rightBlinker:self.last_blinker_frame = self.sm.frame# State specific actionsif not CC.latActive:self.LaC.reset()if not CC.longActive:self.LoC.reset(v_pid=CS.vEgo)if not self.joystick_mode:# accel PID looppid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, self.v_cruise_helper.v_cruise_kph * CV.KPH_TO_MS)t_since_plan = (self.sm.frame - self.sm.rcv_frame['longitudinalPlan']) * DT_CTRLactuators.accel = self.LoC.update(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan)# Steering PID loop and lateral MPCself.desired_curvature, self.desired_curvature_rate = get_lag_adjusted_curvature(self.CP, CS.vEgo,lat_plan.psis,lat_plan.curvatures,lat_plan.curvatureRates)actuators.steer, actuators.steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,self.steer_limited, self.desired_curvature,self.desired_curvature_rate, self.sm['liveLocationKalman'])actuators.curvature = self.desired_curvatureelse:lac_log = log.ControlsState.LateralDebugState.new_message()if self.sm.rcv_frame['testJoystick'] > 0:# reset joystick if it hasn't been received in a whileshould_reset_joystick = (self.sm.frame - self.sm.rcv_frame['testJoystick'])*DT_CTRL > 0.2if not should_reset_joystick:joystick_axes = self.sm['testJoystick'].axeselse:joystick_axes = [0.0, 0.0]if CC.longActive:actuators.accel = 4.0*clip(joystick_axes[0], -1, 1)if CC.latActive:steer = clip(joystick_axes[1], -1, 1)# max angle is 45 for angle-based cars, max curvature is 0.02actuators.steer, actuators.steeringAngleDeg, actuators.curvature = steer, steer * 90., steer * -0.02lac_log.active = self.activelac_log.steeringAngleDeg = CS.steeringAngleDeglac_log.output = actuators.steerlac_log.saturated = abs(actuators.steer) >= 0.9if CS.steeringPressed:self.last_steering_pressed_frame = self.sm.framerecent_steer_pressed = (self.sm.frame - self.last_steering_pressed_frame)*DT_CTRL < 2.0# Send a "steering required alert" if saturation count has reached the limitif lac_log.active and not recent_steer_pressed and not self.CP.notCar:if self.CP.lateralTuning.which() == 'torque' and not self.joystick_mode:undershooting = abs(lac_log.desiredLateralAccel) / abs(1e-3 + lac_log.actualLateralAccel) > 1.2turning = abs(lac_log.desiredLateralAccel) > 1.0good_speed = CS.vEgo > 5max_torque = abs(self.last_actuators.steer) > 0.99if undershooting and turning and good_speed and max_torque:lac_log.active and self.events.add(EventName.steerSaturated)elif lac_log.saturated:dpath_points = lat_plan.dPathPointsif len(dpath_points):# Check if we deviated from the path# TODO use desired vs actual curvatureif self.CP.steerControlType == car.CarParams.SteerControlType.angle:steering_value = actuators.steeringAngleDegelse:steering_value = actuators.steerleft_deviation = steering_value > 0 and dpath_points[0] < -0.20right_deviation = steering_value < 0 and dpath_points[0] > 0.20if left_deviation or right_deviation:self.events.add(EventName.steerSaturated)# Ensure no NaNs/Infsfor p in ACTUATOR_FIELDS:attr = getattr(actuators, p)if not isinstance(attr, SupportsFloat):continueif not math.isfinite(attr):cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")setattr(actuators, p, 0.0)return CC, lac_logdef publish_logs(self, CS, start_time, CC, lac_log):"""Send actuators and hud commands to the car, send controlsstate and MPC logging"""# Orientation and angle rates can be useful for carcontroller# Only calibrated (car) frame is relevant for the carcontrollerorientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value)if len(orientation_value) > 2:CC.orientationNED = orientation_valueangular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value)if len(angular_rate_value) > 2:CC.angularVelocity = angular_rate_valueCC.cruiseControl.override = self.enabled and not CC.longActive and self.CP.openpilotLongitudinalControlCC.cruiseControl.cancel = CS.cruiseState.enabled and (not self.enabled or not self.CP.pcmCruise)if self.joystick_mode and self.sm.rcv_frame['testJoystick'] > 0 and self.sm['testJoystick'].buttons[0]:CC.cruiseControl.cancel = Truespeeds = self.sm['longitudinalPlan'].speedsif len(speeds):CC.cruiseControl.resume = self.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1hudControl = CC.hudControlhudControl.setSpeed = float(self.v_cruise_helper.v_cruise_cluster_kph * CV.KPH_TO_MS)hudControl.speedVisible = self.enabledhudControl.lanesVisible = self.enabledhudControl.leadVisible = self.sm['longitudinalPlan'].hasLeadhudControl.rightLaneVisible = TruehudControl.leftLaneVisible = Truerecent_blinker = (self.sm.frame - self.last_blinker_frame) * DT_CTRL < 5.0  # 5s blinker cooldownldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \and not CC.latActive and self.sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.calibratedmodel_v2 = self.sm['modelV2']desire_prediction = model_v2.meta.desirePredictionif len(desire_prediction) and ldw_allowed:right_lane_visible = model_v2.laneLineProbs[2] > 0.5left_lane_visible = model_v2.laneLineProbs[1] > 0.5l_lane_change_prob = desire_prediction[Desire.laneChangeLeft]r_lane_change_prob = desire_prediction[Desire.laneChangeRight]lane_lines = model_v2.laneLinesl_lane_close = left_lane_visible and (lane_lines[1].y[0] > -(1.08 + CAMERA_OFFSET))r_lane_close = right_lane_visible and (lane_lines[2].y[0] < (1.08 - CAMERA_OFFSET))hudControl.leftLaneDepart = bool(l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close)hudControl.rightLaneDepart = bool(r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close)if hudControl.rightLaneDepart or hudControl.leftLaneDepart:self.events.add(EventName.ldw)clear_event_types = set()if ET.WARNING not in self.current_alert_types:clear_event_types.add(ET.WARNING)if self.enabled:clear_event_types.add(ET.NO_ENTRY)alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer])self.AM.add_many(self.sm.frame, alerts)current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)if current_alert:hudControl.visualAlert = current_alert.visual_alertif not self.CP.passive and self.initialized:# send car controls over cannow_nanos = self.can_log_mono_time if REPLAY else int(time.monotonic() * 1e9)self.last_actuators, can_sends = self.CI.apply(CC, now_nanos)self.pm.send('sendcan', can_list_to_can_capnp(can_sends, msgtype='sendcan', valid=CS.canValid))CC.actuatorsOutput = self.last_actuatorsif self.CP.steerControlType == car.CarParams.SteerControlType.angle:self.steer_limited = abs(CC.actuators.steeringAngleDeg - CC.actuatorsOutput.steeringAngleDeg) > \STEER_ANGLE_SATURATION_THRESHOLDelse:self.steer_limited = abs(CC.actuators.steer - CC.actuatorsOutput.steer) > 1e-2force_decel = (self.sm['driverMonitoringState'].awarenessStatus < 0.) or \(self.state == State.softDisabling)# Curvature & Steering anglelp = self.sm['liveParameters']steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)# controlsStatedat = messaging.new_message('controlsState')dat.valid = CS.canValidcontrolsState = dat.controlsStateif current_alert:controlsState.alertText1 = current_alert.alert_text_1controlsState.alertText2 = current_alert.alert_text_2controlsState.alertSize = current_alert.alert_sizecontrolsState.alertStatus = current_alert.alert_statuscontrolsState.alertBlinkingRate = current_alert.alert_ratecontrolsState.alertType = current_alert.alert_typecontrolsState.alertSound = current_alert.audible_alertcontrolsState.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']controlsState.lateralPlanMonoTime = self.sm.logMonoTime['lateralPlan']controlsState.enabled = self.enabledcontrolsState.active = self.activecontrolsState.curvature = curvaturecontrolsState.desiredCurvature = self.desired_curvaturecontrolsState.desiredCurvatureRate = self.desired_curvature_ratecontrolsState.state = self.statecontrolsState.engageable = not self.events.contains(ET.NO_ENTRY)controlsState.longControlState = self.LoC.long_control_statecontrolsState.vPid = float(self.LoC.v_pid)controlsState.vCruise = float(self.v_cruise_helper.v_cruise_kph)controlsState.vCruiseCluster = float(self.v_cruise_helper.v_cruise_cluster_kph)controlsState.upAccelCmd = float(self.LoC.pid.p)controlsState.uiAccelCmd = float(self.LoC.pid.i)controlsState.ufAccelCmd = float(self.LoC.pid.f)controlsState.cumLagMs = -self.rk.remaining * 1000.controlsState.startMonoTime = int(start_time * 1e9)controlsState.forceDecel = bool(force_decel)controlsState.canErrorCounter = self.can_rcv_cum_timeout_countercontrolsState.experimentalMode = self.experimental_modelat_tuning = self.CP.lateralTuning.which()if self.joystick_mode:controlsState.lateralControlState.debugState = lac_logelif self.CP.steerControlType == car.CarParams.SteerControlType.angle:controlsState.lateralControlState.angleState = lac_logelif lat_tuning == 'pid':controlsState.lateralControlState.pidState = lac_logelif lat_tuning == 'torque':controlsState.lateralControlState.torqueState = lac_logself.pm.send('controlsState', dat)# carStatecar_events = self.events.to_msg()cs_send = messaging.new_message('carState')cs_send.valid = CS.canValidcs_send.carState = CScs_send.carState.events = car_eventsself.pm.send('carState', cs_send)# onroadEvents - logged every second or on changeif (self.sm.frame % int(1. / DT_CTRL) == 0) or (self.events.names != self.events_prev):ce_send = messaging.new_message('onroadEvents', len(self.events))ce_send.valid = Truece_send.onroadEvents = car_eventsself.pm.send('onroadEvents', ce_send)self.events_prev = self.events.names.copy()# carParams - logged every 50 seconds (> 1 per segment)if (self.sm.frame % int(50. / DT_CTRL) == 0):cp_send = messaging.new_message('carParams')cp_send.valid = Truecp_send.carParams = self.CPself.pm.send('carParams', cp_send)# carControlcc_send = messaging.new_message('carControl')cc_send.valid = CS.canValidcc_send.carControl = CCself.pm.send('carControl', cc_send)# copy CarControl to pass to CarInterface on the next iterationself.CC = CCdef step(self):start_time = time.monotonic()self.prof.checkpoint("Ratekeeper", ignore=True)# Sample data from sockets and get a carStateCS = self.data_sample()cloudlog.timestamp("Data sampled")self.prof.checkpoint("Sample")self.update_events(CS)cloudlog.timestamp("Events updated")if not self.CP.passive and self.initialized:# Update control stateself.state_transition(CS)self.prof.checkpoint("State transition")# Compute actuators (runs PID loops and lateral MPC)CC, lac_log = self.state_control(CS)self.prof.checkpoint("State Control")# Publish dataself.publish_logs(CS, start_time, CC, lac_log)self.prof.checkpoint("Sent")self.CS_prev = CSdef params_thread(self, evt):while not evt.is_set():self.is_metric = self.params.get_bool("IsMetric")self.experimental_mode = self.params.get_bool("ExperimentalMode") and self.CP.openpilotLongitudinalControlif self.CP.notCar:self.joystick_mode = self.params.get_bool("JoystickDebugMode")time.sleep(0.1)def controlsd_thread(self):e = threading.Event()t = threading.Thread(target=self.params_thread, args=(e, ))try:t.start()while True:self.step()self.rk.monitor_time()self.prof.display()except SystemExit:e.set()t.join()def main():controls = Controls()controls.controlsd_thread()if __name__ == "__main__":main()

 这段代码主要涉及到Python3的多个模块和库的导入,包括os、math、time、threading等基础库,以及一些特定于自动驾驶的库,如cereal、openpilot等。这些库被用于处理各种任务,如操作系统调用、数学计算、时间管理和线程处理等,以及一些与自动驾驶相关的功能,如车辆控制、实时处理、参数管理、视觉处理等。
具体来说,代码中导入了os模块用于进行操作系统相关的操作,如文件和目录管理;math模块用于进行数学运算;time模块用于进行时间相关的操作;threading模块用于进行多线程编程。同时,还导入了多个自定义库和模块,如cereal、openpilot等,这些库提供了更多的功能,如数据序列化、自动驾驶控制、实时处理等。
在自动驾驶中,这些库被用于处理各种任务,如车辆控制、传感器数据处理、路径规划等。通过这些库的组合使用,可以实现高度自动化的驾驶系统,从而提高驾驶的安全性和舒适性。


如果我没猜错的话,这东西你应该没见过

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/323725.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用python提取word中的所有图片

使用word中提取的方式图片会丢失清晰度&#xff0c;使用python写一个脚本&#xff0c;程序运行将弹出对话框选择一个word文件&#xff0c;然后在弹出一个对话框选择一个文件夹保存word中的文件。将该word中的所有图片都保存成png格式&#xff0c;并命名成image_i的样式。 程序…

《知识扫盲》ROS和ROS2对比

文章摘选自&#xff1a;ROS与ROS2对比 1.ROS问题举例 ROS的设计目标是简化机器人的开发&#xff0c;如何简化呢&#xff1f;ROS为此设计了一整套通信机制&#xff08;话题、服务、参数、动作&#xff09;。 通过这些通信机制&#xff0c;ROS实现了将机器人的各个组件给的连接…

Prometheus-Alertmanage钉钉实现告警

获取钉钉的webhook地址 1、注册企业钉钉 a、注册企业钉钉 浏览器打开钉钉注册页面 填入手机号码&#xff0c;填入获取到的验证码&#xff0c;点注册 填入企业资料并注册 注册成功后&#xff0c;扫描二维码下载钉钉&#xff0c;如下图&#xff1a; b、添加机器人 管理后台 因…

[AutoSar]DaVinci Developer 命名规范

目录 关键词平台说明一、背景二、Component命名约定2.1Component Type Composition&#xff08;CtCo&#xff09;2.2Component Type Application&#xff08; CtAp&#xff09;2.3Component Type SensorActuator&#xff08;CtSa&#xff09;2.4Component Type Complex Driver&…

小型洗衣机哪个牌子质量好?五款内衣洗衣机便宜好用的牌子推荐

随着大家工作的压力越来越大&#xff0c;下了班之后只能想躺平&#xff0c;在洗完澡之后看着还需要手洗的内衣裤真的很头疼。有些小伙伴还有会攒几天再丢进去洗衣机里面一起&#xff0c;而且这样子是非常不好的&#xff0c;用过的内衣裤长时间不清洗容易滋生细菌&#xff0c;而…

pdf转换成word怎么转?一篇文章教你轻松搞定

pdf转换成word怎么转&#xff1f;你是否曾经遇到过需要将PDF文件转换成Word格式的情况&#xff1f;比如&#xff0c;你需要编辑一个文档&#xff0c;或者想将一些电子书或报告复制到Word中以便于编辑或重新排版。在这种情况下&#xff0c;如何将PDF文件转换成Word格式呢&#x…

过滤器亚马逊审核UL900报告标准

过滤器亚马逊审核UL900防火等级检测标准,要符合ISO17025资质实验室出具的报告才能成功的上架亚马逊平台。 过滤器&#xff08;filter&#xff09;是输送介质管道上不可缺少的一种装置&#xff0c;通常安装在减压阀、泄压阀、定水位阀 ,方工过滤器其它设备的进口端设备。过滤器…

申请CSDN博客专家的历程

今天是2024年第一周的周五下午13&#xff1a;55&#xff0c;我怀着非常非常激动的心情写下这篇博客记录这难忘的时刻&#xff1a;我的博客专家认证通过了&#xff01;现在唯一想说的就是非常感谢CSDN平台&#xff0c;我会继续努力&#xff0c;把最好的分享给大家。 与CSDN的缘分…

ReactNative 常见问题及处理办法(加固混淆)

ReactNative 常见问题及处理办法&#xff08;加固混淆&#xff09; 文章目录 摘要 引言 正文ScrollView内无法滑动RN热更新中的文件引用问题RN中获取高度的技巧RN强制横屏UI适配问题低版本RN&#xff08;0.63以下&#xff09;适配iOS14图片无法显示问题RN清理缓存RN navigat…

数据结构—图(上)

文章目录 12.图(上)(1).图的基本概念#1.图的基本定义#2.边的分类#3.数据结构的一些规定#4.子图#5.完全图#6.路径#7.连通性和连通分量#8.度 (2).图的存储方式#1.邻接矩阵#2.邻接表 (3).图的遍历#1.深度优先搜索(Depth First Search)i.走个迷宫ii.DFS的思想iii.代码实现 #2.广度优…

IDAPython详细版(二)

六&#xff1a;操作数 可以使用idc.get_openrand_typed(ea,n)得到操作数的类型。ea是地址&#xff0c;n是索引 这里有8种不同类型的操作数类 0_void 如果一个指令木有任何操作数它将返回0 0_reg 如果一个操作数是一个普通的寄存器将返回此类型。这个值在内部表示为1. o_mem …

数据分析求职-岗位介绍

这是咱们干货开始的第一篇文章&#xff0c;后续我尽量会保持日更的节奏和大家做分享~ 在未来所有分享的内容展开之前&#xff0c;咱们有必要先彻底、深入地了解下数据分析这个岗位。如果你还在犹豫是否要走数据分析的路&#xff0c;或者你已经拿了数据分析的offer想了解下将来…