Проблемы пакетной обработки запросов и их решения | OTUS

Проблемы пакетной обработки запросов и их решения

Практически все современные программные продукты состоят из нескольких сервисов. Часто большое время отклика межсервисных каналов становится источником проблем с производительностью. Стандартное решение такого рода проблем — это упаковка нескольких межсервисных запросов в один пакет, которую называют пакетной обработкой (batching).

sb0raprlu2jwllu3gasyznuom40_1-1801-c0fd9d.jpeg

Если вы используете пакетную обработку, вас может не устраивать ее результат с точки зрения производительности или понятности кода. Этот метод не так прост для вызывающей стороны, как можно подумать. Для разных целей и в разных ситуациях решения могут сильно различаться. На конкретных примерах я покажу плюсы и минусы нескольких подходов.

Демонстрационный проект

Для наглядности рассмотрим пример одного из сервисов в приложении, над которым я сейчас работаю.

Пояснение по выбору платформы для примеров

Проблема плохой производительности достаточно общая и не касается каких-то конкретных языков и платформ. В этой статье для демонстрации задач и решений будут использоваться примеры кода на Spring + Kotlin. Kotlin одинаково понятен (или непонятен) Java- и C#- разработчикам, кроме того, код получается более компактным и понятным, чем на Java. Чтобы облегчить понимание для чистых Java-разработчиков, я буду избегать черной магии Kotlin и использовать только белую (в духе Lombok). Будет немного extension-методов, но они на самом деле знакомы всем Java-программистам как static-методы, так что это будет небольшим сахарком, который не испортит вкус блюда.

Есть сервис согласования документов. Кто-то создает документ и выносит его на обсуждение, в процессе которого делаются правки, и в конечном итоге документ согласуется. Сам сервис согласования ничего не знает о документах: это просто чат согласующих с небольшими дополнительными функциями, которые мы здесь рассматривать не будем.

Итак, есть комнаты чатов (соответствуют документам) с предопределенным набором участников в каждой из них. Как в обычных чатах, сообщения содержат текст и файлы и могут быть ответами (reply) и пересылками (forward):

data class ChatMessage(
  // nullable так как появляется только после persist
  val id: Long? = null,
  /** Ссылка на автора */
  val author: UserReference,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  // из-за особенностей связки JPA+СУБД проще поддерживать и null, и пустые списки
  val files: List<FileReference>? = null,
  /** Если является ответом, то здесь будет оригинал */
  val replyTo: ChatMessage? = null,
  /** Если является пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessage? = null
)

Ссылки на файл и пользователя — это ссылки на другие домены. У нас это живет так:

typealias FileReference = Long
typealias UserReference = Long

Данные по пользователям хранятся в Keycloak и получаются через REST. То же самое касается файлов: файлы и метаинформация о них живут в отдельном сервисе файлового хранилища.

Все вызовы этих сервисов являются тяжелыми запросами. Это означает, что накладные расходы на транспорт этих запросов много больше, чем время их обработки сторонним сервисом. На наших тестовых стендах типичное время вызова таких сервисов — 100 мс, так что в дальнейшем будем использовать эти цифры.

Нам нужно сделать простой REST-контроллер для получения последних N сообщений со всей необходимой информацией. То есть считаем, что во фронтенде модель сообщений почти такая же и нужно переслать все данные. Отличие модели для фронтенда в том, что файл и пользователя нужно представить в немного расшифрованном виде, чтобы сделать их ссылками:

/** В таком виде отдаются ссылки на сущности для фронта */
data class ReferenceUI(
  /** Идентификатор для url */
  val ref: String,
  /** Видимое пользователю название ссылки */
  val name: String
)
data class ChatMessageUI(
  val id: Long,
  /** Ссылка на автора */
  val author: ReferenceUI,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  val files: List<ReferenceUI>,
  /** Если являтся ответом, то здесь будет оригинал */
  val replyTo: ChatMessageUI? = null,
  /** Если являтся пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessageUI? = null
)

Нам нужно реализовать следующее:

interface ChatRestApi {
  fun getLast(n: Int): List<ChatMessageUI>
}

Постфикс UI означает DTO-модельки для фронтенда, то есть то, что мы должны отдать через REST.

Здесь может показаться удивительным то, что мы не передаем никакого идентификатора чата и даже в модели ChatMessage/ChatMessageUI его нет. Я сделал это намеренно, чтобы не загромождать код примеров (чаты изолированы, так что можно считать, что у нас он вообще один).

Философское отступление

И в классе ChatMessageUI, и в методе ChatRestApi.getLast используется тип данных List, тогда как на самом деле это упорядоченный Set. В JDK с этим все плохо, поэтому декларировать порядок элементов на уровне интерфейса (сохранение порядка при добавлении и извлечении) не получится. Так что общей практикой стало использование List в тех случаях, когда нужен упорядоченный Set (еще есть LinkedHashSet, но это не интерфейс).

Важное ограничение: будем считать, что длинных цепочек ответов или пересылок не бывает. То есть они есть, но их длина не превышает трех сообщений. Во фронтенд цепочка сообщений должна передаваться целиком.

Для получения данных из внешних сервисов есть такие API:

interface ChatMessageRepository {
  fun findLast(n: Int): List<ChatMessage>
}
data class FileHeadRemote(
  val id: FileReference,
  val name: String
)
interface FileRemoteApi {
  fun getHeadById(id: FileReference): FileHeadRemote
  fun getHeadsByIds(id: Set<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(id: List<FileReference>): List<FileHeadRemote>
  fun getHeadsByChat(): List<FileHeadRemote>
}
data class UserRemote(
  val id: UserReference,
  val name: String
)
interface UserRemoteApi {
  fun getUserById(id: UserReference): UserRemote
  fun getUsersByIds(id: Set<UserReference>): Set<UserRemote>
  fun getUsersByIds(id: List<UserReference>): List<UserRemote>
}

Видно, что во внешних сервисах изначально предусмотрена пакетная обработка, причем в обоих вариантах: через Set (без сохранения порядка элементов, с уникальными ключами) и через List (могут быть и дубли — порядок сохраняется).

Простые реализации

Наивная реализация

Первая наивная реализация нашего REST-контроллера будет выглядеть в большинстве случаев как-то так:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(n: Int) =
    messageRepository.findLast(n)
      .map { it.toFrontModel() }

  private fun ChatMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = userRepository.getUserById(author).toFrontReference(),
      message = message,
      files = files?.let { files ->
        fileRepository.getHeadsByIds(files)
          .map { it.toFrontReference() }
      } ?: listOf(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

Все предельно ясно, и это большой плюс.

Мы используем пакетную обработку и получаем данные из внешнего сервиса пакетами. Но что у нас происходит с производительностью?

Для каждого сообщения будет сделан один вызов UserRemoteApi для получения данных по полю author и один вызов FileRemoteApi для получения всех приложенных файлов. Вроде бы, все. Допустим, что поля forwardFrom и replyTo для ChatMessage получаются так, что это не потребует лишних вызовов. Но вот превращение их в ChatMessageUI приведет к рекурсии, то есть показатели счетчиков вызовов могут сильно вырасти. Как мы отметили ранее, допустим, что большой вложенности у нас не бывает и цепочка ограничена тремя сообщениями.

В итоге получим от двух до шести вызовов внешних сервисов на одно сообщение и один JPA-вызов на весь пакет сообщений. Общее количество вызовов будет варьироваться от 2N+1 до 6N+1. Сколько это в реальных единицах? Предположим, для отрисовки страницы нужно 20 сообщений. Чтобы их получить, понадобится от 4 c до 10 с. Ужасно! Хотелось бы уложиться в 500 мс. А поскольку во фронтенде мечтали сделать бесшовный скролл, требования к производительности этого endpoint можно удваивать.

Плюсы:

  1. Код краткий и самодокументируемый (мечта саппорта).
  2. Код простой, поэтому возможностей выстрелить в ногу почти нет.
  3. Пакетная обработка не выглядит чем-то чужеродным и органично вписана в логику.
  4. Изменения логики будут вноситься легко и будут локальными.

Минус:

Ужасная производительность, связанная с тем, что пакеты получаются очень маленькими.

Такой подход достаточно часто можно увидеть в простых сервисах или в прототипах. Если важна скорость внесения изменений, вряд ли стоит усложнять систему. В то же время для нашего очень простого сервиса производительность получается ужасной, так что рамки применимости у такого подхода очень узкие.

Наивная параллельная обработка

Можно запустить обработку всех сообщений параллельно — это позволит избавиться от линейного роста времени в зависимости от количества сообщений. Это не особенно хороший путь, потому что он приведет к большой пиковой нагрузке на внешний сервис.

Внедрить параллельную обработку очень просто:

override fun getLast(n: Int) =
  messageRepository.findLast(n).parallelStream()
    .map { it.toFrontModel() }
    .collect(toList())

Используя параллельную обработку сообщений, получим 300–700 мc в идеале, что намного лучше, чем при наивной реализации, но по-прежнему недостаточно быстро.

При таком подходе запросы к userRepository и fileRepository будут выполняться синхронно, что не очень эффективно. Чтобы это исправить, придется достаточно сильно изменить логику вызовов. Например, через CompletionStage (aka CompletableFuture):

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  CompletableFuture.supplyAsync {
    userRepository.getUserById(author).toFrontReference()
  }.thenCombine(
    files?.let {
      CompletableFuture.supplyAsync {
        fileRepository.getHeadsByIds(files).map { it.toFrontReference() }
      }
    } ?: CompletableFuture.completedFuture(listOf())
  ) { author, files ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }.get()!!

Видно, что изначально простой код маппинга стал менее понятным. Это вызвано тем, что нам пришлось отделить вызовы внешних сервисов от места использования результатов. Само по себе это неплохо. Но вот комбинирование вызовов выглядит не особо изящно и напоминает типичную реактивную «лапшу».

Если использовать корутины, все станет выглядеть приличнее:

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  join(
    { userRepository.getUserById(author).toFrontReference() },
    { files?.let { fileRepository.getHeadsByIds(files)
      .map { it.toFrontReference() } } ?: listOf() }
  ).let { (author, files) ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }

Где:

fun <A, B> join(a: () -> A, b: () -> B) =
  runBlocking(IO) {
    awaitAll(async { a() }, async { b() })
  }.let {
    it[0] as A to it[1] as B
  }

Теоретически, используя такую параллельную обработку, получим 200–400 мc, что уже близко к нашим ожиданиям.

К сожалению, такого хорошего распараллеливания не бывает, да и расплата довольно жестокая: при одновременной работе всего нескольких пользователей на сервисы обрушится шквал запросов, которые все равно не будут обрабатываться параллельно, так что мы вернемся к нашим печальным 4 с.

Мой результат при использовании такого сервиса — 1300–1700 мс на обработку 20 сообщений. Это быстрее, чем в первой реализации, но все-таки проблему не снимает.

Альтернативное применение параллельных запросов

Что если в сторонних сервисах не предусмотрена пакетная обработка? Например, можно спрятать отсутствие реализации пакетной обработки внутри методов интерфейсов:

interface UserRemoteApi {
  fun getUserById(id: UserReference): UserRemote
  fun getUsersByIds(id: Set<UserReference>): Set<UserRemote> =
    id.parallelStream()
      .map { getUserById(it) }.collect(toSet())
  fun getUsersByIds(id: List<UserReference>): List<UserRemote> =
    id.parallelStream()
      .map { getUserById(it) }.collect(toList())
}

Это имеет смысл, если есть надежда на появление пакетной обработки в следующих версиях.

Плюсы:

  1. Легкое внедрение параллельной обработки по сообщениям.
  2. Хорошая масштабируемость.

Минусы:

  1. Необходимость отделения получения данных от их обработки при параллельной обработке запросов к разным сервисам.
  2. Повышенная нагрузка на сторонние сервисы.

Видно, что рамки применимости примерно такие же, как у наивного подхода. Использовать метод параллельных запросов имеет смысл, если вы хотите в несколько раз увеличить производительность своего сервиса за счет нещадной эксплуатации чужих. В нашем примере производительность увеличилась в 2,5 раза, но этого явно недостаточно.

Кэширование

Можно сделать кэширование в духе JPA для внешних сервисов, то есть в рамках сессии хранить полученные объекты, чтобы не получать их еще раз (в том числе при пакетной обработке). Можно сделать такие кэши самому, можно использовать Spring c его @Cacheable, плюс всегда можно использовать готовый кэш вроде EhCache вручную.

Общая проблема будет связана с тем, что от кэшей есть толк, только если есть попадания. В нашем случае весьма вероятны попадания по полю author (допустим, 50 %), а попаданий по файлам не будет вообще. Кое-какие улучшения этот подход даст, но радикально производительность не изменит (а нам нужен прорыв).

Межсессионные (длинные) кэши требуют сложной логики инвалидации. Вообще, чем позже вы скатитесь до того, что будете решать проблемы производительности с помощью межсессионных кэшей, тем лучше.

Плюсы:

  1. Внедрение кэширования без изменения кода.
  2. Прирост производительности в несколько раз (в некоторых случаях).

Минусы:

  1. Возможность снижения производительности при неправильном использовании.
  2. Большие накладные расходы памяти, особенно с длинными кэшами.
  3. Сложная инвалидация, ошибки в которой будут приводить к трудновоспроизводимым проблемам в рантайме.

Очень часто кэши используются только для того, чтобы быстро залатать проблемы проектирования. Это не означает, что их не нужно использовать. Однако всегда стоит относиться к ним с осторожностью и вначале оценивать полученный прирост производительности, а уже потом принимать решение.

В нашем примере от кэшей будет прирост производительности в районе 25 %. При этом минусов у кэшей довольно много, так что я бы не стал их здесь использовать.

Итоги

Итак, мы рассмотрели наивную реализацию сервиса, использующего пакетную обработку, и несколько простых способов ее ускорить.

Главное достоинство всех этих методов — простота, из которой есть много приятных следствий.

Общей проблемой этих способов является плохая производительность, связанная в первую очередь с размером пакетов. Поэтому если эти решения вам не подойдут, то стоит рассмотреть более радикальные методы.

Есть два основных направления, в которых можно поискать решения:

  • асинхронная работа с данными (требует смены парадигмы, поэтому в данной статье не рассматривается);
  • укрупнение пачек при сохранении синхронной обработки.

Укрупнение пачек позволит сильно сократить количество внешних вызовов и при этом сохранить код синхронным. Этой теме будет посвящена следующая часть статьи.

Больше материалов смотрите в моем блоге на Хабре: https://habr.com/ru/users/MZinchenko/.

Не пропустите новые полезные статьи!

Спасибо за подписку!

Мы отправили вам письмо для подтверждения вашего email.
С уважением, OTUS!

Автор
0 комментариев
Для комментирования необходимо авторизоваться
Популярное
Сегодня тут пусто