Проблемы пакетной обработки запросов и их решения. Часть 2 | OTUS

Проблемы пакетной обработки запросов и их решения. Часть 2

Это продолжение статьи «Проблемы пакетной обработки запросов и их решения». Рекомендуется сначала ознакомиться с первой частью, так как в ней подробно описана суть задачи и некоторые подходы к ее решению. Здесь же мы рассмотрим другие методы.

Краткое повторение задачи

Есть чат для согласования документа с предопределенным набором участников. Сообщения содержат текст и файлы. И, как в обычных чатах, сообщения могут быть ответами (reply) и пересылками (forward).

Модель сообщения чата:

data class ChatMessage(
  // nullable так как появляется только после persist
  val id: Long? = null,
  /** Ссылка на автора */
  val author: UserReference,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  // из-за особенностей связки JPA+СУБД проще поддерживать и null, и пустые списки
  val files: List<FileReference>? = null,
  /** Если является ответом, то здесь будет оригинал */
  val replyTo: ChatMessage? = null,
  /** Если является пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessage? = null
)

Через Dependency Injection нам доступны реализации следующих внешних сервисов:

interface ChatMessageRepository {
  fun findLast(n: Int): List<ChatMessage>
}

data class FileHeadRemote(
  val id: FileReference,
  val name: String
)

interface FileRemoteApi {
  fun getHeadById(id: FileReference): FileHeadRemote
  fun getHeadsByIds(id: Set<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(id: List<FileReference>): List<FileHeadRemote>
}

data class UserRemote(
  val id: UserReference,
  val name: String
)

interface UserRemoteApi {
  fun getUserById(id: UserReference): UserRemote
  fun getUsersByIds(id: Set<UserReference>): Set<UserRemote>
  fun getUsersByIds(id: List<UserReference>): List<UserRemote>
} 

Нам нужно реализовать REST-контроллер:

interface ChatRestApi {
  fun getLast(n: Int): List<ChatMessageUI>
}

Где:

/** В таком виде отдаются ссылки на сущности для фронта */
data class ReferenceUI(
  /** Идентификатор для url */
  val ref: String,
  /** Видимое пользователю название ссылки */
  val name: String
)

data class ChatMessageUI(
  val id: Long,
  /** Ссылка на автора */
  val author: ReferenceUI,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  val files: List<ReferenceUI>,
  /** Если являтся ответом, то здесь будет оригинал */
  val replyTo: ChatMessageUI? = null,
  /** Если являтся пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessageUI? = null
)

В предыдущей части мы рассмотрели наивную реализацию сервиса, использующего пакетную обработку, и несколько способов ее ускорить. Эти способы очень просты, но их применение не обеспечивает достаточно хорошую производительность.

Увеличение пакетов

Главной проблемой наивных решений стал маленький размер пакетов.

Для того чтобы вызовы группировать в пакеты большего размера, нужно как-то накапливать запросы. Вот эта строка не подразумевает накопления запросов:

author = userRepository.getUserById(author).toFrontReference(),

Сейчас у нас в рантайме нет специального места для хранения перечня пользователей — он формируется постепенно. Это придется менять.

Для начала нужно отделить логику получения данных от маппинга в методе:

ChatMessage.toFrontModel:

private fun ChatMessage.toFrontModel(
  getUser: (UserReference) -> UserRemote,
  getFile: (FileReference) -> FileHeadRemote,
  serializeMessage: (ChatMessage) -> ChatMessageUI
): ChatMessageUI =
  ChatMessageUI(
    id = id ?: throw IllegalStateException("$this must be persisted"),
    author = getUser(author).toFrontReference(),
    message = message,
    files = files?.let {
      it.map(getFile).map { it.toFrontReference() }
    } ?: listOf(),
    forwardFrom = forwardFrom?.let(serializeMessage),
    replyTo = replyTo?.let(serializeMessage)
  )

Получается, что эта функция зависит только от трех внешних функций (а не от целых классов, как было вначале).

После такой переделки тело функции не стало менее понятным, а контракт стал жестче (в этом есть как плюсы, так и минусы).

В действительности можно и не делать такое сужение контракта и оставить зависимости от интерфейсов. Главное, чтобы в них точно не было ничего лишнего, так как нам понадобится делать альтернативные реализации.

Поскольку функция serializeMessage похожа на рекурсивную, на первом шаге рефакторинга это и можно сделать в виде явной рекурсии:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(n: Int) =
    messageRepository.findLast(n)
      .map { it.toFrontModel() }

Я сделал заглушку для метода toFrontModel, которая пока работает ровно так же, как в нашей первой наивной реализации (реализация всех трех внешних функций осталась прежней).

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  toFrontModel(
    getUser = userRepository::getUserById,
    getFile = fileRepository::getHeadById,
    serializeMessage = {it.toFrontModel()}
  )

Но нам нужно сделать так, чтобы функции getUser, getFile и serializeMessage работали эффективно, то есть слали запросы соответствующим сервисам пакетами нужных размеров (для каждого сервиса теоретически этот размер может быть разным) или вообще по одному запросу в каждый сервис, если разрешены безлимитные запросы.

Проще всего добиться такой группировки, если до начала обработки у нас будут на руках все нужные запросы. Для этого нужно перед вызовом toFrontModel собрать все необходимые ссылки, сделать пакетную обработку и потом уже использовать результат.

Еще можно попробовать схемы с накоплением запросов и постепенным их исполнением. Однако такие схемы потребуют асинхронного выполнения, а мы пока остановимся на синхронных.

Итак, чтобы начать использовать пакетную обработку, так или иначе придется узнать заранее как можно больше запросов (желательно все), которые нам придется сделать. Если речь про REST-контроллер, было бы прекрасно объединить запросы к каждому сервису по всей сессии.

Группировка всех вызовов

В некоторых ситуациях все данные, которые необходимы в рамках сессии, могут быть получены сразу и не вызовут проблем с ресурсами ни у инициатора запроса, ни у исполнителя. В таком случае мы можем не ограничивать размер пакета для вызова сервиса и получать сразу вообще все данные.

Еще одно допущение, которое сильно облегчает жизнь, — считать, что у инициатора хватит ресурсов на обработку всех данных. Запросы к внешним сервисам можно слать и ограниченными пакетами, если они этого требуют.

Упрощение логики в этом случае касается того, как будут сопоставляться места, где данные нужны, с результатами вызовов. Если считать, что ресурсы инициатора сильно ограничены, и при этом пытаться минимизировать количество внешних вызовов, получится довольно сложная задача на оптимальное разрезание графа. Скорее всего, придется просто пожертвовать производительностью ради уменьшения потребления ресурсов.

Будем считать, что конкретно в нашем демо-проекте инициатор не особо ограничен в ресурсах, может получать все необходимые данные и хранить их до окончания сессии. Если будут проблемы с ресурсами, мы просто сделаем пагинацию поменьше.

Поскольку в моей практике именно такой подход наиболее востребован, дальнейшие примеры будут касаться этого варианта.

Можно выделить такие способы получения больших наборов запросов:

  • реверс-инжиниринг;
  • бизнес-эвристики;
  • агрегаты в стиле DDD;
  • проксирование и двойной вызов.

Пройдемся по всем вариантам на примере нашего проекта.

Реверс-инжиниринг

Сбор всех запросов

Поскольку у нас есть код реализации всех функций, участвующих в сборе информации и ее преобразовании для фронтенда, можно сделать реверс-инжиниринг и из этого кода понять, какие будут запросы:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(n: Int) =
    messageRepository.findLast(n)
      .let { messages ->
        // получаем полный список сообщений, включая forward и reply
        val allMessages = messages.asSequence().flatMap {
          sequenceOf(it, it.forwardFrom, it.replyTo).filterNotNull()
        }.toSet()
        val allUserReq = allMessages.map { it.author }
        val allFileReq = allMessages.flatMap { it.files ?: listOf() }.toSet()

Все запросы собраны, теперь нужно сделать собственно пакетную обработку.

Для allUserReq и allFileReq делаем внешние запросы и группируем их по id. Если ограничений на размер пакета нет, то это будет выглядеть примерно так:

userRepository.getUsersByIds(allMessages.map { it.author }.toSet())
  .associateBy { it.id }::get
fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
  .associateBy { it.id }::get

Если ограничение есть, то код примет такой вид:

val userApiChunkLimit = 100
allMessages.map { it.author }.asSequence().distinct()
  .chunked(userApiChunkLimit, userRepository::getUsersByIds)
  .flatten()
  .associateBy { it.id }::get

К сожалению, в отличие от Stream, в Sequence нельзя легко перейти на параллельный запрос пакетов.

Если вы считаете параллельный запрос допустимым и нужным, можно сделать, например, так:

allMessages.map { it.author }.parallelStream().distinct()
  .chunked(userApiChunkLimit, userRepository::getUsersByIds)
  .flatten()
  .associateBy { it.id }::get

Видно, что особо ничего не изменилось. В этом нам помогло использование некоторого количества Kotlin-магии:

fun <T, R> Stream<out T>.chunked(size: Int, transform: (List<T>) -> R): Stream<out R> =
  batches(this, size).map(transform)

fun <T> Stream<out Collection<T>>.flatten(): Stream<T> =
  flatMap { it.stream() }

fun <T, K> Stream<T>.associateBy(keySelector: (T) -> K): Map<K, T> =
  collect(Collectors.toMap(keySelector, { it }))

Теперь осталось собрать все вместе:

override fun getLast(n: Int) =
  messageRepository.findLast(n)
    .let { messages ->
      // получаем полный список сообщений, включая forward и reply
      val allMessages = messages.asSequence().flatMap { message ->
        sequenceOf(message, message.forwardFrom, message.replyTo)
          .filterNotNull()
      }.toSet()

      messages.map(ValueHolder<(ChatMessage) -> ChatMessageUI>().apply {
        value = memoize { message: ChatMessage ->
          message.toFrontModel(
            // для этого сервиса есть ограничение размера пакета, но возможны параллельные запросы
            getUser = allMessages.map { it.author }.parallelStream().distinct()
              .chunked(userApiChunkLimit, userRepository::getUsersByIds)
              .flatten()
              .associateBy { it.id }::get.orThrow { IllegalArgumentException("User $it") },
            // для этого сервиса нет ограничений на размер пакета
            getFile = fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
              .associateBy { it.id }::get.orThrow { IllegalArgumentException("File $it") },
            // рекурсивный вызов этой же функции с мемоизацией
            serializeMessage = value
          )
        }
      }.value)
    }

Пояснения и упрощения

Первое, что наверняка бросится в глаза, — это функция memoize. Дело в том, что функция serializeMessage почти наверняка будет вызываться для одних и тех же сообщений несколько раз (из-за reply и forward). Непонятно, зачем нам делать toFrontModel отдельно для каждого такого сообщения (в некоторых случаях это может быть нужно, но не в нашем). Поэтому можно сделать мемоизацию для функции serializeMessage. Реализуется это, например, так:

fun <A, R> memoize(func: (A) -> R) = func as? Memoize2 ?: Memoize2(func)
class Memoize2<A, R>(val func: (A) -> R) : (A) -> R, java.util.function.Function<A, R> {
  private val cache = hashMapOf<A, R>()
  override fun invoke(p1: A) = cache.getOrPut(p1, { func(p1) })
  override fun apply(t: A): R = invoke(t)
}

Далее нам нужно сконструировать мемоизированную функцию serializeMessage, но при этом у нее внутри будет использоваться она же. Здесь важно использовать внутри именно тот же экземпляр функции, иначе вся мемоизация пойдет псу под хвост. Для разрешения этой коллизии используем класс ValueHolder, который просто хранит ссылку на значение (можно взять вместо него что-то стандартное, например AtomicReference). Чтобы сократить запись для рекурсии, можно сделать так:

inline fun <A, R> recursiveMemoize(crossinline func: (A, (A) -> R) -> R): (A) -> R =
  ValueHolder<(A) -> R>().apply {
    value = memoize { a -> func(a, value) }
  }.value

Если вы смогли понять этот стрелочный силлогизм с первого раза — поздравляю, вы функциональный программист :-)

Теперь код будет выглядеть следующим образом:

override fun getLast(n: Int) =
  messageRepository.findLast(n)
    .let { messages ->
      // получаем полный список сообщений, включая forward и reply
      val allMessages = messages.asSequence().flatMap { message ->
        sequenceOf(message, message.forwardFrom, message.replyTo)
          .filterNotNull()
      }.toSet()

      // для этого сервиса есть ограничение размера пакета, но возможны параллельные запросы
      val getUser = allMessages.map { it.author }.parallelStream().distinct()
        .chunked(userApiChunkLimit, userRepository::getUsersByIds)
        .flatten()
        .associateBy { it.id }::get.orThrow { IllegalArgumentException("User $it") }

      // для этого сервиса нет ограничений на размер пакета
      val getFile = fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
        .associateBy { it.id }::get.orThrow { IllegalArgumentException("File $it") }

      messages.map(recursiveMemoize { message, memoized: (ChatMessage) -> ChatMessageUI ->
        message.toFrontModel(
          getUser = getUser,
          getFile = getFile,
          // рекурсивный вызов этой же функции с мемоизацией
          serializeMessage = memoized
        )
      })

Можно заметить еще orThrow, который определяется так:

/** Бросает указанный [exception], если функция возвращает null */
fun <P, R> ((P) -> R?).orThrow(exception: (P) -> Exception): (P) -> R =
  { p -> invoke(p).let { it ?: throw exception(p) } }

Если во внешних сервисах отсутствуют данные по нашим id и это считается легальной ситуацией, нужно обрабатывать ее как-то по-другому.

После этого исправления ожидается, что время выполнения getLast будет в районе 300 мс. Причем это время вырастет несильно, даже если запросы перестанут укладываться в ограничения на размер пакета (так как пакеты запрашиваются параллельно). Напомню, что наша цель-минимум — 500 мс, а нормальной работой можно будет считать 250 мс.

Параллельность

Но нужно двигаться дальше. Обращения к userRepository и fileRepository являются полностью независимыми, и их можно легко распараллелить, в теории приблизившись к 200 мс.

Например, через нашу функцию join:
override fun getLast(n: Int) =
  messageRepository.findLast(n)
    .let { messages ->
      // получаем полный список сообщений, включая forward и reply
      val allMessages = messages.asSequence().flatMap { message ->
        sequenceOf(message, message.forwardFrom, message.replyTo)
          .filterNotNull()
      }.toSet()

      join({
        // для этого сервиса есть ограничение размера пакета, но возможны параллельные запросы
        allMessages.map { it.author }.parallelStream().distinct()
          .chunked(userApiChunkLimit, userRepository::getUsersByIds)
          .flatten()
          .associateBy { it.id }
      }, {
        // для этого сервиса нет ограничений на размер пакета
        fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
          .associateBy { it.id }
      }).let { (users, files) ->
        messages.map(recursiveMemoize { message, memoized: (ChatMessage) -> ChatMessageUI ->
          message.toFrontModel(
            getUser = users::get.orThrow { IllegalArgumentException("User $it") },
            getFile = files::get.orThrow { IllegalArgumentException("File $it") },
            // рекурсивный вызов этой же функции с мемоизацией
            serializeMessage = memoized
          )
        })
      }
    }

Как показывает практика, на исполнение затрачивается в районе 200 мс, и очень важно, что с увеличением количества сообщений время особо не растет.

Проблемы

В целом код стал, конечно, менее читабельным, чем наша наивная первая версия, но хорошо, что сама сериализация (реализация toFrontModel) почти не изменилась и осталась вполне читабельной. Вся логика хитрой работы с внешними сервисами живет в одном месте.

Минус этого подхода в том, что наша абстракция протекает.

Если нам понадобится внести изменения в toFrontModel, почти наверняка придется вносить изменения и в функцию getLast, что нарушает принцип подстановки Барбары Лисков (Liskov Substitution Principle).

Например, мы договорились расшифровывать приложенные файлы только в основных сообщениях, но не в ответах и пересылках (reply/forward), или только в ответах и пересылках первого уровня. В таком случае после внесения изменений в код toFrontModel придется сделать соответствующие исправления и в коде сбора запросов для файлов. Причем исправление будет нетривиальным:

fileRepository.getHeadsByIds(
  allMessages.flatMap { it.files ?: listOf() }.toSet()
)

И здесь мы плавно подходим к еще одной проблеме, тесно связанной с предыдущей: правильность работы кода в целом зависит от грамотности проведенного реверс-инжиниринга. В некоторых сложных случаях код может сработать неправильно именно из-за некорректной работы сбора запросов. Нет никакой гарантии, что у вас получится быстро придумать юнит-тесты, которые покроют все такие хитрые ситуации.

Выводы

Плюсы:

  1. Очевидный способ предварительного получения запросов, который легко отделяется от основного кода.
  2. Почти полное отсутствие накладных расходов памяти и времени, связанное с использованием только тех данных, которые все равно были бы получены.
  3. Хорошее масштабирование и возможность построить сервис, который в теории будет отвечать за предсказуемое время вне зависимости от размера запроса извне.

Минусы:

  1. Довольно сложный код самой пакетной обработки.
  2. Большая и ответственная работа по анализу запросов в существующей реализации.
  3. Протекающая абстракция и, как следствие, хрупкость всей схемы по отношению к изменениям в реализации.
  4. Сложности в поддержке: ошибки в блоке предсказания запросов трудно отличить от ошибок в основном коде. В идеале нужно применять в два раза больше юнит-тестов, поэтому и разбирательства с ошибками в продакшене будут в два раза сложнее.
  5. Соблюдение принципов SOLID при написании кода: код должен быть готов к отчуждению логики пакетной обработки. Само по себе внедрение этих принципов даст некоторые преимущества, так что данный минус самый несущественный.

Важно отметить, что использовать этот метод можно и не делая реверс-инжиниринга как такового. Нам нужно получить контракт getLast, от которого зависит контракт предварительного расчета запросов (далее — prefetch). В данном случае мы сделали это, рассмотрев реализацию getLast (реверс-инжиниринг). Однако при таком подходе возникают сложности: правка этих двух кусков кода всегда должна быть синхронной, а обеспечить это никак невозможно (вспомните hashCode и equals, там ровно то же самое). Следующий подход, который я хотел бы показать, как раз призван решить эту проблему (или хотя бы смягчить).

Бизнес-эвристики

Решение проблемы контракта

Что если оперировать не точным контрактом и, следовательно, точным набором запросов, а примерным? Причем мы построим примерный набор так, что он будет строго включать точный и базироваться на особенностях предметной области.

Таким образом, вместо зависимости контракта prefetch от getLast установим зависимость их обоих от какого-то общего контракта, который будет диктоваться пользователем. Основная сложность будет в том, чтобы как-то овеществить этот общий контракт в виде кода.

Поиск полезных ограничений

Давайте попробуем сделать это на нашем примере. В нашем случае есть следующие бизнес-особенности:

  • список участников чата предопределен;
  • чаты абсолютно изолированы друг от друга;
  • вложенность цепочек reply/forward небольшая (~2–3 сообщения).

Из первого ограничения следует, что не нужно бегать по сообщениям, смотреть, какие там есть пользователи, выбирать уникальных и по ним делать запрос. Можно просто сделать запрос по предопределенному списку. Если вы согласились с этим утверждением, значит, я вас поймал.

На самом деле все не так просто. Список может быть предопределен, но в нем могут быть тысячи пользователей. Такие вещи необходимо уточнять. В нашем случае участников чата, как правило, будет два — три, редко больше. Так что вполне допустимо получать данные по ним всем.

Далее, если список пользователей чата предопределен, но этой информации нет в сервисе пользователей (что очень вероятно), то толку от такой информации тоже не будет. Мы сделаем лишний запрос списка пользователей чата, а потом все равно придется делать запрос(-ы) к сервису пользователей.

Допустим, что информация о связи пользователей и чата хранится в сервисе пользователей. В нашем случае это так, поскольку связь определяется правами пользователя. Тогда для пользователей получится такой prefetch-код:

Здесь может показаться удивительным то, что мы не передаем никакого идентификатора чата. Я сделал это намеренно, чтобы не загромождать код примеров.

Из второго ограничения, на первый взгляд, ничего не следует. Во всяком случае, у меня так и не получилось вывести из него что-то полезное.

Третье ограничение мы уже использовали ранее. Оно может оказать существенное влияние на то, как мы будем хранить и получать цепочки сообщений. Не станем развивать эту тему, так как к REST-контроллеру и пакетной обработке это не имеет отношения.

Что же делать с файлами? Очень хочется получить список всех файлов чата одним простым запросом. По условиям API нам нужны только заголовки файлов, без тел, так что это не выглядит ресурсоемкой и опасной задачей для вызывающей стороны.

С другой стороны, нужно помнить, что мы получаем не все сообщения чата, а только последние N, и легко может оказаться, что они не содержат вообще никаких файлов.

Здесь не может быть универсального ответа: все сильно зависит от бизнес-специфики и вариантов использования. При создании продуктового решения можно попасть впросак, если заложить эвристику под один вариант использования, а потом пользователи будут работать с функционалом другим способом. Для демонстраций и пресейлов это хороший вариант, но сейчас мы пытаемся написать честный продакшен-сервис.

Так что, увы, делать для файлов бизнес-эвристику здесь можно будет только по итогам эксплуатации и сбора статистики (либо после экспертной оценки).

Поскольку хочется все-таки как-то применить наш метод, допустим, что статистика показала следующее:

  1. Типичная цепочка начинается с сообщения, включающего один или несколько файлов, за которым следуют ответные сообщения (reply) без файлов.
  2. Почти все сообщения входят в типичные цепочки.
  3. Ожидаемое количество уникальных файлов в рамках одного чата ~20.

Отсюда следует, что для отображения почти всех сообщений понадобится получить заголовки каких-то файлов (потому что так устроен ChatMessageUI) и что общее количество файлов невелико. В таком случае разумным выглядит получение всех файлов чата одним запросом. Для этого в наш API для файлов придется добавить следующее:

fun getHeadsByChat(): List<FileHeadRemote>

Метод getHeadsByChat не выглядит надуманным и сделанным чисто из-за нашего желания оптимизировать производительность (хотя это тоже вполне себе обоснование). Довольно часто в чатах с файлами пользователи хотят увидеть все использованные файлы, причем в порядке их добавления (поэтому используем List).

Реализация такой явной связи потребует хранения дополнительной информации в файловом сервисе или в нашем приложении. Все зависит от того, в чьей зоне ответственности, как мы считаем, должна храниться эта избыточная информация о связи файла с чатом. Избыточная она потому, что с файлом уже связано сообщение, а оно в свою очередь связано с чатом. Можно не использовать денормализацию, а извлекать эту информацию на лету из сообщений, то есть внутри SQL получать сразу все файлы по всему чату (это в нашем приложении) и запрашивать их все сразу у файлового сервиса. Такой вариант будет работать похуже, если сообщений в чате окажется много, но зато нам не понадобится денормализация. Я бы оба варианта скрыл за getHeadsByChat.

Код получился такой:

override fun getLast(n: Int) =
  messageRepository.findLast(n)
    .let { messages ->
      join(
        { userRepository.getUsersByChat().associateBy { it.id } },
        { fileRepository.getHeadsByChat().associateBy { it.id } }
      )
        .let { // здесь ничего не изменилось
        }
    }

Видно, что по сравнению с предыдущим вариантом изменилось очень мало и изменения коснулись только части с prefetch, что замечательно.

Код prefetch стал намного короче и понятнее.

Время исполнения не изменилось, что логично, так как количество запросов осталось прежним. Теоретически возможны случаи, когда масштабирование будет лучше, чем у честного реверс-инжиниринга (только за счет убирания звена сложного расчета). Однако в равной степени вероятны противоположные ситуации: эвристики гребут слишком много лишнего. Как показывает практика, если удается придумать адекватные эвристики, то особых перемен во времени исполнения быть не должно.

Однако это еще не все. Мы не учли, что теперь получение детальных данных по пользователям и файлам не связано с получением сообщений и запросы можно запустить параллельно:

Такой вариант дает стабильные 100 мс на запрос.

Ошибки эвристик

Что если при использовании эвристик набор запросов окажется не больше, а чуть меньше, чем должен быть? Для большинства вариантов такие эвристики подойдут, но будут исключения, ради которых придется делать отдельный запрос. В моей практике такого рода решения оказывались неудачными, так как каждое исключение сильно сказывалось на производительности, и в конце концов какой-то пользователь делал запрос, полностью состоящий из исключений. Я бы сказал, что в таких ситуациях лучше использовать реверс-инжиниринг, даже если алгоритм сбора запросов получается жуткий и нечитаемый, но, конечно, все зависит от критичности сервиса.

Выводы

Плюсы:

  1. Логика бизнес-эвристик легко читается и обычно тривиальна. Это хорошо для того, чтобы понять границы применимости, верифицировать и изменять контракт prefetch.
  2. Масштабируемость такая же хорошая, как у реверс-инжиниринга.
  3. Уменьшается связность кода по данным, что может привести к лучшей параллелизации кода.
  4. Логика prefetch, как и основная логика REST-контроллера, базируется на требованиях. Это слабый плюс, если требования часто меняются.

Минусы:

  1. Из требований не так легко вывести эвристики для предсказаний запросов. Могут понадобиться уточнения требований, причем до такой степени, которая плохо совместима с agile.
  2. Можно получить лишние данные.
  3. Для обеспечения эффективной работы контракта prefetch, вероятно, понадобится денормализация хранения данных. Это слабый минус, так как эти оптимизации следуют из бизнес-логики и потому, скорее всего, будут востребованы разными процессами.

Из нашего примера можно сделать вывод, что применить данный подход очень сложно и овчинка не стоит выделки. На самом деле в реальных бизнес-проектах количество ограничений огромно и из этой кучи часто удается достать что-то полезное, что позволяет партиционировать данные или предсказывать статистику. Главный плюс этого подхода в том, что используемые ограничения трактуются бизнесом, поэтому они легко понимаются и валидируются.

Обычно самой большой проблемой при попытке использовать этот подход оказывается разделение деятельности. Разработчик должен хорошо погрузиться в бизнес-логику и задавать уточняющие вопросы аналитику, что требует определенного уровня инициативности.

Агрегаты в стиле DDD

В больших проектах часто можно увидеть использование практик DDD, поскольку они позволяют эффективно структурировать код. Не обязательно использовать в проекте все шаблоны DDD — иногда можно получить хорошую отдачу даже от внедрения одного. Рассмотрим такое понятие DDD, как агрегат. Агрегатом называют объединение логически связанных сущностей, работа с которыми осуществляется только через корень агрегата (обычно это сущность, которая является вершиной графа связности сущностей).

С точки зрения получения данных главное в агрегате то, что вся логика работы со списками сущностей находится в одном месте — агрегате. Есть два подхода к тому, что следует передавать в агрегат при его конструировании:

Передаем в агрегат функции для получения внешних данных. Логика определения необходимых данных живет внутри агрегата. Передаем все необходимые данные. Логика определения необходимых данных живет вне агрегата.

Выбор подхода во многом зависит от того, насколько легко можно вынести prefetch за рамки агрегата. Если логика prefetch базируется на бизнес-эвристиках, то обычно ее несложно отделить от агрегата. Выносить за рамки агрегата логику, основанную на анализе его использования (реверс-инжиниринг), может оказаться опасным, так как мы разносим логически связанный код по разным классам.

Логика укрупнения запросов внутри агрегата

Попробуем набросать агрегат, который бы соответствовал понятию «чат». Наши классы ChatMessage, UserReference, FileReference соответствуют модели хранения, поэтому их можно было бы переименовать с каким-то соответствующим префиксом, но у нас проект маленький, поэтому оставим как есть. Агрегат назовем Chat, а его составляющие — ChatPage и ChatPageMessage:

interface Chat {
  fun getLastPage(n: Int): ChatPage
}

interface ChatPage {
  val messages: List<ChatPageMessage>
}

data class ChatPageMessage(
  val id: Long,
  val author: UserRemote,
  val message: String,
  val files: List<FileHeadRemote>,
  val replyTo: ChatPageMessage?,
  val forwardFrom: ChatPageMessage?
)

Пока что получается довольно много бессмысленного дублирования. Это связано с тем, что наша предметная модель похожа на модель хранения и они обе похожи на модель для фронтенда.

Я использую классы FileHeadRemote и UserRemote напрямую, чтобы не писать лишнего кода, хотя обычно в домене стоит избегать прямого использования таких классов.

Если использовать такой агрегат, наш REST-контроллер можно переписать так:

class ChatRestController(
  private val chat: Chat
) : ChatRestApi {
  override fun getLast(n: Int) =
    chat.getLastPage(n).toFrontModel()

  private fun ChatPage.toFrontModel() =
    messages.map { it.toFrontModel() }

  private fun ChatPageMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id,
      author = author.toFrontReference(),
      message = message,
      files = files.toFrontReference(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

Этот вариант во многом напоминает нашу первую наивную реализацию, но при этом имеет важное преимущество: контроллер больше не занимается получением данных напрямую и не зависит от классов, связанных с хранением данных, а зависит только от агрегата, который задан через интерфейсы. Таким образом, и логики prefetch больше нет в контроллере. Контроллер занимается только преобразованием агрегата в модель фронтенда, что дает нам соблюдение принципа единственной ответственности (Single Responsibility Principle, SRP).

К сожалению, для всех описанных в агрегате методов придется написать реализацию.

Попробуем просто сохранить логику контроллера, реализованную при использовании бизнес-эвристик.

class ChatImpl(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : Chat {
  override fun getLastPage(n: Int) = object : ChatPage {
    override val messages: List<ChatPageMessage>
      get() =
        runBlocking(IO) {
          val prefetch = async(
            { userRepository.getUsersByChat().associateBy { it.id } },
            { fileRepository.getHeadsByChat().associateBy { it.id } }
          )

          withContext(IO) { messageRepository.findLast(n) }
            .map(
              prefetch.await().let { (users, files) ->
                recursiveMemoize { message, memoized: (ChatMessage) -> ChatPageMessage ->
                  message.toDomainModel(
                    getUser = users::get.orThrow { IllegalArgumentException("User $it") },
                    getFile = files::get.orThrow { IllegalArgumentException("File $it") },
                    // рекурсивный вызов этой же функции с мемоизацией
                    serializeMessage = memoized
                  )
                }
              }
            )
        }
  }
}

private fun ChatMessage.toDomainModel(
  getUser: (UserReference) -> UserRemote,
  getFile: (FileReference) -> FileHeadRemote,
  serializeMessage: (ChatMessage) -> ChatPageMessage
) = ChatPageMessage(
  id = id ?: throw IllegalStateException("$this must be persisted"),
  author = getUser(author),
  message = message,
  files = files?.map(getFile) ?: listOf(),
  forwardFrom = forwardFrom?.let(serializeMessage),
  replyTo = replyTo?.let(serializeMessage)
)

Здесь получилось, что в самой функции getLastPage живет стратегия получения данных, включая prefetch, а функция toDomainModel чисто техническая и отвечает за преобразование хранимых моделей в модель предметной области.

Параллельные вызовы userRepository, fileRepository и messageRepository я переписал в более привычном для Kotlin виде. Надеюсь, что понятность кода из-за этого не пострадала.

В целом такой метод уже вполне работоспособен, производительность при его применении будет такой же, как при простом использовании реверс-инжиниринга или бизнес-эвристик.

Логика укрупнения запросов вне агрегата

В процессе создания агрегата мы сразу же столкнемся с проблемой: для конструирования ChatPage размер страницы нужно будет задавать как константу при создании Chat, а не передавать его в getLast(), как обычно. Придется поменять сам интерфейс агрегата:

interface Chat {
  fun getPage(): ChatPage
}

Поскольку у нас есть дочитка остальных сообщений и мы твердо хотим получать все данные за рамками агрегата, нам придется вообще отказаться от агрегата уровня Chat и сделать корнем ChatPage:

class ChatPageImpl(
  private val messageData: List<ChatMessage>,
  private val userData: List<UserRemote>,
  private val fileData: List<FileHeadRemote>
) : ChatPage {
  override val messages: List<ChatPageMessage>
    get() =
      messageData.map(
        (userData.associateBy { it.id } to fileData.associateBy { it.id })
          .let { (users, files) ->
            recursiveMemoize { message, self: (ChatMessage) -> ChatPageMessage ->
              message.toDomainModel(
                getUser = users::get.orThrow(),
                getFile = files::get.orThrow(),
                // рекурсивный вызов этой же функции с мемоизацией
                serializeMessage = self
              )
            }
          }
      )
}

Далее необходимо создать код prefetch, отдельный от агрегата:

fun chatPagePrefetch(
  pageSize: Int,
  messageRepository: ChatMessageRepository,
  userRepository: UserRemoteApi,
  fileRepository: FileRemoteApi
) =
  runBlocking(IO) {
    async(
      { userRepository.getUsersByChat() },
      { fileRepository.getHeadsByChat() },
      { messageRepository.findLast(pageSize) }
    )
  }

Теперь для того, чтобы создать агрегат, нужно его состыковать с prefetch. В DDD такого рода оркестрациями занимаются Application Services.

class ChatService(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) {
  private fun chatPagePrefetch(pageSize: Int) =
    runBlocking(IO) {
      async(
        { messageRepository.findLast(pageSize) },
        { userRepository.getUsersByChat() },
        { fileRepository.getHeadsByChat() }
      ).await()
    }

  fun getLastPage(n: Int): ChatPage =
    chatPagePrefetch(n)
      .let { (messageData, userData, fileData) ->
        ChatPageImpl(messageData, userData, fileData)
      }
}

Ну а контроллер особо не изменится, нужно только вместо Chat::getLastPage использовать ChatService::getLastPage. То есть код изменится так:

class ChatRestController(
  private val chat: ChatService
) : ChatRestApi

Выводы:

  1. Логику prefetch можно поместить как внутрь агрегата, так и в отдельное место.
  2. Если логика prefetch сильно связана с внутренней логикой агрегата, лучше не выносить ее наружу, так как это может нарушить инкапсуляцию. Я лично не вижу особого смысла выносить prefetch за пределы агрегата, поскольку это сильно ограничивает возможности и увеличивает неявную связность кода.
  3. На производительность пакетной обработки сама по себе организация агрегатов влияет положительно, так как контроля над тяжелыми запросами становится больше и место для логики prefetch становится вполне определенным.

В следующей главе мы рассмотрим такой вариант реализации prefetch, который невозможно реализовать в отрыве от основной функции.

Проксирование и двойной вызов

Решение проблемы контракта

Как мы уже разобрались в предыдущих частях, основная проблема контракта prefetch в том, что он сильно связан с контрактом функции, для которой он должен подготовить данные. Если быть более точным, то он зависит от того, какие данные могут понадобиться основной функции. Что если мы не будем пытаться предсказывать, а попробуем сделать реверс-инжиниринг средствами самого кода? В простых ситуациях нам может помочь подход проксирования, широко используемый в тестировании. Такие библиотеки, как Mockito, генерируют классы с имплементаций интерфейсов, которые могут в том числе накапливать информацию о вызовах. Похожий подход используется в нашей библиотеке.

Если вызвать основную функцию с проксированными репозиториями и собрать информацию о необходимых данных, то потом можно будет эти данные получить в виде пакета и повторно вызвать основную функцию для получения финального результата.

Основное условие следующее: запрашиваемые данные не должны влиять на последующие запросы. Прокси будет возвращать не реальные данные, а только какие-то заглушки, поэтому все ветвления и получения связанных данных отпадают.

В нашем случае это означает, что бесполезно проксировать messageRepository, поскольку по результатам запроса сообщений и делаются дальнейшие запросы. Это не проблема, так как к messageRepository у нас всего один запрос, так что никакой пакетной обработки здесь и не требуется.

Поскольку проксировать мы будем простые функции UserReference->UserRemote и FileReference->FileHeadRemote, то накапливать нужно просто два списка аргументов.

В итоге получаем следующее:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(n: Int): List<ChatMessageUI> {
    val messages = messageRepository.findLast(n)

    // функция получения списка сообщений
    fun transform(
      getUser: (UserReference) -> UserRemote,
      getFile: (FileReference) -> FileHeadRemote
    ): List<ChatMessageUI> =
      messages.map(
        recursiveMemoize { message, self ->
          message.toFrontModel(getUser, getFile, self)
        }
      )

    // накапливаем запросы
    val userIds = mutableSetOf<UserReference>()
    val fileIds = mutableSetOf<FileReference>()
    transform(
      { userIds += it; UserRemote(0L, "") },
      { fileIds += it; FileHeadRemote(0L, "") }
    )

    return runBlocking(IO) {
      // получаем данные по всем запросам сразу
      async(
        { userRepository.getUsersByIds(userIds).associateBy { it.id }::get.orThrow() },
        { fileRepository.getHeadsByIds(fileIds).associateBy { it.id }::get.orThrow() }
      ).await().let { (getUser, getFile) ->
        transform(getUser, getFile)
      }
    }
  }
}

Если измерить производительность, получится, что при данном подходе она не хуже, чем при использовании методов реверс-инжиниринга, хотя мы вызываем функцию два раза. Это связано с тем, что по сравнению со временем выполнения внешних запросов, временем выполнения функции преобразования можно пренебречь (в нашем случае).

Если сравнивать с производительностью при использовании бизнес-эвристик, то в нашем случае накопление запросов окажется менее эффективным. Но нужно учитывать, что не всегда удается найти такие хорошие эвристики. Например, если количество пользователей в чате будет большим, как и количество файлов, и при этом файлы будут прикрепляться к сообщениям редко, то наш алгоритм на бизнес-эвристиках сразу же начнет проигрывать честному получению списку запросов.

Выводы

Плюсы:

  1. Для реализации prefetch не нужен реверс-инжиниринг кода получения данных.
  2. В большинстве случаев изменения основной функции не повлияют на prefetch.
  3. Масштабируемость такая же хорошая, как у реверс-инжиниринга.

Минусы:

  1. Требуется независимость следующих запросов от результатов предыдущих.
  2. Редко используется, поэтому логика не так очевидна.

Несмотря на кажущуюся экзотичность, накопление запросов через проксирование и повторный вызов вполне применимо в ситуациях, когда логика основной функции не завязана на получаемые данные. Основная сложность здесь такая же, как при реверс-инжиниринге: мы закладываемся на текущую реализацию функции, хотя и в гораздо меньшей степени (только на тот факт, что следующие запросы не зависят от результатов предыдущих запросов).

Производительность упадет незначительно, зато в коде prefetch не нужно будет учитывать всех нюансов реализации основной функции.

Можно использовать такой подход, когда не получается построить хороших бизнес-эвристик для предсказания запросов, а связность prefetch и функции хочется уменьшить.

Заключение

Использование пакетной обработки не так просто, как кажется на первый взгляд. Думаю, все шаблоны проектирования обладают этим свойством (вспомните кэширование).

Для эффективной пакетной обработки запросов вызывающей стороне важно собрать вместе как можно больше запросов, что часто затрудняется структурой приложения. Выхода два: либо проектировать приложение в расчете на эффективную работу с данными (очень может быть, что это приведет к реактивному устройству приложения), либо, как это часто бывает, пытаться внедрить пакетную обработку в существующее приложение без значительной его перестройки.

Самый очевидный способ собрать запросы в кучу — это реверс-инжиниринг существующего кода в поисках тяжелых запросов. Главным недостатком этого подхода будет увеличение неявной связности кода. Альтернатива — задействовать информацию о бизнес-особенностях для того, чтобы разделить данные на порции, которые часто используются совместно и целиком. Иногда для такого эффективного разделения придется денормализовать хранение, но зато, если такое получится осуществить, логика пакетной обработки будет определятся предметной областью, что хорошо.

Менее очевидный способ получить все запросы — реализовать два прохода. На первом этапе собираем все необходимые запросы, на втором работаем с уже полученными данными. Применимость такого подхода ограничена требованием независимости запросов друг от друга.

Не пропустите новые полезные статьи!

Спасибо за подписку!

Мы отправили вам письмо для подтверждения вашего email.
С уважением, OTUS!

Автор
0 комментариев
Для комментирования необходимо авторизоваться
Популярное
Сегодня тут пусто