Skip to content

Commit

Permalink
Plugins prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ggarber committed Oct 15, 2023
1 parent e8a18a1 commit 14a3ba0
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 10 deletions.
3 changes: 3 additions & 0 deletions worker/include/RTC/DataProducer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "RTC/RTCP/Packet.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include "Utils.hpp"
#include <nlohmann/json.hpp>
#include <string>

Expand Down Expand Up @@ -69,6 +70,8 @@ namespace RTC
// Passed by argument.
const std::string id;

Utils::Event<uint32_t, const uint8_t*, size_t> Data;

private:
// Passed by argument.
RTC::Shared* shared{ nullptr };
Expand Down
15 changes: 15 additions & 0 deletions worker/include/RTC/Plugin.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef MS_RTC_PLUGIN_HPP
#define MS_RTC_PLUGIN_HPP

class Worker;

namespace RTC
{
class Plugin
{
public:
virtual ~Plugin() = default;
};
} // namespace RTC

#endif
6 changes: 6 additions & 0 deletions worker/include/RTC/Router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "RTC/Shared.hpp"
#include "RTC/Transport.hpp"
#include "RTC/WebRtcServer.hpp"
#include "Utils.hpp"
#include <absl/container/flat_hash_map.h>
#include <nlohmann/json.hpp>
#include <string>
Expand Down Expand Up @@ -115,6 +116,8 @@ namespace RTC
public:
// Passed by argument.
const std::string id;
Utils::Event<RTC::Transport*> TransportCreated;
Utils::Event<RTC::Transport*> TransportClosed;

private:
// Passed by argument.
Expand All @@ -124,6 +127,9 @@ namespace RTC
absl::flat_hash_map<std::string, RTC::Transport*> mapTransports;
absl::flat_hash_map<std::string, RTC::RtpObserver*> mapRtpObservers;
// Others.

// TODO: Replace with smart pointers

absl::flat_hash_map<RTC::Producer*, absl::flat_hash_set<RTC::Consumer*>> mapProducerConsumers;
absl::flat_hash_map<RTC::Consumer*, RTC::Producer*> mapConsumerProducer;
absl::flat_hash_map<RTC::Producer*, absl::flat_hash_set<RTC::RtpObserver*>> mapProducerRtpObservers;
Expand Down
9 changes: 9 additions & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,15 @@ namespace RTC
// Passed by argument.
const std::string id;

Utils::Event<RTC::Producer*> ProducerCreated;
Utils::Event<RTC::Producer*> ProducerClosed;
Utils::Event<RTC::Consumer*> ConsumerCreated;
Utils::Event<RTC::Consumer*> ConsumerClosed;
Utils::Event<RTC::DataProducer*> DataProducerCreated;
Utils::Event<RTC::DataProducer*> DataProducerClosed;
Utils::Event<RTC::DataConsumer*> DataConsumerCreated;
Utils::Event<RTC::DataConsumer*> DataConsumerClosed;

protected:
RTC::Shared* shared{ nullptr };
size_t maxMessageSize{ 262144u };
Expand Down
26 changes: 26 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,32 @@ namespace Utils
}
}
};

template <typename... Args>
class Event

This comment has been minimized.

Copy link
@ibc

ibc Oct 16, 2023

I don't think this template/class should be defined in Utils.hpp.

This comment has been minimized.

Copy link
@ggarber

ggarber Oct 16, 2023

Author Owner

Agree.

I don't know if Event is the right name either, versus Signal or something else.

{
public:
// Typedef for the function signature.
typedef std::function<void(Args...)> Listener;
void operator()(Args... args) const
{
for (auto& listener : this->listeners)
{
listener(args...);
}
}
void operator+=(const Listener& listener)
{
this->listeners.push_back(listener);
}
void operator-=(const Listener& listener)
{
std::remove(listeners.begin(), listeners.end(), listener);
}
private:
std::vector<Listener> listeners;
};

} // namespace Utils

#endif
10 changes: 10 additions & 0 deletions worker/include/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
#include "PayloadChannel/PayloadChannelNotification.hpp"
#include "PayloadChannel/PayloadChannelRequest.hpp"
#include "PayloadChannel/PayloadChannelSocket.hpp"
#include "RTC/Plugin.hpp"
#include "RTC/Router.hpp"
#include "RTC/Shared.hpp"
#include "RTC/WebRtcServer.hpp"
#include "Utils.hpp"
#include "handles/SignalsHandler.hpp"
#include <absl/container/flat_hash_map.h>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>

using json = nlohmann::json;

Expand All @@ -25,6 +28,8 @@ class Worker : public Channel::ChannelSocket::Listener,
public:
explicit Worker(Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel);
~Worker();

void RunInContext(std::function<void()> func);

private:
void Close();
Expand Down Expand Up @@ -63,6 +68,10 @@ class Worker : public Channel::ChannelSocket::Listener,
public:
RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override;

public:
Utils::Event<RTC::Router*> RouterCreated;
Utils::Event<RTC::Router*> RouterClosed;

private:
// Passed by argument.
Channel::ChannelSocket* channel{ nullptr };
Expand All @@ -72,6 +81,7 @@ class Worker : public Channel::ChannelSocket::Listener,
RTC::Shared* shared{ nullptr };
absl::flat_hash_map<std::string, RTC::WebRtcServer*> mapWebRtcServers;
absl::flat_hash_map<std::string, RTC::Router*> mapRouters;
std::vector<RTC::Plugin*> plugins;
// Others.
bool closed{ false };
};
Expand Down
5 changes: 3 additions & 2 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ common_sources = [
'src/RTC/RTCP/XR.cpp',
'src/RTC/RTCP/XrDelaySinceLastRr.cpp',
'src/RTC/RTCP/XrReceiverReferenceTime.cpp',
'plugins/DataPing/Plugin.cpp'

This comment has been minimized.

Copy link
@ibc

ibc Oct 16, 2023

hehe

This comment has been minimized.

Copy link
@ggarber

ggarber Oct 16, 2023

Author Owner

This would be Option 1/ in "Loading of Plugins" in the Issue description; versatica#1175

]

cpp = meson.get_compiler('cpp')
Expand Down Expand Up @@ -264,7 +265,7 @@ libmediasoup_worker = library(
install_tag: 'libmediasoup-worker',
dependencies: dependencies,
sources: common_sources,
include_directories: include_directories('include'),
include_directories: include_directories('include', 'plugins'),
cpp_args: cpp_args,
link_whole: link_whole,
)
Expand All @@ -276,7 +277,7 @@ executable(
install_tag: 'mediasoup-worker',
dependencies: dependencies,
sources: common_sources + ['src/main.cpp'],
include_directories: include_directories('include'),
include_directories: include_directories('include', 'plugins'),
cpp_args: cpp_args + ['-DMS_EXECUTABLE'],
)

Expand Down
61 changes: 61 additions & 0 deletions worker/plugins/DataPing/Plugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include "Plugin.hpp"

#include <iostream>

#include "Worker.hpp"
#include "RTC/Router.hpp"
#include "RTC/DataProducer.hpp"
#include "RTC/DataConsumer.hpp"

DataPingPlugin::DataPingPlugin(Worker* worker)
{
worker->RouterCreated += [&](RTC::Router* router)
{
std::cerr << "RouterCreated" << std::endl;

router->TransportCreated += [&](RTC::Transport* transport)

This comment has been minimized.

Copy link
@ibc

ibc Oct 16, 2023

What does += mean here?

This comment has been minimized.

Copy link
@ggarber

ggarber Oct 16, 2023

Author Owner

Connecting a callback to an event.

I think it is a common pattern to use the += operator for this (It is used that way in C# too iirc) but it could be replaced with a method and without operators with something like router->TransportCreated.connect([&]() { ... }

{
std::cerr << "TransportCreated" << std::endl;

transport->DataProducerCreated += [&](RTC::DataProducer* producer)
{
producer->Data += [&](uint32_t id, const uint8_t* data, size_t len)
{
auto isPing = len == 4 && std::string((const char*)data, len) == "ping";
if (!isPing)
{
return;
}
for (auto [ consumerTransport, consumer ] : this->consumers)
{
if (consumerTransport == transport &&
consumer->GetSctpStreamParameters().streamId == producer->GetSctpStreamParameters().streamId)
{
consumer->SendMessage(id, data, len);
}
}
// TBD: Implement support to stop processing of this message.
// return false;
};
};

transport->DataConsumerCreated += [&](RTC::DataConsumer* dataConsumer)
{
this->consumers.push_back({ transport, dataConsumer });
};

transport->DataConsumerClosed += [&](RTC::DataConsumer* dataConsumer)
{
for (auto it = this->consumers.begin(); it != this->consumers.end(); ++it)
{
if (it->dataConsumer == dataConsumer)
{
this->consumers.erase(it);
break;
}
}
};

};
};
}
23 changes: 23 additions & 0 deletions worker/plugins/DataPing/Plugin.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include "RTC/Plugin.hpp"
#include "RTC/Transport.hpp"
#include "RTC/DataConsumer.hpp"

#include <vector>

class DataPingPlugin: public RTC::Plugin
{
public:
DataPingPlugin(Worker* worker);

private:
struct ConsumerInfo
{
RTC::Transport* transport;
RTC::DataConsumer* dataConsumer;
};
std::vector<ConsumerInfo> consumers;
};

// RTC::Plugin* CreatePlugin(Worker* worker);
2 changes: 2 additions & 0 deletions worker/src/RTC/DataProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,7 @@ namespace RTC
this->bytesReceived += len;

this->listener->OnDataProducerMessageReceived(this, ppid, msg, len);

this->Data(ppid, msg, len);
}
} // namespace RTC
33 changes: 25 additions & 8 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ namespace RTC
auto* transport = kv.second;

delete transport;

TransportClosed(transport);
}
this->mapTransports.clear();

Expand Down Expand Up @@ -297,6 +299,8 @@ namespace RTC

request->Accept(data);

TransportCreated(webRtcTransport);

break;
}

Expand All @@ -321,6 +325,8 @@ namespace RTC

request->Accept(data);

TransportCreated(plainTransport);

break;
}

Expand All @@ -343,6 +349,8 @@ namespace RTC
pipeTransport->FillJson(data);

request->Accept(data);

TransportCreated(pipeTransport);

break;
}
Expand All @@ -367,6 +375,8 @@ namespace RTC
directTransport->FillJson(data);

request->Accept(data);

TransportCreated(directTransport);

break;
}
Expand Down Expand Up @@ -430,6 +440,8 @@ namespace RTC

request->Accept();

TransportClosed(transport);

break;
}

Expand Down Expand Up @@ -718,17 +730,22 @@ namespace RTC
// Cloned ref-counted packet that RtpStreamSend will store for as long as
// needed avoiding multiple allocations unless absolutely necessary.
// Clone only happens if needed.
std::shared_ptr<RTC::RtpPacket> sharedPacket;
std::shared_ptr<RTC::RtpPacket> sharedPacket(packet->Clone());

for (auto* consumer : consumers)
{
// Update MID RTP extension value.
const auto& mid = consumer->GetRtpParameters().mid;

if (!mid.empty())
packet->UpdateMid(mid);

consumer->SendRtpPacket(packet, sharedPacket);
// Possible optimizations
// if consumer.worker == producer.worker
// group consumers by worker and call run per worker
consumer->RunInContext([consumer, sharedPacket]() mutable {

This comment has been minimized.

Copy link
@ggarber

ggarber Oct 16, 2023

Author Owner

This change is unrelated to the plugins PR, please ignore it.

// Update MID RTP extension value.
const auto& mid = consumer->GetRtpParameters().mid;

if (!mid.empty())
sharedPacket->UpdateMid(mid);

consumer->SendRtpPacket(sharedPacket.get(), sharedPacket);
});
}
}

Expand Down
Loading

0 comments on commit 14a3ba0

Please sign in to comment.