From da95fd3019473b1f0dd8d671c22c1e618092cdb2 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Tue, 14 Nov 2023 01:19:39 +0800 Subject: [PATCH] replay: simplify the code for allow/block list (#30449) simplify allow/block list --- tools/cabana/streams/replaystream.cc | 2 +- tools/replay/SConscript | 16 ++---- tools/replay/logreader.cc | 17 ++---- tools/replay/logreader.h | 6 +- tools/replay/main.cc | 5 +- tools/replay/replay.cc | 83 ++++++++++------------------ tools/replay/replay.h | 8 +-- tools/replay/route.cc | 9 +-- tools/replay/route.h | 4 +- tools/replay/tests/test_replay.cc | 2 +- 10 files changed, 50 insertions(+), 102 deletions(-) diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index e94aefec2b2e3d..c61c81d56fe5e6 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -46,7 +46,7 @@ void ReplayStream::mergeSegments() { bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) { replay.reset(new Replay(route, {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"}, - {}, {}, nullptr, replay_flags, data_dir, this)); + {}, nullptr, replay_flags, data_dir, this)); replay->setSegmentCacheLimit(settings.max_cached_minutes); replay->installEventFilter(event_filter, this); QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); diff --git a/tools/replay/SConscript b/tools/replay/SConscript index bce7512e44e1d7..db8447003b1ed8 100644 --- a/tools/replay/SConscript +++ b/tools/replay/SConscript @@ -1,25 +1,21 @@ -import os -Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', - 'cereal', 'transformations') +Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal') base_frameworks = qt_env['FRAMEWORKS'] -base_libs = [common, messaging, cereal, visionipc, transformations, 'zmq', - 'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread'] + qt_env["LIBS"] +base_libs = [common, messaging, cereal, visionipc, 'zmq', + 'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"] if arch == "Darwin": base_frameworks.append('OpenCL') else: base_libs.append('OpenCL') -qt_libs = ['qt_util'] + base_libs qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"] - -replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=qt_libs, FRAMEWORKS=base_frameworks) +replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) Export('replay_lib') -replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + qt_libs +replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + base_libs qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks) if GetOption('extras'): - qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, qt_libs]) + qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs]) diff --git a/tools/replay/logreader.cc b/tools/replay/logreader.cc index 74aebceae59643..c92ff4753f750b 100644 --- a/tools/replay/logreader.cc +++ b/tools/replay/logreader.cc @@ -1,6 +1,7 @@ #include "tools/replay/logreader.h" #include +#include "tools/replay/filereader.h" #include "tools/replay/util.h" Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { @@ -40,9 +41,7 @@ LogReader::~LogReader() { } } -bool LogReader::load(const std::string &url, std::atomic *abort, - const std::set &allow, - bool local_cache, int chunk_size, int retries) { +bool LogReader::load(const std::string &url, std::atomic *abort, bool local_cache, int chunk_size, int retries) { raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort); if (raw_.empty()) return false; @@ -50,15 +49,15 @@ bool LogReader::load(const std::string &url, std::atomic *abort, raw_ = decompressBZ2(raw_, abort); if (raw_.empty()) return false; } - return parse(allow, abort); + return parse(abort); } bool LogReader::load(const std::byte *data, size_t size, std::atomic *abort) { raw_.assign((const char *)data, size); - return parse({}, abort); + return parse(abort); } -bool LogReader::parse(const std::set &allow, std::atomic *abort) { +bool LogReader::parse(std::atomic *abort) { try { kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); while (words.size() > 0 && !(abort && *abort)) { @@ -67,12 +66,6 @@ bool LogReader::parse(const std::set &allow, std::atomicwhich) == allow.end()) { - words = kj::arrayPtr(evt->reader.getEnd(), words.end()); - delete evt; - continue; - } - // Add encodeIdx packet again as a frame packet for the video stream if (evt->which == cereal::Event::ROAD_ENCODE_IDX || evt->which == cereal::Event::DRIVER_ENCODE_IDX || diff --git a/tools/replay/logreader.h b/tools/replay/logreader.h index 77d751a91be3e2..73f822d16cb730 100644 --- a/tools/replay/logreader.h +++ b/tools/replay/logreader.h @@ -6,13 +6,11 @@ #endif #include -#include #include #include #include "cereal/gen/cpp/log.capnp.h" #include "system/camerad/cameras/camera_common.h" -#include "tools/replay/filereader.h" const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const int MAX_CAMERAS = std::size(ALL_CAMERAS); @@ -55,13 +53,13 @@ class LogReader { public: LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); ~LogReader(); - bool load(const std::string &url, std::atomic *abort = nullptr, const std::set &allow = {}, + bool load(const std::string &url, std::atomic *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0); bool load(const std::byte *data, size_t size, std::atomic *abort = nullptr); std::vector events; private: - bool parse(const std::set &allow, std::atomic *abort); + bool parse(std::atomic *abort); std::string raw_; #ifdef HAS_MEMORY_RESOURCE std::unique_ptr mbr_; diff --git a/tools/replay/main.cc b/tools/replay/main.cc index 98a0bb3333d2c1..945cb4cd091425 100644 --- a/tools/replay/main.cc +++ b/tools/replay/main.cc @@ -13,7 +13,6 @@ int main(int argc, char *argv[]) { QCoreApplication app(argc, argv); - const QStringList base_blacklist = {"uiDebug", "userFlag"}; const std::tuple flags[] = { {"dcam", REPLAY_FLAG_DCAM, "load driver camera"}, {"ecam", REPLAY_FLAG_ECAM, "load wide road camera"}, @@ -22,7 +21,7 @@ int main(int argc, char *argv[]) { {"qcam", REPLAY_FLAG_QCAMERA, "load qcamera"}, {"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER, "disable HW video decoding"}, {"no-vipc", REPLAY_FLAG_NO_VIPC, "do not output video"}, - {"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including " + base_blacklist.join(", ") + + {"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including uiDebug, userFlag" ". this may causes issues when used along with UI"} }; @@ -64,7 +63,7 @@ int main(int argc, char *argv[]) { op_prefix.reset(new OpenpilotPrefix(prefix.toStdString())); } - Replay *replay = new Replay(route, allow, block, base_blacklist, nullptr, replay_flags, parser.value("data_dir"), &app); + Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app); if (!parser.value("c").isEmpty()) { replay->setSegmentCacheLimit(parser.value("c").toInt()); } diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 81ecca1b8b6039..1ec484d677b72a 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -9,35 +9,23 @@ #include "common/timing.h" #include "tools/replay/util.h" -Replay::Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent) - : sm(sm_), flags_(flags), QObject(parent) { - std::vector s; +Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, + uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) { + if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) { + block << "uiDebug" << "userFlag"; + } auto event_struct = capnp::Schema::from().asStruct(); sockets_.resize(event_struct.getUnionFields().size()); - for (const auto &it : services) { - auto name = it.second.name.c_str(); - uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue(); - if ((which == cereal::Event::Which::UI_DEBUG || which == cereal::Event::Which::USER_FLAG) && - !(flags & REPLAY_FLAG_ALL_SERVICES) && - !allow.contains(name)) { - continue; - } - - if ((allow.empty() || allow.contains(name)) && !block.contains(name)) { - sockets_[which] = name; - if (!allow.empty() || !block.empty()) { - allow_list.insert((cereal::Event::Which)which); - } - s.push_back(name); + for (const auto &[name, _] : services) { + if (!block.contains(name.c_str()) && (allow.empty() || allow.contains(name.c_str()))) { + uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue(); + sockets_[which] = name.c_str(); } } - if (!allow_list.empty()) { - // the following events are needed for replay to work properly. - allow_list.insert(cereal::Event::Which::INIT_DATA); - allow_list.insert(cereal::Event::Which::CAR_PARAMS); - } - + std::vector s; + std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s), + [](const char *name) { return name != nullptr; }); qDebug() << "services " << s; qDebug() << "loading route " << route; @@ -150,7 +138,7 @@ void Replay::buildTimeline() { const auto &route_segments = route_->segments(); for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) { std::shared_ptr log(new LogReader()); - if (!log->load(it->second.qlog.toStdString(), &exit_, {}, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; + if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; for (const Event *e : log->events) { if (e->which == cereal::Event::Which::CONTROLS_STATE) { @@ -233,30 +221,17 @@ void Replay::segmentLoadFinished(bool success) { } void Replay::queueSegment() { - if (segments_.empty()) return; - - SegmentMap::iterator begin, cur; - begin = cur = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); - int distance = std::max(std::ceil(segment_cache_limit / 2.0) - 1, segment_cache_limit - std::distance(cur, segments_.end())); - for (int i = 0; begin != segments_.begin() && i < distance; ++i) { - --begin; - } - auto end = begin; - for (int i = 0; end != segments_.end() && i < segment_cache_limit; ++i) { - ++end; - } + auto cur = segments_.lower_bound(current_segment_.load()); + if (cur == segments_.end()) return; + auto begin = std::prev(cur, std::min(segment_cache_limit / 2, std::distance(segments_.begin(), cur))); + auto end = std::next(begin, std::min(segment_cache_limit, segments_.size())); // load one segment at a time - for (auto it = cur; it != end; ++it) { - auto &[n, seg] = *it; - if ((seg && !seg->isLoaded()) || !seg) { - if (!seg) { - rDebug("loading segment %d...", n); - seg = std::make_unique(n, route_->at(n), flags_, allow_list); - QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); - } - break; - } + auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); }); + if (it != end && !it->second) { + rDebug("loading segment %d...", it->first); + it->second = std::make_unique(it->first, route_->at(it->first), flags_); + QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); } mergeSegments(begin, end); @@ -293,13 +268,11 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: new_events_->clear(); new_events_->reserve(new_events_size); for (int n : segments_need_merge) { - const auto &e = segments_[n]->log->events; - if (e.size() > 0) { - auto insert_from = e.begin(); - if (new_events_->size() > 0 && (*insert_from)->which == cereal::Event::Which::INIT_DATA) ++insert_from; - auto middle = new_events_->insert(new_events_->end(), insert_from, e.end()); - std::inplace_merge(new_events_->begin(), middle, new_events_->end(), Event::lessThan()); - } + size_t size = new_events_->size(); + const auto &events = segments_[n]->log->events; + std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_), + [this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; }); + std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan()); } if (stream_thread_) { @@ -414,7 +387,7 @@ void Replay::stream() { cur_mono_time_ = evt->mono_time; setCurrentSegment(toSeconds(cur_mono_time_) / 60); - if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) { + if (sockets_[cur_which] != nullptr) { // keep time long etime = (cur_mono_time_ - evt_start_ts) / speed_; long rtime = nanos_since_boot() - loop_start_ts; diff --git a/tools/replay/replay.h b/tools/replay/replay.h index e26ef883b67b80..1144da26014293 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -50,8 +49,8 @@ class Replay : public QObject { Q_OBJECT public: - Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm = nullptr, - uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0); + Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, + uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0); ~Replay(); bool load(); void start(int seconds = 0); @@ -114,8 +113,6 @@ protected slots: } QThread *stream_thread_ = nullptr; - - // logs std::mutex stream_lock_; std::condition_variable stream_cv_; std::atomic updating_events_ = false; @@ -142,7 +139,6 @@ protected slots: std::mutex timeline_lock; QFuture timeline_future; std::vector> timeline; - std::set allow_list; std::string car_fingerprint_; std::atomic speed_ = 1.0; replayEventFilter event_filter = nullptr; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index 15a57e5e422b9c..9168d25b35bc3e 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -7,9 +7,6 @@ #include #include #include -#include -#include -#include #include "selfdrive/ui/qt/api.h" #include "system/hardware/hw.h" @@ -102,9 +99,7 @@ void Route::addFileToSegment(int n, const QString &file) { // class Segment -Segment::Segment(int n, const SegmentFile &files, uint32_t flags, - const std::set &allow) - : seg_num(n), flags(flags), allow(allow) { +Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const std::array file_list = { (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam, @@ -135,7 +130,7 @@ void Segment::loadFile(int id, const std::string file) { success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3); } else { log = std::make_unique(); - success = log->load(file, &abort_, allow, local_cache, 0, 3); + success = log->load(file, &abort_, local_cache, 0, 3); } if (!success) { diff --git a/tools/replay/route.h b/tools/replay/route.h index 7207ff4f5cb38c..4dad7a1f37f9e7 100644 --- a/tools/replay/route.h +++ b/tools/replay/route.h @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -55,7 +54,7 @@ class Segment : public QObject { Q_OBJECT public: - Segment(int n, const SegmentFile &files, uint32_t flags, const std::set &allow = {}); + Segment(int n, const SegmentFile &files, uint32_t flags); ~Segment(); inline bool isLoaded() const { return !loading_ && !abort_; } @@ -73,5 +72,4 @@ class Segment : public QObject { std::atomic loading_ = 0; QFutureSynchronizer synchronizer_; uint32_t flags; - std::set allow; }; diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index d0b6a9db97aaff..1873daaf4ba1ca 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -161,7 +161,7 @@ TEST_CASE("Remote route") { // helper class for unit tests class TestReplay : public Replay { public: - TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, {}, nullptr, flags) {} + TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {} void test_seek(); void testSeekTo(int seek_to); };