Асинхронный кэш для одного элемента

tox1cozZ

aka Agravaine
8,378
595
2,825
Есть задача реализовать простые муты игроков в чате. Данные о мутах хранятся в БД. Соответственно, при отправке сообщения игроков нужно делать запрос в БД и смотреть, может ли он писать в чат.
Запрос в бд - дело не быстрое, к тому же это нужно делать очень часто, ибо чат может быть довольно активным.

В Guava есть такая штука прикольная, Suppliers.memoizeWithExpiration - своеобразный кэш для одного элемента:
Java:
Supplier<String> task = Suppliers.memoizeWithExpiration(() -> {
            try{
                // Large task
                Thread.sleep(1000L);
                return RandomStringUtils.randomAlphanumeric(12);
            }catch(Throwable e){
                return "ERROR";
            }
        }, 60L, TimeUnit.SECONDS);

String value = task.get();
Мы вызываем get(), идет подгрузка данных и кешируется. 60 секунд мы сможет дергать get() и у нас будет сразу возвращаться значение из кеша. И только через 60 секунд снова перезагрузятся данные.

Удобно, красиво, отлично подходит чтобы постоянно не лазить в базу.
Но это все происходит в том же потоке, что все равно может повесить сервер.
Пытался реализовать это дело асинхронно - вот что вышло:
Java:
AtomicReference<String> prevValue = new AtomicReference<>();
Supplier<CompletableFuture<String>> calculated = Suppliers.memoizeWithExpiration(() -> CompletableFuture.supplyAsync(() -> {
            try{
                // Large task
                Thread.sleep(1000L);
                return RandomStringUtils.randomAlphanumeric(12);
            }catch(Throwable e){
                return "ERROR";
            }
        }).thenApply(res -> {
            prevValue.set(res);
            return res;
        }), 60L, TimeUnit.SECONDS);

String result = calculated.get().getNow(prevValue.get());
Вроде всё работает как надо. Но сомневаюсь, что правильная реализация, может можно как-то сделать проще и красивее?
И да, prevValue запоминает текущее значение и передает его в getNow, чтобы пока подгружаются данные мы получали предыдущий результат, а не нулл(нулл - рабочее значение).
get() дергать у фьючи нельзя, ибо он блокирубщий(кэп).
 
2,505
80
395
Вот так, наверное, немного покрасивее:
Java:
String result = null;

Supplier<CompletableFuture<String>> calculated = Suppliers.memoizeWithExpiration(() -> CompletableFuture.supplyAsync(() -> {
    try {
        // Large task
        Thread.sleep(1000L);
        return RandomStringUtils.randomAlphanumeric(12);
    } catch (Throwable e) {
        return "ERROR";
    }
}), 60L, TimeUnit.SECONDS);

result = calculated.get().getNow(result);
Соответственно вместо локальной переменной result нужно использовать поле где-нибудь в игроке.

А вообще, зачем делать эти запросы в базу постоянно? Может можно сделать запрос один раз при входе?

А но основе чего memoizeWithExpiration кэширует результат? На основе входной лямбды? Мне кажется, могут быть подводные камни, когда будешь использовать для разных игроков.
 
Последнее редактирование:

tox1cozZ

aka Agravaine
8,378
595
2,825
А вообще, зачем делать эти запросы в базу постоянно? Может можно сделать запрос один раз при входе?
База изменяется не только из игры, но и из сайта, удаленной консоли и бог знает еще откуда. Поэтому если снимут мут на сайте, игрок узнает об этом только после перезахода, что не подходит.

А но основе чего memoizeWithExpiration кэширует результат? На основе входной лямбды?
Да, на входе передается обычный сапплаер, который и вычисляет кэшируемое значение.

Мне кажется, могут быть подводные камни, когда будешь использовать для разных игроков.
Да ну хз, у меня этот сапплаер в игроке лежит прям, не думаю.
 

CumingSoon

Местный стендапер
1,635
12
267
Если я правильно понял, что тебе нужно асинхронно тянуть какие-то данные, а потом обновлять новые, то можешь посмотреть сюда: https://github.com/r4v3n6101/rtw/bl...ver/data/impl/OpenWeatherDataRetriever.kt#L51
Не знаю, хуже или лучше это твоего способа, но посмотри, вдруг что-нибудь с Executor сделаешь
 

GoogleTan

Картошка :3
1,352
42
308
А можит при заходе отправлять данные на клиент и в случае мута блочить чат. При обновлении данных они синхронизируются с клиентом. Если игрок таки обойдет эту засчиту(ну есть такие хитрые читеры люди, думаю, ты и сам знаешь), тогда будет работать твоя система. Это поидее должно понизить нагрузку.
 
7,058
320
1,484
Так ведь из-за рефреша раз в 60 сек муты будут действовать в среднем через полминуты, это разве хорошо?
 

GoogleTan

Картошка :3
1,352
42
308
Нет. В эти 30 сек будет работать стандартная схема с запросом в бд, как и в случае с хитрыми игроками.
 
355
2
17
База изменяется не только из игры, но и из сайта, удаленной консоли и бог знает еще откуда. Поэтому если снимут мут на сайте, игрок узнает об этом только после перезахода, что не подходит.
Собственно, а почему бы не посылать вместе с мутом rcon запрос? Сервер обновит информацию о игроке.
 
2,505
80
395
Да, на входе передается обычный сапплаер, который и вычисляет кэшируемое значение.
Я говорю не про вычисление, а про само кэширование. Внутри memoizeWithExpiration скорее всего лежит какой-нибудь статический HashMap<Supplier<Result>, Result>, который и кэширует данные по входной функции (я не изучал этот механизм, поэтому могу ошибаться). А ведь это не нужно, потому что значение итак кэшируется в игроке. Поэтому я бы лучше сделал примерно как Равен. Но это сильно много кода, если нет удобного механизма для выполнения асинхронных тасков и последующего до-выполнения таска в основном трэде (чтобы не делать всякие атомик поля и защиты от гонки).

Не знаю, хуже или лучше это твоего способа, но посмотри, вдруг что-нибудь с Executor сделаешь
Не нужно создавать новый Executor для каждой таски. А еще не стоит создавать именно CachedThreadPool. Если в один случайный момент будет очень много тасков, он создаст слишком много трэдов. И они так и останутся лежать в памяти, пока не удалится Executor.
 

tox1cozZ

aka Agravaine
8,378
595
2,825
Внутри memoizeWithExpiration скорее всего лежит какой-нибудь статический HashMap<Supplier<Result>, Result>
Никаких там статиков нет. Это обычная обёртка над Supplier, которая у себя в get методе проверяет по времени, нужно ли обновить кэш. Если да - вызывает get и родительского Supplier и запоминает в переменную, иначе просто возвращает переменную со значением.
Java:
static class ExpiringMemoizingSupplier<T> implements Supplier<T>, Serializable {
    final Supplier<T> delegate;
    final long durationNanos;
    transient volatile @Nullable T value;
    // The special value 0 means "not yet initialized".
    transient volatile long expirationNanos;

    ExpiringMemoizingSupplier(Supplier<T> delegate, long duration, TimeUnit unit) {
      this.delegate = checkNotNull(delegate);
      this.durationNanos = unit.toNanos(duration);
      checkArgument(duration > 0, "duration (%s %s) must be > 0", duration, unit);
    }

    @Override
    public T get() {
      // Another variant of Double Checked Locking.
      //
      // We use two volatile reads. We could reduce this to one by
      // putting our fields into a holder class, but (at least on x86)
      // the extra memory consumption and indirection are more
      // expensive than the extra volatile reads.
      long nanos = expirationNanos;
      long now = Platform.systemNanoTime();
      if (nanos == 0 || now - nanos >= 0) {
        synchronized (this) {
          if (nanos == expirationNanos) { // recheck for lost race
            T t = delegate.get();
            value = t;
            nanos = now + durationNanos;
            // In the very unlikely event that nanos is 0, set it to 1;
            // no one will notice 1 ns of tardiness.
            expirationNanos = (nanos == 0) ? 1 : nanos;
            return t;
          }
        }
      }
      return value;
    }

Собственно, а почему бы не посылать вместе с мутом rcon запрос? Сервер обновит информацию о игроке.
А если у меня нет доступа к сайту? Некому отправлять эти рконы, увы. Так что не варик.

А можит при заходе отправлять данные на клиент и в случае мута блочить чат. При обновлении данных они синхронизируются с клиентом. Если игрок таки обойдет эту засчиту(ну есть такие хитрые читеры люди, думаю, ты и сам знаешь), тогда будет работать твоя система. Это поидее должно понизить нагрузку.
Ненене, вообще не туда роешь. Клиент ничего не знает о мутах, это все на сервере только.

Не знаю, хуже или лучше это твоего способа, но посмотри, вдруг что-нибудь с Executor сделаешь
Походу что-то такое и придется замутить.
 

CumingSoon

Местный стендапер
1,635
12
267
Если в один случайный момент будет очень много тасков, он создаст слишком много трэдов. И они так и останутся лежать в памяти, пока не удалится Executor.
Так, по идее, они переиспользуют себя. Ну а даже если это не так, то у нас же не адсл, чтобы тянуть лёгкий запрос сто веков

Походу что-то такое и придется замутить.
Есть ScheduledExecutor, ты можешь просто им присваивать другую переменную. Это, кстати, интересная идея, надо у себя будет поправить код
 

CumingSoon

Местный стендапер
1,635
12
267
и обновлял мою атомик переменную
Ну да. Кстати, если ты нигде в другом месте переменную не используешь, то можно не делать ее атомарной. Насколько я помню, объекты на куче могут быть прочитаны много раз асинхронно, но вот записаны должны быть синхронно, дабы не было такого, что они пишут в непонятном порядке
 

CumingSoon

Местный стендапер
1,635
12
267
Видимо, это только на С++ распространяется. Ну да ладно
 

tox1cozZ

aka Agravaine
8,378
595
2,825
Кароч, переделал немного тот готовый сапплаер, вышло что-то такое:
Java:
import lombok.ToString;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

@ToString(of = {"delegate", "durationNanos"})
public class AsyncCachedValue<T> implements Supplier<T>{

    private final ExecutorService executor;
    private final Supplier<T> delegate;
    private final long durationNanos;

    private volatile CompletableFuture<T> value;
    private final AtomicReference<T> previousValue = new AtomicReference<>();

    // The special value 0 means "not yet initialized"
    private volatile long expirationNanos;

    public AsyncCachedValue(ExecutorService executor, Supplier<T> delegate, long duration, TimeUnit unit, boolean cacheImmediately){
        this.executor = checkNotNull(executor);
        this.delegate = checkNotNull(delegate);
        this.durationNanos = unit.toNanos(duration);
        checkArgument(duration > 0, "duration (%s %s) must be > 0", duration, unit);
        if(cacheImmediately){
            get();
        }
    }

    @Override
    public T get(){
        // Another variant of Double Checked Locking.
        //
        // We use two volatile reads. We could reduce this to one by
        // putting our fields into a holder class, but (at least on x86)
        // the extra memory consumption and indirection are more
        // expensive than the extra volatile reads.
        long nanos = expirationNanos;
        long now = System.nanoTime();
        if(nanos == 0 || now - nanos >= 0){
            synchronized(this){
                if(nanos == expirationNanos){ // recheck for lost race
                    value = CompletableFuture.supplyAsync(delegate, executor).whenComplete((result, error) -> previousValue.set(result));
                    nanos = now + durationNanos;
                    // In the very unlikely event that nanos is 0, set it to 1;
                    // no one will notice 1 ns of tardiness.
                    expirationNanos = (nanos == 0) ? 1 : nanos;
                }
            }
        }
        return value.getNow(previousValue.get());
    }
}
Абсолютно без понятия, правильно или нет, но кажись всё работает как надо.
 

CumingSoon

Местный стендапер
1,635
12
267
А зачем synchronized, который по сути легаси, использовать вместе с concurrent? Если тебе будет интересно, то я как-нибудь так написал бы:
Java:
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class AsyncCachedValue<T> implements Supplier<T> {

    private final ScheduledExecutorService executor;
    private final Supplier<T> supplier;
    private final Duration duration;
    private AtomicReference<T> value = null;

    public AsyncCachedValue(ScheduledExecutorService executor, Supplier<T> supplier, Duration duration, boolean init) {
        this.executor = executor;
        this.supplier = supplier;
        this.duration = duration;
        if (init) initialize();
    }

    private void initialize() {
        value = new AtomicReference<>();
        executor.scheduleAtFixedRate(() -> {
            value.set(supplier.get());
        }, 0, duration.toNanos(), TimeUnit.NANOSECONDS);
    }

    @Override
    public T get() {
        if (value == null) initialize();
        return value.get();
    }
}
 
Сверху