Skip to content

Commit

Permalink
replay: simplify the code for allow/block list (commaai#30449)
Browse files Browse the repository at this point in the history
simplify allow/block list
  • Loading branch information
deanlee authored Nov 13, 2023
1 parent d3b91f2 commit da95fd3
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 102 deletions.
2 changes: 1 addition & 1 deletion tools/cabana/streams/replaystream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void ReplayStream::mergeSegments() {

bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) {
replay.reset(new Replay(route, {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"},
{}, {}, nullptr, replay_flags, data_dir, this));
{}, nullptr, replay_flags, data_dir, this));
replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
Expand Down
16 changes: 6 additions & 10 deletions tools/replay/SConscript
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import os
Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc',
'cereal', 'transformations')
Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')

base_frameworks = qt_env['FRAMEWORKS']
base_libs = [common, messaging, cereal, visionipc, transformations, 'zmq',
'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread'] + qt_env["LIBS"]
base_libs = [common, messaging, cereal, visionipc, 'zmq',
'capnp', 'kj', 'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"]

if arch == "Darwin":
base_frameworks.append('OpenCL')
else:
base_libs.append('OpenCL')

qt_libs = ['qt_util'] + base_libs
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]

replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]

replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=qt_libs, FRAMEWORKS=base_frameworks)
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + qt_libs
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'yuv', 'ncurses'] + base_libs
qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)

if GetOption('extras'):
qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, qt_libs])
qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs])
17 changes: 5 additions & 12 deletions tools/replay/logreader.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "tools/replay/logreader.h"

#include <algorithm>
#include "tools/replay/filereader.h"
#include "tools/replay/util.h"

Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
Expand Down Expand Up @@ -40,25 +41,23 @@ LogReader::~LogReader() {
}
}

bool LogReader::load(const std::string &url, std::atomic<bool> *abort,
const std::set<cereal::Event::Which> &allow,
bool local_cache, int chunk_size, int retries) {
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (raw_.empty()) return false;

if (url.find(".bz2") != std::string::npos) {
raw_ = decompressBZ2(raw_, abort);
if (raw_.empty()) return false;
}
return parse(allow, abort);
return parse(abort);
}

bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
raw_.assign((const char *)data, size);
return parse({}, abort);
return parse(abort);
}

bool LogReader::parse(const std::set<cereal::Event::Which> &allow, std::atomic<bool> *abort) {
bool LogReader::parse(std::atomic<bool> *abort) {
try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0 && !(abort && *abort)) {
Expand All @@ -67,12 +66,6 @@ bool LogReader::parse(const std::set<cereal::Event::Which> &allow, std::atomic<b
#else
Event *evt = new Event(words);
#endif
if (!allow.empty() && allow.find(evt->which) == allow.end()) {
words = kj::arrayPtr(evt->reader.getEnd(), words.end());
delete evt;
continue;
}

// Add encodeIdx packet again as a frame packet for the video stream
if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
Expand Down
6 changes: 2 additions & 4 deletions tools/replay/logreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
#endif

#include <memory>
#include <set>
#include <string>
#include <vector>

#include "cereal/gen/cpp/log.capnp.h"
#include "system/camerad/cameras/camera_common.h"
#include "tools/replay/filereader.h"

const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
Expand Down Expand Up @@ -55,13 +53,13 @@ class LogReader {
public:
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader();
bool load(const std::string &url, std::atomic<bool> *abort = nullptr, const std::set<cereal::Event::Which> &allow = {},
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events;

private:
bool parse(const std::set<cereal::Event::Which> &allow, std::atomic<bool> *abort);
bool parse(std::atomic<bool> *abort);
std::string raw_;
#ifdef HAS_MEMORY_RESOURCE
std::unique_ptr<std::pmr::monotonic_buffer_resource> mbr_;
Expand Down
5 changes: 2 additions & 3 deletions tools/replay/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ int main(int argc, char *argv[]) {

QCoreApplication app(argc, argv);

const QStringList base_blacklist = {"uiDebug", "userFlag"};
const std::tuple<QString, REPLAY_FLAGS, QString> flags[] = {
{"dcam", REPLAY_FLAG_DCAM, "load driver camera"},
{"ecam", REPLAY_FLAG_ECAM, "load wide road camera"},
Expand All @@ -22,7 +21,7 @@ int main(int argc, char *argv[]) {
{"qcam", REPLAY_FLAG_QCAMERA, "load qcamera"},
{"no-hw-decoder", REPLAY_FLAG_NO_HW_DECODER, "disable HW video decoding"},
{"no-vipc", REPLAY_FLAG_NO_VIPC, "do not output video"},
{"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including " + base_blacklist.join(", ") +
{"all", REPLAY_FLAG_ALL_SERVICES, "do output all messages including uiDebug, userFlag"
". this may causes issues when used along with UI"}
};

Expand Down Expand Up @@ -64,7 +63,7 @@ int main(int argc, char *argv[]) {
op_prefix.reset(new OpenpilotPrefix(prefix.toStdString()));
}

Replay *replay = new Replay(route, allow, block, base_blacklist, nullptr, replay_flags, parser.value("data_dir"), &app);
Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
if (!parser.value("c").isEmpty()) {
replay->setSegmentCacheLimit(parser.value("c").toInt());
}
Expand Down
83 changes: 28 additions & 55 deletions tools/replay/replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,23 @@
#include "common/timing.h"
#include "tools/replay/util.h"

Replay::Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent)
: sm(sm_), flags_(flags), QObject(parent) {
std::vector<const char *> s;
Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_,
uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block << "uiDebug" << "userFlag";
}
auto event_struct = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_struct.getUnionFields().size());
for (const auto &it : services) {
auto name = it.second.name.c_str();
uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue();
if ((which == cereal::Event::Which::UI_DEBUG || which == cereal::Event::Which::USER_FLAG) &&
!(flags & REPLAY_FLAG_ALL_SERVICES) &&
!allow.contains(name)) {
continue;
}

if ((allow.empty() || allow.contains(name)) && !block.contains(name)) {
sockets_[which] = name;
if (!allow.empty() || !block.empty()) {
allow_list.insert((cereal::Event::Which)which);
}
s.push_back(name);
for (const auto &[name, _] : services) {
if (!block.contains(name.c_str()) && (allow.empty() || allow.contains(name.c_str()))) {
uint16_t which = event_struct.getFieldByName(name).getProto().getDiscriminantValue();
sockets_[which] = name.c_str();
}
}

if (!allow_list.empty()) {
// the following events are needed for replay to work properly.
allow_list.insert(cereal::Event::Which::INIT_DATA);
allow_list.insert(cereal::Event::Which::CAR_PARAMS);
}

std::vector<const char *> s;
std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s),
[](const char *name) { return name != nullptr; });
qDebug() << "services " << s;
qDebug() << "loading route " << route;

Expand Down Expand Up @@ -150,7 +138,7 @@ void Replay::buildTimeline() {
const auto &route_segments = route_->segments();
for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog.toStdString(), &exit_, {}, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;

for (const Event *e : log->events) {
if (e->which == cereal::Event::Which::CONTROLS_STATE) {
Expand Down Expand Up @@ -233,30 +221,17 @@ void Replay::segmentLoadFinished(bool success) {
}

void Replay::queueSegment() {
if (segments_.empty()) return;

SegmentMap::iterator begin, cur;
begin = cur = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
int distance = std::max<int>(std::ceil(segment_cache_limit / 2.0) - 1, segment_cache_limit - std::distance(cur, segments_.end()));
for (int i = 0; begin != segments_.begin() && i < distance; ++i) {
--begin;
}
auto end = begin;
for (int i = 0; end != segments_.end() && i < segment_cache_limit; ++i) {
++end;
}
auto cur = segments_.lower_bound(current_segment_.load());
if (cur == segments_.end()) return;

auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit, segments_.size()));
// load one segment at a time
for (auto it = cur; it != end; ++it) {
auto &[n, seg] = *it;
if ((seg && !seg->isLoaded()) || !seg) {
if (!seg) {
rDebug("loading segment %d...", n);
seg = std::make_unique<Segment>(n, route_->at(n), flags_, allow_list);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
}
break;
}
auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); });
if (it != end && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
}

mergeSegments(begin, end);
Expand Down Expand Up @@ -293,13 +268,11 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
new_events_->clear();
new_events_->reserve(new_events_size);
for (int n : segments_need_merge) {
const auto &e = segments_[n]->log->events;
if (e.size() > 0) {
auto insert_from = e.begin();
if (new_events_->size() > 0 && (*insert_from)->which == cereal::Event::Which::INIT_DATA) ++insert_from;
auto middle = new_events_->insert(new_events_->end(), insert_from, e.end());
std::inplace_merge(new_events_->begin(), middle, new_events_->end(), Event::lessThan());
}
size_t size = new_events_->size();
const auto &events = segments_[n]->log->events;
std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_),
[this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; });
std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan());
}

if (stream_thread_) {
Expand Down Expand Up @@ -414,7 +387,7 @@ void Replay::stream() {
cur_mono_time_ = evt->mono_time;
setCurrentSegment(toSeconds(cur_mono_time_) / 60);

if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) {
if (sockets_[cur_which] != nullptr) {
// keep time
long etime = (cur_mono_time_ - evt_start_ts) / speed_;
long rtime = nanos_since_boot() - loop_start_ts;
Expand Down
8 changes: 2 additions & 6 deletions tools/replay/replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <vector>
Expand Down Expand Up @@ -50,8 +49,8 @@ class Replay : public QObject {
Q_OBJECT

public:
Replay(QString route, QStringList allow, QStringList block, QStringList base_blacklist, SubMaster *sm = nullptr,
uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0);
Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr,
uint32_t flags = REPLAY_FLAG_NONE, QString data_dir = "", QObject *parent = 0);
~Replay();
bool load();
void start(int seconds = 0);
Expand Down Expand Up @@ -114,8 +113,6 @@ protected slots:
}

QThread *stream_thread_ = nullptr;

// logs
std::mutex stream_lock_;
std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false;
Expand All @@ -142,7 +139,6 @@ protected slots:
std::mutex timeline_lock;
QFuture<void> timeline_future;
std::vector<std::tuple<double, double, TimelineType>> timeline;
std::set<cereal::Event::Which> allow_list;
std::string car_fingerprint_;
std::atomic<float> speed_ = 1.0;
replayEventFilter event_filter = nullptr;
Expand Down
9 changes: 2 additions & 7 deletions tools/replay/route.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
#include <QRegExp>
#include <QtConcurrent>
#include <array>
#include <memory>
#include <set>
#include <string>

#include "selfdrive/ui/qt/api.h"
#include "system/hardware/hw.h"
Expand Down Expand Up @@ -102,9 +99,7 @@ void Route::addFileToSegment(int n, const QString &file) {

// class Segment

Segment::Segment(int n, const SegmentFile &files, uint32_t flags,
const std::set<cereal::Event::Which> &allow)
: seg_num(n), flags(flags), allow(allow) {
Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const std::array file_list = {
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
Expand Down Expand Up @@ -135,7 +130,7 @@ void Segment::loadFile(int id, const std::string file) {
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3);
} else {
log = std::make_unique<LogReader>();
success = log->load(file, &abort_, allow, local_cache, 0, 3);
success = log->load(file, &abort_, local_cache, 0, 3);
}

if (!success) {
Expand Down
4 changes: 1 addition & 3 deletions tools/replay/route.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <map>
#include <memory>
#include <set>
#include <string>

#include <QDateTime>
Expand Down Expand Up @@ -55,7 +54,7 @@ class Segment : public QObject {
Q_OBJECT

public:
Segment(int n, const SegmentFile &files, uint32_t flags, const std::set<cereal::Event::Which> &allow = {});
Segment(int n, const SegmentFile &files, uint32_t flags);
~Segment();
inline bool isLoaded() const { return !loading_ && !abort_; }

Expand All @@ -73,5 +72,4 @@ class Segment : public QObject {
std::atomic<int> loading_ = 0;
QFutureSynchronizer<void> synchronizer_;
uint32_t flags;
std::set<cereal::Event::Which> allow;
};
2 changes: 1 addition & 1 deletion tools/replay/tests/test_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ TEST_CASE("Remote route") {
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, {}, nullptr, flags) {}
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {}
void test_seek();
void testSeekTo(int seek_to);
};
Expand Down

0 comments on commit da95fd3

Please sign in to comment.