You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

189 lines
8.1 KiB
C#

using System;
using Expedience.Infrastructure;
using Expedience.Infrastructure.Concurrency;
using Expedience.Infrastructure.Models;
using MassTransit;
using static MassTransit.Monitoring.Performance.BuiltInCounters;
namespace Expedience.Api.Consumers
{
public class DutyCompletionResultConsumer : IConsumer<Models.DutyCompletionResult>
{
private readonly ILogger<DutyCompletionResultConsumer> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IDistributedLock _distributedLock;
public DutyCompletionResultConsumer(ILogger<DutyCompletionResultConsumer> logger,
IDistributedLock distributedLock,
IServiceScopeFactory serviceScopeFactory)
{
_logger = logger;
_serviceScopeFactory = serviceScopeFactory;
_distributedLock = distributedLock;
}
public async Task Consume(ConsumeContext<Models.DutyCompletionResult> context)
{
var message = context.Message;
if (message == null || message.UserInfo == null || message.PlayerInfo == null ||
message.DutyInfo == null || message.ClientInfo == null || message.CompletionTimeInfo == null)
{
_logger.LogError("Required message fields were not populated on Message with Id {messageId}", context.MessageId);
return;
}
var userHash = message.UserInfo.UserId;
var worldId = message.UserInfo.WorldId;
_logger.LogInformation("Received message {uploadId} from user {userHash}", message.UploadId, userHash);
using var scope = _serviceScopeFactory.CreateScope();
using var dbContext = scope.ServiceProvider.GetRequiredService<ExpedienceContext>();
var user = dbContext.Users.FirstOrDefault(x => x.UserHash == userHash && x.WorldId == worldId) ??
await CreateUser(dbContext, worldId, userHash, CancellationToken.None);
if (user == null)
return;
var dutyCompletionResult = dbContext.DutyCompletionResults.FirstOrDefault(d => d.Id == message.UploadId);
var completionResult = new DutyCompletionResult
{
Id = message.UploadId,
TerritoryId = message.DutyInfo.TerritoryId,
UserId = user.UserId,
HasEcho = message.DutyInfo.HasEcho,
IsUnrestricted = message.DutyInfo.IsUnrestricted,
IsMinILevel = message.DutyInfo.IsMinILevel,
HasNpcMembers = message.DutyInfo.IsNpcSupported,
StartTime = message.DutyStartDateUtc,
EndTime = message.DutyCompletionDateUtc,
Hours = message.CompletionTimeInfo.Hours,
Minutes = message.CompletionTimeInfo.Minutes,
Seconds = message.CompletionTimeInfo.Seconds,
Milliseconds = message.CompletionTimeInfo.Milliseconds,
GameVersion = message.ClientInfo.GameVersion,
PluginVersion = message.ClientInfo.PluginVersion,
Lang = message.ClientInfo.GameLanguage,
DataCenter = message.DataCenter,
UploadedAt = message.UploadDateUtc,
};
if (dutyCompletionResult == null)
{
SaveDutyCompletionResult(dbContext, message, completionResult);
}
else
{
_logger.LogWarning("The Result with Id {uploadId} has already been uploaded", message.UploadId);
}
// Insert this territory if it does not exist
var territory = dbContext.Territories.FirstOrDefault(t => t.TerritoryId == completionResult.TerritoryId);
if (territory == null)
{
_logger.LogInformation("Adding territory {territoryId} - {contentName}", completionResult.TerritoryId, message.DutyInfo.ContentName);
dbContext.Territories.Add(new Models.Territory
{
TerritoryId = completionResult.TerritoryId,
PlaceName = message.DutyInfo.PlaceName,
ContentId = "0",
ContentName = message.DutyInfo.ContentName,
Level = 0,
});
}
else if (String.IsNullOrWhiteSpace(territory.ContentName))
{
_logger.LogInformation("Updating territory {territoryId} with content name {contentName} and place name {placeName}",
completionResult.TerritoryId, message.DutyInfo.ContentName, message.DutyInfo.PlaceName);
// Update the place/content name if necessary
territory.ContentName = message.DutyInfo.ContentName;
territory.PlaceName = message.DutyInfo.PlaceName;
}
await dbContext.SaveChangesAsync();
_logger.LogInformation("Consumed message {uploadId} from user {userHash}", message.UploadId, userHash);
}
private static void SaveDutyCompletionResult(ExpedienceContext dbContext, Models.DutyCompletionResult message,
DutyCompletionResult completionResult)
{
dbContext.DutyCompletionResults.Add(completionResult);
var memberNumber = 0;
var dutyMembers = new List<DutyMember>();
foreach (var member in message.GroupMembers)
{
var dutyMember = new DutyMember
{
UploadId = message.UploadId,
GroupNumber = member.GroupNumber,
MemberNumber = memberNumber++,
ClassJob = member.ClassJob,
Level = member.Level,
IsNpc = member.IsNpc,
IsPlayer = member.IsPlayer,
};
dutyMembers.Add(dutyMember);
}
dbContext.DutyMembers.AddRange(dutyMembers);
}
private async Task<User?> CreateUser(ExpedienceContext dbContext, int worldId, string userHash, CancellationToken cancellationToken)
{
var lockKey = $"{worldId}-{userHash}";
if (_distributedLock.AcquireLock(lockKey, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(4)))
{
try
{
var user = dbContext.Users.FirstOrDefault(x => x.UserHash == userHash && x.WorldId == worldId);
if (user == null)
{
string userName;
var isDuplicate = false;
do
{
userName = UserNameGenerator.Generate();
isDuplicate = dbContext.Users.Any(x => x.WorldId == worldId && x.UserName == userName);
await Task.Delay(20, cancellationToken); // Don't hog the CPU
}
while (isDuplicate == true);
user = new User
{
UserHash = userHash,
WorldId = worldId,
UserName = userName,
CreatedAt = DateTime.UtcNow,
};
dbContext.Users.Add(user);
await dbContext.SaveChangesAsync(cancellationToken);
_logger.LogInformation("Created user for World {worldId} and Hash {userHash}: {userName}", worldId, userHash, user.UserName);
}
return user;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error obtaining user name for World Id {worldId} and hash {userHash}: {errorMessage}", worldId, userHash, ex.Message);
}
finally
{
_distributedLock.ReleaseLock(lockKey);
}
}
else
{
_logger.LogError("Could not acquire lock for {lockKey}", lockKey);
}
return null;
}
}
}