using System; using Expedience.Infrastructure; using Expedience.Infrastructure.Concurrency; using Expedience.Infrastructure.Models; using MassTransit; using static MassTransit.Monitoring.Performance.BuiltInCounters; namespace Expedience.Api.Consumers { public class DutyCompletionResultConsumer : IConsumer { private readonly ILogger _logger; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IDistributedLock _distributedLock; public DutyCompletionResultConsumer(ILogger logger, IDistributedLock distributedLock, IServiceScopeFactory serviceScopeFactory) { _logger = logger; _serviceScopeFactory = serviceScopeFactory; _distributedLock = distributedLock; } public async Task Consume(ConsumeContext context) { var message = context.Message; if (message == null || message.UserInfo == null || message.PlayerInfo == null || message.DutyInfo == null || message.ClientInfo == null || message.CompletionTimeInfo == null) { _logger.LogError("Required message fields were not populated on Message with Id {messageId}", context.MessageId); return; } var userHash = message.UserInfo.UserId; var worldId = message.UserInfo.WorldId; _logger.LogInformation("Received message {uploadId} from user {userHash}", message.UploadId, userHash); using var scope = _serviceScopeFactory.CreateScope(); using var dbContext = scope.ServiceProvider.GetRequiredService(); var user = dbContext.Users.FirstOrDefault(x => x.UserHash == userHash && x.WorldId == worldId) ?? await CreateUser(dbContext, worldId, userHash, CancellationToken.None); if (user == null) return; var dutyCompletionResult = dbContext.DutyCompletionResults.FirstOrDefault(d => d.Id == message.UploadId); var completionResult = new DutyCompletionResult { Id = message.UploadId, TerritoryId = message.DutyInfo.TerritoryId, UserId = user.UserId, HasEcho = message.DutyInfo.HasEcho, IsUnrestricted = message.DutyInfo.IsUnrestricted, IsMinILevel = message.DutyInfo.IsMinILevel, HasNpcMembers = message.DutyInfo.IsNpcSupported, StartTime = message.DutyStartDateUtc, EndTime = message.DutyCompletionDateUtc, Hours = message.CompletionTimeInfo.Hours, Minutes = message.CompletionTimeInfo.Minutes, Seconds = message.CompletionTimeInfo.Seconds, Milliseconds = message.CompletionTimeInfo.Milliseconds, GameVersion = message.ClientInfo.GameVersion, PluginVersion = message.ClientInfo.PluginVersion, Lang = message.ClientInfo.GameLanguage, DataCenter = message.DataCenter, UploadedAt = message.UploadDateUtc, }; if (dutyCompletionResult == null) { SaveDutyCompletionResult(dbContext, message, completionResult); } else { _logger.LogWarning("The Result with Id {uploadId} has already been uploaded", message.UploadId); } // Insert this territory if it does not exist var territory = dbContext.Territories.FirstOrDefault(t => t.TerritoryId == completionResult.TerritoryId); if (territory == null) { _logger.LogInformation("Adding territory {territoryId} - {contentName}", completionResult.TerritoryId, message.DutyInfo.ContentName); dbContext.Territories.Add(new Models.Territory { TerritoryId = completionResult.TerritoryId, PlaceName = message.DutyInfo.PlaceName, ContentId = "0", ContentName = message.DutyInfo.ContentName, Level = 0, }); } else if (String.IsNullOrWhiteSpace(territory.ContentName)) { _logger.LogInformation("Updating territory {territoryId} with content name {contentName} and place name {placeName}", completionResult.TerritoryId, message.DutyInfo.ContentName, message.DutyInfo.PlaceName); // Update the place/content name if necessary territory.ContentName = message.DutyInfo.ContentName; territory.PlaceName = message.DutyInfo.PlaceName; } await dbContext.SaveChangesAsync(); _logger.LogInformation("Consumed message {uploadId} from user {userHash}", message.UploadId, userHash); } private static void SaveDutyCompletionResult(ExpedienceContext dbContext, Models.DutyCompletionResult message, DutyCompletionResult completionResult) { dbContext.DutyCompletionResults.Add(completionResult); var memberNumber = 0; var dutyMembers = new List(); foreach (var member in message.GroupMembers) { var dutyMember = new DutyMember { UploadId = message.UploadId, GroupNumber = member.GroupNumber, MemberNumber = memberNumber++, ClassJob = member.ClassJob, Level = member.Level, IsNpc = member.IsNpc, IsPlayer = member.IsPlayer, }; dutyMembers.Add(dutyMember); } dbContext.DutyMembers.AddRange(dutyMembers); } private async Task CreateUser(ExpedienceContext dbContext, int worldId, string userHash, CancellationToken cancellationToken) { var lockKey = $"{worldId}-{userHash}"; if (_distributedLock.AcquireLock(lockKey, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(4))) { try { var user = dbContext.Users.FirstOrDefault(x => x.UserHash == userHash && x.WorldId == worldId); if (user == null) { string userName; var isDuplicate = false; do { userName = UserNameGenerator.Generate(); isDuplicate = dbContext.Users.Any(x => x.WorldId == worldId && x.UserName == userName); await Task.Delay(20, cancellationToken); // Don't hog the CPU } while (isDuplicate == true); user = new User { UserHash = userHash, WorldId = worldId, UserName = userName, CreatedAt = DateTime.UtcNow, }; dbContext.Users.Add(user); await dbContext.SaveChangesAsync(cancellationToken); _logger.LogInformation("Created user for World {worldId} and Hash {userHash}: {userName}", worldId, userHash, user.UserName); } return user; } catch (Exception ex) { _logger.LogError(ex, "Error obtaining user name for World Id {worldId} and hash {userHash}: {errorMessage}", worldId, userHash, ex.Message); } finally { _distributedLock.ReleaseLock(lockKey); } } else { _logger.LogError("Could not acquire lock for {lockKey}", lockKey); } return null; } } }