Scalaの型について(応用編その4)

タム

2024.02.02

12

こんにちは。タムです。

前回まで、Scalaの型応用編ということで、Pub/Subの実装を例に用いてきました。

Scalaの型というよりPub/Subの方がメインになってきてしまって申し訳ないです。

今回はいっそ開き直って、Pub/Subの話をもう少し深ぼっていきたいです。

型についての話はもうそんなに出ないと思うので、ご了承くださいw

異常系を考える

問題点

趣味で自分用のアプリケーションの実装であれば、前回の実装で十分というよりオーバースペックくらいだと思います。

しかし、実際の業務システムであれば、異常系も考慮しなければいけません。

処理の途中で異常終了するケースを考えてみます。

  def reserve(
      organizerId: UserId,
      participantIds: List[UserId],
      startAt: LocalDateTime,
      endAt: LocalDateTime,
      title: String,
      detail: String
  ): Unit =
    val (meeting, event) =
      Meeting.create(
        organizerId,
        participantIds,
        startAt,
        endAt,
        title,
        detail
      )
    meetingRepository.save(meeting) // ①
    val users = userRepsitory.getByIds(organizerId :: participantIds)
    publisher.publish(event) // ②


reserveメソッド全体にDBトランザクションが張られているとします。

つまり、reserveメソッドのどこかの処理で異常終了した場合はロールバックします。

  • ①で落ちた場合
    • DBはロールバックします。ミーティング作成通知(及び通知予約)も送られていないため、処理全体としてなかったことと同じであり、整合性は保たれています。
  • ②で落ちた場合
    • ミーティング作成通知(及び通知予約)の途中で落ちるため、何人かにはミーティングが作成された(もしくは開始・終了した)旨の通知が届くことでしょう。しかしDBはロールバックするため、実際にはミーティングは作成されておらず、整合性が保たれなくなってしまいます。

この問題を回避するためにはどうしたらいいでしょうか。

対応方針

ミーティング作成と、作成後の通知が同じトランザクション内で処理してしまっていることが問題と言えます。

なのでトランザクションを切り離したいです。

つまり、通知処理はあくまでミーティング作成のコミットが成功してから走るようにしたいです。

また、後続のトランザクションが失敗した場合、後続のみ再実行できるようにしたいので、

同じアプリケーションサービス内で複数トランザクションを実行するのではなく、

reserveメソッド内ではあくまでミーティング作成のトランザクションのみに責任を持ち、

通知は別プロセスで(何らかの方法で)行うようにします。


通知は別プロセスで行うとして、ミーティングが作成されたという事実を知る必要があります。

ドメインイベントを生成するようにしたので、それ自体をミーティングオブジェクトと同じDBに保存してしまいましょう。

そうすれば同じDBトランザクションで整合性を担保できます。

テーブル設計

ミーティングオブジェクトをDBに保存することを決めたので、テーブル設計を考えます。

各イベントごとにテーブルを用意してもいいのですが、コストがかかるためすべてのイベントを同じテーブルに保存する方向で考えます。

以下のようなカラム構成でどうでしょうか。

  • id (string): イベントID(UUID)
  • type (string): イベント種別(例:"MeetingCreated", "MeetingDeleted")
  • occuredOn (datetime): イベント発生日時
  • details (json): イベント詳細(例:{"meetingId": 1, "organizerId": 1, ...})
  • processed_at (datetime): イベント処理完了日時
  • created_at (timestamp): 作成日時
  • updated_at (timestamp): 更新日時

イベントを一意に指定するIDがあったほうが何かと便利だと思うので、idを付与しました。

イベント種別はtypeで識別します。

またイベントごとに異なるカラムについてはdetailsで吸収できるようにしました。

別プロセスでイベントを処理する際にすでに処理済みなのかを知る必要があるため、processed_atを付与しました。


次に実装です。

実装修正

イベント作成と通知が別プロセスということになったので、まずはイベント作成の方の実装からしてみます。

@@ -100,6 +100,10 @@ trait MeetingRepository:
 trait DomainEvent:
   val occuredOn = LocalDateTime.now()
 
+trait DomainEventRepository:
+  def save[T <: DomainEvent](event: T): Unit =
+    println(event)
+
 class MeetingCreated(
     val meetingId: MeetingId,
     val organizerId: UserId,
@@ -171,13 +175,9 @@ class EmailReserveService(
 
 class MeetingService(
     meetingRepository: MeetingRepository,
-    userRepsitory: UserRepository
+    userRepsitory: UserRepository,
+    eventRepository: DomainEventRepository
 ):
-  private val publisher = DomainEventPublisher(
-    EmailSendService(userRepsitory, EmailSender()),
-    EmailReserveService(userRepsitory, EmailReserver())
-  )
-
   def reserve(
       organizerId: UserId,
       participantIds: List[UserId],
@@ -196,8 +196,7 @@ class MeetingService(
         detail
       )
     meetingRepository.save(meeting)
-    publisher.publish(event)
+    eventRepository.save(event)
 
   def reserveWithFollowed(
       organizerId: UserId,
@@ -218,13 +217,10 @@ class MeetingService(
         detail
       )
     meetingRepository.save(meeting)
-    publisher.publish(event)
+    eventRepository.save(event)
 
   def remove(meetingId: MeetingId): Unit =
     val meeting = meetingRepository.get(meetingId)
     val event = meeting.delete
     meetingRepository.delete(meetingId, event)
-    publisher.publish(event)
+    eventRepository.save(event)


publisher.publishの代わりにeventRepository.saveを呼ぶようになっただけです。簡単ですね。


では続いて、通知処理です。

@@ -99,11 +99,28 @@ trait MeetingRepository:
 
 trait DomainEvent:
   val occuredOn = LocalDateTime.now()
+  var processedAt: Option[LocalDateTime] = None
+
+  def processCompleted(): Unit =
+    processedAt = Some(LocalDateTime.now())
 
 trait DomainEventRepository:
   def save[T <: DomainEvent](event: T): Unit =
     println(event)
 
+  def getUnprocessed: List[DomainEvent] =
+    List(
+      MeetingCreated(
+        MeetingId(1),
+        UserId(1),
+        List(UserId(2), UserId(3)),
+        title = "ミーティング1",
+        detail = "全員集合",
+        startAt = LocalDateTime.now(),
+        endAt = LocalDateTime.now()
+      )
+    )
+
 class MeetingCreated(
     val meetingId: MeetingId,
     val organizerId: UserId,
@@ -173,6 +190,24 @@ class EmailReserveService(
           reserver.reserve(u.email, e.title, e.detail, e.endAt)
         })
 
+class EventProcessService(
+    eventRepository: DomainEventRepository,
+    userRepsitory: UserRepository
+):
+  val publisher = DomainEventPublisher(
+    EmailSendService(userRepsitory, EmailSender()),
+    EmailReserveService(userRepsitory, EmailReserver())
+  )
+
+  def process(): Unit =
+    val events = eventRepository.getUnprocessed
+    events.foreach(processEvent)
+
+  private def processEvent(event: DomainEvent): Unit =
+    publisher.publish(event)
+    event.processCompleted()
+    eventRepository.save(event)
+
 class MeetingService(
     meetingRepository: MeetingRepository,
     userRepsitory: UserRepository,


未処理のイベントリストを取得し、それぞれのイベントについてpublishし、完了時間を記録してからDBに保存します。

各イベント発生時の処理と別プロセスで行うという大幅な仕様変更ですが、実装的にはそれほど大きく変わっていません。

(別プロセスの具体的な方法については未実装ですが・・・)

重複排除

ところで、今の実装でも、イベントの処理途中で落ちた場合のことを考えると、

通知などは行われますが完了時間の記録は行われないため、

同じイベントが再度発行されてしまいます。

これはDBトランザクションでない以上、どうしようもないと思います。

ただし、テーブル設計でイベントIDを付与しているため、

同じイベントが発行されてもリモートのサブスクライバ側で処理済みの場合は破棄することが可能です。

イベント発行の際の冪等性は留意が必要なテーマだと思います。

まとめ

今回は、異常系について考慮する中で、イベントの保存と発行を別プロセスで行うという仕様変更を行いました。

なお、「何らかの方法で」別プロセスで行うと言いましたが、それを実現する具体的な技術論については

自分がまだ勉強不足のため、別の機会に譲ろうと思います。

あと、実はまだ一度も実行していないので、そろそろちゃんと動かしてみようと思います。

それについても次回以降に回したいと思います。


ひたすらリファクタを繰り返してきましたが、今回でかなり理想的な形に近づいたと思います。

現時点でのコード全容を貼っておきます。

// ドメイン層
class UserId(val value: Int) extends AnyVal
class Email(val value: String) extends AnyVal

class User(val id: UserId, val email: Email):
  def followed: List[User] =
    List(
      User(UserId(1), Email("hoge1@example.com")),
      User(UserId(2), Email("hoge2@example.com"))
    )

trait UserRepository:
  def get(id: UserId) =
    User(id, Email("hoge@example.com"))

  def getByIds(ids: List[UserId]) =
    ids.map(User(_, Email("hoge@example.com")))

class MeetingId(val value: Int) extends AnyVal

class Meeting(
    val id: MeetingId,
    val organizerId: UserId,
    val participantIds: List[UserId],
    val startAt: LocalDateTime,
    val endAt: LocalDateTime,
    val title: String,
    val detail: String
):
  def delete: MeetingDeleted =
    MeetingDeleted(id, organizerId, participantIds)

object Meeting:
  def create(
      organizerId: UserId,
      participantIds: List[UserId],
      startAt: LocalDateTime,
      endAt: LocalDateTime,
      title: String,
      detail: String
  ): (Meeting, MeetingCreated) =
    val id = MeetingId(1)
    val meeting =
      Meeting(id, organizerId, participantIds, startAt, endAt, title, detail)
    val event = MeetingCreated(
      id,
      organizerId,
      participantIds,
      title,
      detail,
      startAt,
      endAt
    )
    (meeting, event)

trait MeetingRepository:
  def save(meeting: Meeting): Unit
  def get(meetingId: MeetingId): Meeting
  def delete(meetingId: MeetingId, event: MeetingDeleted): Unit

trait DomainEvent:
  val occuredOn = LocalDateTime.now()
  var processedAt: Option[LocalDateTime] = None

  def processCompleted(): Unit =
    processedAt = Some(LocalDateTime.now())

trait DomainEventRepository:
  def save[T <: DomainEvent](event: T): Unit =
    println(event)

  def getUnprocessed: List[DomainEvent] =
    List(
      MeetingCreated(
        MeetingId(1),
        UserId(1),
        List(UserId(2), UserId(3)),
        title = "ミーティング1",
        detail = "全員集合",
        startAt = LocalDateTime.now(),
        endAt = LocalDateTime.now()
      )
    )

class MeetingCreated(
    val meetingId: MeetingId,
    val organizerId: UserId,
    val participantIds: List[UserId],
    val title: String,
    val detail: String,
    val startAt: LocalDateTime,
    val endAt: LocalDateTime
) extends DomainEvent

class MeetingDeleted(
    val meetingId: MeetingId,
    val organizerId: UserId,
    val participantIds: List[UserId]
) extends DomainEvent

trait DomainEventObserver:
  def respondTo[T <: DomainEvent](event: T): Unit

class DomainEventPublisher(obs: DomainEventObserver*):
  def publish[T <: DomainEvent](event: T): Unit =
    obs.foreach(_.respondTo(event))

// インフラストラクチャ層
class EmailSender:
  def send(to: Email, subject: String, body: String): Unit =
    println(s"email send to $to")

class EmailReserver:
  def reserve(
      to: Email,
      subject: String,
      body: String,
      when: LocalDateTime
  ): Unit =
    println(s"email reserved sending to $to when $when")

// アプリケーション層
class EmailSendService(
    userRepsitory: UserRepository,
    sender: EmailSender
) extends DomainEventObserver:
  override def respondTo[T <: DomainEvent](event: T): Unit =
    event match
      case e: MeetingCreated =>
        val users =
          userRepsitory.getByIds(e.organizerId :: e.participantIds)
        users.foreach(u => sender.send(u.email, e.title, e.detail))
      case e: MeetingDeleted =>
        val users =
          userRepsitory.getByIds(e.organizerId :: e.participantIds)
        users.foreach(u =>
          sender.send(u.email, "ミーティング削除のお知らせ", "ミーティングが削除されました")
        )

class EmailReserveService(
    userRepsitory: UserRepository,
    reserver: EmailReserver
) extends DomainEventObserver:
  override def respondTo[T <: DomainEvent](event: T): Unit =
    event match
      case e: MeetingCreated =>
        val users =
          userRepsitory.getByIds(e.organizerId :: e.participantIds)
        users.foreach(u => {
          reserver.reserve(u.email, e.title, e.detail, e.startAt)
          reserver.reserve(u.email, e.title, e.detail, e.endAt)
        })

class EventProcessService(
    eventRepository: DomainEventRepository,
    userRepsitory: UserRepository
):
  val publisher = DomainEventPublisher(
    EmailSendService(userRepsitory, EmailSender()),
    EmailReserveService(userRepsitory, EmailReserver())
  )

  def process(): Unit =
    val events = eventRepository.getUnprocessed
    events.foreach(processEvent)

  private def processEvent(event: DomainEvent): Unit =
    publisher.publish(event)
    event.processCompleted()
    eventRepository.save(event)

class MeetingService(
    meetingRepository: MeetingRepository,
    userRepsitory: UserRepository,
    eventRepository: DomainEventRepository
):
  def reserve(
      organizerId: UserId,
      participantIds: List[UserId],
      startAt: LocalDateTime,
      endAt: LocalDateTime,
      title: String,
      detail: String
  ): Unit =
    val (meeting, event) =
      Meeting.create(
        organizerId,
        participantIds,
        startAt,
        endAt,
        title,
        detail
      )
    meetingRepository.save(meeting)
    eventRepository.save(event)

  def reserveWithFollowed(
      organizerId: UserId,
      startAt: LocalDateTime,
      endAt: LocalDateTime,
      title: String,
      detail: String
  ): Unit =
    val organizer = userRepsitory.get(organizerId)
    val followers = organizer.followed
    val (meeting, event) =
      Meeting.create(
        organizerId,
        followers.map(_.id),
        startAt,
        endAt,
        title,
        detail
      )
    meetingRepository.save(meeting)
    eventRepository.save(event)

  def remove(meetingId: MeetingId): Unit =
    val meeting = meetingRepository.get(meetingId)
    val event = meeting.delete
    meetingRepository.delete(meetingId, event)
    eventRepository.save(event)


この記事をシェアする