Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable dbus-broker reexecute #310

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 148 additions & 1 deletion src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "broker/broker.h"
#include "broker/controller.h"
#include "broker/main.h"
Expand All @@ -21,7 +22,9 @@
#include "util/error.h"
#include "util/log.h"
#include "util/proc.h"
#include "util/serialize.h"
#include "util/sockopt.h"
#include "util/string.h"
#include "util/user.h"

static int broker_dispatch_signals(DispatchFile *file) {
Expand All @@ -40,12 +43,113 @@ static int broker_dispatch_signals(DispatchFile *file) {
return DISPATCH_E_EXIT;
}

int broker_new(Broker **brokerp, const char *machine_id, int log_fd, int controller_fd, uint64_t max_bytes, uint64_t max_fds, uint64_t max_matches, uint64_t max_objects) {
static int serialize_broker(Broker *broker) {
FILE *f = NULL;
int mem_fd;
mem_fd = state_file_init(&f);
if (mem_fd < 0)
return error_fold(mem_fd);

(void) serialize_basic(f, "max_ids", "%d", broker->bus.peers.ids);

Check failure

Code scanning / CodeQL

Wrong type of arguments to formatting function

This argument should be of type 'int' but is of type 'unsigned long'.
(void) serialize_peers(f, broker);
fseeko(f, 0, SEEK_SET);

return mem_fd;
}

static int broker_execv_with_args(Broker *broker, int mem_fd) {
_c_cleanup_(c_freep) char *str_mem_fd = NULL, *str_log = NULL, *str_controller = NULL;
_c_cleanup_(c_freep) char *str_max_bytes = NULL, *str_max_fds = NULL, *str_max_matches = NULL;
int r;
/* Generating args */
r = asprintf(&str_mem_fd, "%d", mem_fd);
if (r < 0)
return error_fold(r);
r = asprintf(&str_log, "%d", broker->log_fd);
if (r < 0)
return error_fold(r);
r = asprintf(&str_controller, "%d", broker->controller_fd);
if (r < 0)
return error_fold(r);
r = asprintf(&str_max_bytes, "%lu", broker->max_bytes);
if (r < 0)
return error_fold(r);
r = asprintf(&str_max_fds, "%lu", broker->max_fds + 1);
if (r < 0)
return error_fold(r);
r = asprintf(&str_max_matches, "%lu", broker->max_matches);
if (r < 0)
return error_fold(r);

/* execv */
char *args[OPTION_NUM_MAX];
int i = 0;
args[i++] = broker->bin_path;
generate_args_string(broker->log_fd > 0, args, OPTION_NUM_MAX, &i, "--log", str_log);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--controller", str_controller);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--machine-id", broker->machine_id);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--max-bytes", str_max_bytes);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--max-fds", str_max_fds);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--max-matches", str_max_matches);
generate_args_string(true, args, OPTION_NUM_MAX, &i, "--reexec", str_mem_fd);
if (broker->arg_audit && i + 2 < OPTION_NUM_MAX)
args[i++] = "--audit";
args[i++] = NULL;

log_append_here(&broker->log, LOG_INFO, 0, NULL);
r = log_commitf(&broker->log, "Broker now reexecuting...");
if (r)
return error_fold(r);

execv(broker->bin_path, args);
return 0;
}

static void set_broker_from_arg(Broker *broker, BrokerArg *broker_arg) {
broker->arg_audit = broker_arg->arg_audit;
broker->bin_path = broker_arg->bin_path;
broker->machine_id = broker_arg->machine_id;
broker->log_fd = broker_arg->log_fd;
broker->controller_fd = broker_arg->controller_fd;
broker->mem_fd = broker_arg->mem_fd;
broker->max_bytes = broker_arg->max_bytes;
broker->max_fds = broker_arg->max_fds;
broker->max_matches = broker_arg->max_matches;
broker->max_objects = broker_arg->max_objects;
}

static int broker_reexecute(Broker *broker) {
int mem_fd;
int r;

log_append_here(&broker->log, LOG_INFO, 0, NULL);
r = log_commitf(&broker->log, "Serializing broker.\n");
if (r)
return error_fold(r);

/* serialize */
mem_fd = serialize_broker(broker);
if (mem_fd < 0) {
log_append_here(&broker->log, LOG_INFO, errno, DBUS_BROKER_CATALOG_BROKER_EXITED);
r = log_commitf(&broker->log, "Failed to serialize broker.\n");
if (r < 0)
return error_fold(r);
}

kill(broker->launcher_pid, SIGCHLD);
return broker_execv_with_args(broker, mem_fd);
}

int broker_new(Broker **brokerp, BrokerArg *broker_arg) {
_c_cleanup_(broker_freep) Broker *broker = NULL;
struct ucred ucred;
socklen_t z;
sigset_t sigmask;
int r, log_type;
int log_fd = broker_arg->log_fd, controller_fd = broker_arg->controller_fd;
const char* machine_id = broker_arg->machine_id;
uint64_t max_bytes = broker_arg->max_bytes, max_fds = broker_arg->max_fds;
uint64_t max_matches = broker_arg->max_matches, max_objects = broker_arg->max_objects;

if (log_fd >= 0) {
z = sizeof(log_type);
Expand All @@ -67,6 +171,10 @@ int broker_new(Broker **brokerp, const char *machine_id, int log_fd, int control
broker->bus = (Bus)BUS_NULL(broker->bus);
broker->dispatcher = (DispatchContext)DISPATCH_CONTEXT_NULL(broker->dispatcher);
broker->signals_fd = -1;
broker->reexec_serial = -1;
broker->do_reexec = false;
broker->launcher_pid = getppid();
set_broker_from_arg(broker, broker_arg);
broker->signals_file = (DispatchFile)DISPATCH_FILE_NULL(broker->signals_file);
broker->controller = (Controller)CONTROLLER_NULL(broker->controller);

Expand Down Expand Up @@ -209,6 +317,12 @@ int broker_run(Broker *broker) {
else if (r)
return error_fold(r);

if (broker->mem_fd) {
r = deserialize_broker(broker, broker->mem_fd);
if (r)
return error_trace(r);
}

do {
r = dispatch_context_dispatch(&broker->dispatcher);
if (r == DISPATCH_E_EXIT)
Expand All @@ -217,8 +331,20 @@ int broker_run(Broker *broker) {
r = MAIN_FAILED;
else
r = error_fold(r);

if (broker->do_reexec)
r = MAIN_REEXEC;

} while (!r);

Peer *peeri;
c_rbtree_for_each_entry(peeri, &broker->bus.peers.peer_tree, registry_node) {
socket_dispatch_write(&peeri->connection.socket);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not reliably flush the entire queue. There are several scenarios where this will not block and instead retain data in the user-space queue. In those cases, we would lose messages.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I toke it for granted that all messages we have gotten will be sent out by socket_dispatch_write successfully. But if there are not many messages flying between dbus and peers, I think it's probobly fine.

By the way, do you think if we should support new client to establish connection during reexecuting? I haved checked systemd, systemctl start/stop doesn't work when systemctl daemon-reexec is running. Supporting that seems very complicated IMHO.

}

if (r == MAIN_REEXEC)
(void) broker_reexecute(broker);

peer_registry_flush(&broker->bus.peers);

k = broker_log_metrics(broker);
Expand Down Expand Up @@ -248,3 +374,24 @@ int broker_reload_config(Broker *broker, User *sender_user, uint64_t sender_id,

return 0;
}

int deserialize_broker(Broker *broker, int mem_fd) {
FILE *f = NULL;
int max_ids_length = ID_LENGTH_MAX + strlen("max_ids=");
_c_cleanup_(c_freep) char *buf = malloc(max_ids_length);

errno = 0;
f = fdopen(mem_fd, "r");
if (!f)
return error_trace(-errno);

while (fgets(buf, max_ids_length, f) != NULL) {
char *max_ids = string_prefix(buf, "max_ids=");
if (max_ids) {
broker->bus.peers.ids = atoi(max_ids);
break;
}
}

return 0;
}
34 changes: 33 additions & 1 deletion src/broker/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,65 @@

#include <c-stdaux.h>
#include <stdlib.h>
#include <systemd/sd-event.h>
#include "broker/controller.h"
#include "bus/bus.h"
#include "util/dispatch.h"
#include "util/log.h"

#define OPTION_NUM_MAX 20

enum {
_BROKER_E_SUCCESS,

BROKER_E_FORWARD_FAILED,
};

typedef struct Broker Broker;
typedef struct BrokerArg BrokerArg;
typedef struct User User;

struct BrokerArg {
const char *bin_path;
const char *machine_id;
bool arg_audit;
int log_fd;
int controller_fd;
int mem_fd;
uint64_t max_bytes;
uint64_t max_fds;
uint64_t max_matches;
uint64_t max_objects;
};

struct Broker {
sd_event *event;
Log log;
Bus bus;
DispatchContext dispatcher;

int signals_fd;
int reexec_serial;
bool arg_audit;
bool do_reexec;
const char *bin_path;
const char *machine_id;
int log_fd;
int controller_fd;
int mem_fd;
uint64_t max_bytes;
uint64_t max_fds;
uint64_t max_matches;
uint64_t max_objects;
pid_t launcher_pid;
DispatchFile signals_file;

Controller controller;
};

/* broker */

int broker_new(Broker **brokerp, const char *machine_id, int log_fd, int controller_fd, uint64_t max_bytes, uint64_t max_fds, uint64_t max_matches, uint64_t max_objects);
int broker_new(Broker **brokerp, BrokerArg *broker_arg);
Broker *broker_free(Broker *broker);

int broker_run(Broker *broker);
Expand All @@ -58,3 +89,4 @@ static inline Broker *BROKER(Bus *bus) {
*/
return c_container_of(bus, Broker, bus);
}
int deserialize_broker(Broker *broker, int mem_fd);
61 changes: 61 additions & 0 deletions src/broker/controller-dbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "broker/broker.h"
#include "broker/controller.h"
#include "bus/policy.h"
#include "bus/peer.h"
#include "dbus/connection.h"
#include "dbus/message.h"
#include "dbus/protocol.h"
Expand Down Expand Up @@ -275,6 +276,16 @@ static int controller_method_add_listener(Controller *controller, const char *_p

c_dvar_write(out_v, "()");

/* We choose to recover peer when adding listener. Because if before this,
* we do not know configs or polices of the new version. If after this, i.e.
* in listener_dispatch, the recover can't be done unless some new clients
* try to establish new connection. */
if (controller->broker->mem_fd) {
r = peer_recover_with_fd(controller->broker->mem_fd, listener);
if (r)
return error_trace(r);
}

return 0;
}

Expand Down Expand Up @@ -528,6 +539,12 @@ static int controller_dispatch_reply(Controller *controller, uint32_t serial, co
ControllerReload *reload;
int r;

/* Set broker->do_reexec to true if we get the reply from launcher. */
if (controller->broker->reexec_serial != -1 && controller->broker->reexec_serial == serial) {
controller->broker->do_reexec = true;
return 0;
}

reload = controller_find_reload(controller, serial);
if (!reload)
return CONTROLLER_E_UNEXPECTED_REPLY;
Expand Down Expand Up @@ -789,3 +806,47 @@ int controller_dbus_send_reload(Controller *controller, User *user, uint32_t ser

return 0;
}

/**
* controller_dbus_send_reexecute() - XXX
*/
int controller_dbus_send_reexecute(Controller *controller, User *user, uint32_t serial) {
static const CDVarType type[] = {
C_DVAR_T_INIT(
CONTROLLER_T_MESSAGE(
C_DVAR_T_TUPLE0
)
)
};
_c_cleanup_(c_dvar_deinit) CDVar var = C_DVAR_INIT;
_c_cleanup_(message_unrefp) Message *message = NULL;
_c_cleanup_(c_freep) void *data = NULL;
size_t n_data;
int r;

c_dvar_begin_write(&var, (__BYTE_ORDER == __BIG_ENDIAN), type, 1);
c_dvar_write(&var, "((yyyyuu[(y<o>)(y<s>)(y<s>)])())",
c_dvar_is_big_endian(&var) ? 'B' : 'l', DBUS_MESSAGE_TYPE_METHOD_CALL, 0, 1, 0, serial,
DBUS_MESSAGE_FIELD_PATH, c_dvar_type_o, "/org/bus1/DBus/Controller",
DBUS_MESSAGE_FIELD_INTERFACE, c_dvar_type_s, "org.bus1.DBus.Controller",
DBUS_MESSAGE_FIELD_MEMBER, c_dvar_type_s, "Reexecute");

r = c_dvar_end_write(&var, &data, &n_data);
if (r)
return error_origin(r);

r = message_new_outgoing(&message, data, n_data);
if (r)
return error_fold(r);
data = NULL;

r = connection_queue(&controller->connection, user, message);
if (r) {
if (r == CONNECTION_E_QUOTA)
return CONTROLLER_E_QUOTA;

return error_fold(r);
}

return 0;
}
17 changes: 16 additions & 1 deletion src/broker/controller.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ static int controller_dispatch_connection(DispatchFile *file) {

do {
_c_cleanup_(message_unrefp) Message *m = NULL;

r = connection_dequeue(&controller->connection, &m);
if (r) {
if (r == CONNECTION_E_EOF)
Expand Down Expand Up @@ -373,6 +372,22 @@ int controller_request_reload(Controller *controller,
return 0;
}

/**
* controller_request_reexecute() - XXX
*/
int controller_request_reexecute(Controller *controller,
User *sender_user,
uint64_t sender_id,
uint32_t sender_serial) {
int r;

r = controller_dbus_send_reexecute(controller, sender_user, sender_serial);
if (r)
return error_trace(r);

return 0;
}

/**
* controller_find_name() - XXX
*/
Expand Down
Loading