Skip to content

Commit

Permalink
Update ChannelGrain Join method to include sessionId parameter, add S…
Browse files Browse the repository at this point in the history
…etActiveChannelConnection method in FusionGrain, and make adjustments in related interfaces and classes.

- Refactor ChannelGrain Join method to accept sessionId
- Introduce SetActiveChannelConnection in FusionGrain
- Update related interfaces and classes for compatibility
  • Loading branch information
0xF6 committed Jan 5, 2025
1 parent 4d8e2fa commit 74ba6b8
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 14 deletions.
7 changes: 5 additions & 2 deletions src/Argon.Api/Grains/ChannelGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
_self = await GetChannel();

_userStateEmitter = await this.Streams().CreateServerStream();
_userStateEmitter = await this.Streams().CreateServerStreamFor(ServerId.id);
}

public async Task<List<RealtimeChannelUser>> GetMembers()
=> state.State.Users.Select(x => x.Value).ToList();


public async Task<Maybe<RealtimeToken>> Join(Guid userId)
public async Task<Maybe<RealtimeToken>> Join(Guid userId, Guid sessionId)
{
if (_self.ChannelType != ChannelType.Voice)
return Maybe<RealtimeToken>.None();
Expand All @@ -49,6 +49,8 @@ public async Task<Maybe<RealtimeToken>> Join(Guid userId)

await _userStateEmitter.Fire(new JoinedToChannelUser(userId, this.GetPrimaryKey()));

await GrainFactory.GetGrain<IFusionSessionGrain>(sessionId).SetActiveChannelConnection(this.GetPrimaryKey());

return await sfu.IssueAuthorizationTokenAsync(userId, ChannelId, SfuPermission.DefaultUser);
}

Expand Down Expand Up @@ -84,6 +86,7 @@ private async Task<Channel> Get()
}
}


public enum ChannelUserChangedStateEvent
{
ON_JOINED,
Expand Down
32 changes: 29 additions & 3 deletions src/Argon.Api/Grains/FusionGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,33 @@ namespace Argon.Grains;

using Features.Jwt;
using Features.Rpc;
using R3;
using static DeactivationReasonCode;

public class FusionGrain(IGrainFactory grainFactory) : Grain, IFusionSessionGrain
public class FusionGrain(IGrainFactory grainFactory, IClusterClient clusterClient) : Grain, IFusionSessionGrain
{
private Guid _userId;
private Guid _machineId;
private Guid _activeChannelId;

private IArgonStream<IArgonEvent> userStream;

private IGrainTimer? refreshTimer;

public async ValueTask SelfDestroy()
{
if (refreshTimer is not null)
refreshTimer.Dispose();
var servers = await grainFactory
.GetGrain<IUserGrain>(_userId)
.GetMyServersIds();
foreach (var server in servers)
await grainFactory
.GetGrain<IServerGrain>(server)
.SetUserStatus(_userId, UserStatus.Offline);
if (_activeChannelId != default)
await grainFactory
.GetGrain<IChannelGrain>(_activeChannelId)
.Leave(_userId);
_userId = default;
_machineId = default;
GrainContext.Deactivate(new(ApplicationRequested, "omae wa mou shindeiru"));
Expand All @@ -32,6 +40,7 @@ public async ValueTask BeginRealtimeSession(Guid userId, Guid machineKey, UserSt
this._machineId = machineKey;

userStream = await this.Streams().CreateServerStreamFor(_userId);
refreshTimer = this.RegisterGrainTimer(RefreshUserStatus, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(1));

await grainFactory
.GetGrain<IUserMachineSessions>(userId)
Expand All @@ -44,11 +53,22 @@ await grainFactory
.GetGrain<IServerGrain>(server)
.SetUserStatus(userId, preferredStatus ?? UserStatus.Online);

await userStream.Fire(new WelcomeCommander($"Outside temperature is {MathF.Round(Random.Shared.Next(-273_15, 45_00) / 100f)}\u00b0",
await userStream.Fire(new WelcomeCommander($"Outside temperature is {MathF.Round(Random.Shared.Next(-273_15, 45_00) / 100f)}\u00b0",
preferredStatus ?? UserStatus.Online,
new UserNotificationSnapshot(servers.Select(x => new UserNotificationItem(x, 5)).ToList())));
}

private async Task RefreshUserStatus(CancellationToken arg)
{
var servers = await grainFactory
.GetGrain<IUserGrain>(_userId)
.GetMyServersIds();
foreach (var server in servers)
await grainFactory
.GetGrain<IServerGrain>(server)
.SetUserStatus(_userId, UserStatus.Online);
}

public ValueTask EndRealtimeSession()
=> SelfDestroy();

Expand All @@ -57,4 +77,10 @@ public ValueTask<bool> HasSessionActive()

public ValueTask<TokenUserData> GetTokenUserData()
=> new(new TokenUserData(_userId, _machineId));

public ValueTask SetActiveChannelConnection(Guid channelId)
{
_activeChannelId = channelId;
return ValueTask.CompletedTask;
}
}
2 changes: 1 addition & 1 deletion src/Argon.Api/Grains/Interfaces/IChannelGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Argon.Grains.Interfaces;
public interface IChannelGrain : IGrainWithGuidKey
{
[Alias("Join")]
Task<Maybe<RealtimeToken>> Join(Guid userId);
Task<Maybe<RealtimeToken>> Join(Guid userId, Guid sessionId);

[Alias("Leave")]
Task Leave(Guid userId);
Expand Down
3 changes: 3 additions & 0 deletions src/Argon.Api/Grains/Interfaces/IFusionSessionGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public interface IFusionSessionGrain : IGrainWithGuidKey
[Alias("GetTokenUserData")]
ValueTask<TokenUserData> GetTokenUserData();

[Alias("SetActiveChannelConnection")]
ValueTask SetActiveChannelConnection(Guid channelId);

public const string StreamProviderId = "FusionSessionStream";
public const string SelfNs = "@";
public const string StorageId = "CacheStorage";
Expand Down
7 changes: 5 additions & 2 deletions src/Argon.Api/Grains/ServerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Argon.Grains;
using Persistence.States;

public class ServerGrain(
[PersistentState("realtime-server", IFusionSessionGrain.StorageId)]
[PersistentState("realtime-server", IFusionSessionGrain.StorageId)]
IPersistentState<RealtimeServerGrainState> state,
IGrainFactory grainFactory,
IDbContextFactory<ApplicationDbContext> context,
Expand Down Expand Up @@ -47,7 +47,9 @@ public async Task<Server> UpdateServer(ServerInput input)
var server = await ctx.Servers
.FirstAsync(s => s.Id == this.GetPrimaryKey());

var copy = server with { };
var copy = server with
{
};
server.Name = input.Name ?? server.Name;
server.Description = input.Description ?? server.Description;
server.AvatarFileId = input.AvatarUrl ?? server.AvatarFileId;
Expand Down Expand Up @@ -98,6 +100,7 @@ public async ValueTask DoJoinUserAsync(Guid userId)

await ctx.UsersToServerRelations.AddAsync(new ServerMember
{
Id = userId,
ServerId = this.GetPrimaryKey(),
UserId = userId
});
Expand Down
1 change: 1 addition & 0 deletions src/Argon.Api/Grains/UserSessionGrain.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Argon.Grains;

using Microsoft.Extensions.Logging;
using Orleans.BroadcastChannel;

public class UserMachineSessions(
ILogger<IUserMachineSessions> logger) : Grain<UserMachineSessionGrainState>, IUserMachineSessions
Expand Down
5 changes: 2 additions & 3 deletions src/Argon.Api/Services/Transport/EventBusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ public async Task<IArgonStream<IArgonEvent>> SubscribeToServerEvents(Guid Server
public async Task<IArgonStream<IArgonEvent>> SubscribeToMeEvents()
{
var user = this.GetUser();
var sessionId = Guid.NewGuid();

await clusterClient.GetGrain<IFusionSessionGrain>(sessionId)
await clusterClient.GetGrain<IFusionSessionGrain>(user.machineId)
.BeginRealtimeSession(user.id, user.machineId, UserStatus.Online);

ArgonTransportContext.Current.SubscribeToDisconnect(
async () => await clusterClient.GetGrain<IFusionSessionGrain>(sessionId).EndRealtimeSession());
async () => await clusterClient.GetGrain<IFusionSessionGrain>(user.machineId).EndRealtimeSession());

return await clusterClient.Streams().CreateClientStream(user.id);
}
Expand Down
4 changes: 1 addition & 3 deletions src/Argon.Api/Services/Transport/ServerInteraction.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Argon.Services;

using Grpc.Core;
using Orleans.Runtime;
using Shared.Servers;

public class ServerInteraction(IGrainFactory grainFactory) : IServerInteraction
Expand All @@ -21,7 +19,7 @@ public async Task<string> JoinToVoiceChannel(Guid serverId, Guid channelId)
var user = this.GetUser();
var result = await grainFactory
.GetGrain<IChannelGrain>(channelId)
.Join(user.id);
.Join(user.id, user.machineId);
return result.Value.value;
}

Expand Down

0 comments on commit 74ba6b8

Please sign in to comment.