タム
2024.02.02
86
こんにちは。タムです。
前回まで、Scalaの型応用編ということで、Pub/Subの実装を例に用いてきました。
Scalaの型というよりPub/Subの方がメインになってきてしまって申し訳ないです。
今回はいっそ開き直って、Pub/Subの話をもう少し深ぼっていきたいです。
型についての話はもうそんなに出ないと思うので、ご了承くださいw
趣味で自分用のアプリケーションの実装であれば、前回の実装で十分というよりオーバースペックくらいだと思います。
しかし、実際の業務システムであれば、異常系も考慮しなければいけません。
処理の途中で異常終了するケースを考えてみます。
def reserve(
organizerId: UserId,
participantIds: List[UserId],
startAt: LocalDateTime,
endAt: LocalDateTime,
title: String,
detail: String
): Unit =
val (meeting, event) =
Meeting.create(
organizerId,
participantIds,
startAt,
endAt,
title,
detail
)
meetingRepository.save(meeting) // ①
val users = userRepsitory.getByIds(organizerId :: participantIds)
publisher.publish(event) // ②
reserve
メソッド全体にDBトランザクションが張られているとします。
つまり、reserve
メソッドのどこかの処理で異常終了した場合はロールバックします。
この問題を回避するためにはどうしたらいいでしょうか。
ミーティング作成と、作成後の通知が同じトランザクション内で処理してしまっていることが問題と言えます。
なのでトランザクションを切り離したいです。
つまり、通知処理はあくまでミーティング作成のコミットが成功してから走るようにしたいです。
また、後続のトランザクションが失敗した場合、後続のみ再実行できるようにしたいので、
同じアプリケーションサービス内で複数トランザクションを実行するのではなく、
reserve
メソッド内ではあくまでミーティング作成のトランザクションのみに責任を持ち、
通知は別プロセスで(何らかの方法で)行うようにします。
通知は別プロセスで行うとして、ミーティングが作成されたという事実を知る必要があります。
ドメインイベントを生成するようにしたので、それ自体をミーティングオブジェクトと同じDBに保存してしまいましょう。
そうすれば同じDBトランザクションで整合性を担保できます。
ミーティングオブジェクトをDBに保存することを決めたので、テーブル設計を考えます。
各イベントごとにテーブルを用意してもいいのですが、コストがかかるためすべてのイベントを同じテーブルに保存する方向で考えます。
以下のようなカラム構成でどうでしょうか。
id
(string): イベントID(UUID)type
(string): イベント種別(例:"MeetingCreated", "MeetingDeleted")occuredOn
(datetime): イベント発生日時details
(json): イベント詳細(例:{"meetingId": 1, "organizerId": 1, ...})processed_at
(datetime): イベント処理完了日時created_at
(timestamp): 作成日時updated_at
(timestamp): 更新日時イベントを一意に指定するIDがあったほうが何かと便利だと思うので、id
を付与しました。
イベント種別はtype
で識別します。
またイベントごとに異なるカラムについてはdetails
で吸収できるようにしました。
別プロセスでイベントを処理する際にすでに処理済みなのかを知る必要があるため、processed_at
を付与しました。
次に実装です。
イベント作成と通知が別プロセスということになったので、まずはイベント作成の方の実装からしてみます。
@@ -100,6 +100,10 @@ trait MeetingRepository:
trait DomainEvent:
val occuredOn = LocalDateTime.now()
+trait DomainEventRepository:
+ def save[T <: DomainEvent](event: T): Unit =
+ println(event)
+
class MeetingCreated(
val meetingId: MeetingId,
val organizerId: UserId,
@@ -171,13 +175,9 @@ class EmailReserveService(
class MeetingService(
meetingRepository: MeetingRepository,
- userRepsitory: UserRepository
+ userRepsitory: UserRepository,
+ eventRepository: DomainEventRepository
):
- private val publisher = DomainEventPublisher(
- EmailSendService(userRepsitory, EmailSender()),
- EmailReserveService(userRepsitory, EmailReserver())
- )
-
def reserve(
organizerId: UserId,
participantIds: List[UserId],
@@ -196,8 +196,7 @@ class MeetingService(
detail
)
meetingRepository.save(meeting)
- publisher.publish(event)
+ eventRepository.save(event)
def reserveWithFollowed(
organizerId: UserId,
@@ -218,13 +217,10 @@ class MeetingService(
detail
)
meetingRepository.save(meeting)
- publisher.publish(event)
+ eventRepository.save(event)
def remove(meetingId: MeetingId): Unit =
val meeting = meetingRepository.get(meetingId)
val event = meeting.delete
meetingRepository.delete(meetingId, event)
- publisher.publish(event)
+ eventRepository.save(event)
publisher.publish
の代わりにeventRepository.save
を呼ぶようになっただけです。簡単ですね。
では続いて、通知処理です。
@@ -99,11 +99,28 @@ trait MeetingRepository:
trait DomainEvent:
val occuredOn = LocalDateTime.now()
+ var processedAt: Option[LocalDateTime] = None
+
+ def processCompleted(): Unit =
+ processedAt = Some(LocalDateTime.now())
trait DomainEventRepository:
def save[T <: DomainEvent](event: T): Unit =
println(event)
+ def getUnprocessed: List[DomainEvent] =
+ List(
+ MeetingCreated(
+ MeetingId(1),
+ UserId(1),
+ List(UserId(2), UserId(3)),
+ title = "ミーティング1",
+ detail = "全員集合",
+ startAt = LocalDateTime.now(),
+ endAt = LocalDateTime.now()
+ )
+ )
+
class MeetingCreated(
val meetingId: MeetingId,
val organizerId: UserId,
@@ -173,6 +190,24 @@ class EmailReserveService(
reserver.reserve(u.email, e.title, e.detail, e.endAt)
})
+class EventProcessService(
+ eventRepository: DomainEventRepository,
+ userRepsitory: UserRepository
+):
+ val publisher = DomainEventPublisher(
+ EmailSendService(userRepsitory, EmailSender()),
+ EmailReserveService(userRepsitory, EmailReserver())
+ )
+
+ def process(): Unit =
+ val events = eventRepository.getUnprocessed
+ events.foreach(processEvent)
+
+ private def processEvent(event: DomainEvent): Unit =
+ publisher.publish(event)
+ event.processCompleted()
+ eventRepository.save(event)
+
class MeetingService(
meetingRepository: MeetingRepository,
userRepsitory: UserRepository,
未処理のイベントリストを取得し、それぞれのイベントについてpublish
し、完了時間を記録してからDBに保存します。
各イベント発生時の処理と別プロセスで行うという大幅な仕様変更ですが、実装的にはそれほど大きく変わっていません。
(別プロセスの具体的な方法については未実装ですが・・・)
ところで、今の実装でも、イベントの処理途中で落ちた場合のことを考えると、
通知などは行われますが完了時間の記録は行われないため、
同じイベントが再度発行されてしまいます。
これはDBトランザクションでない以上、どうしようもないと思います。
ただし、テーブル設計でイベントIDを付与しているため、
同じイベントが発行されてもリモートのサブスクライバ側で処理済みの場合は破棄することが可能です。
イベント発行の際の冪等性は留意が必要なテーマだと思います。
今回は、異常系について考慮する中で、イベントの保存と発行を別プロセスで行うという仕様変更を行いました。
なお、「何らかの方法で」別プロセスで行うと言いましたが、それを実現する具体的な技術論については
自分がまだ勉強不足のため、別の機会に譲ろうと思います。
あと、実はまだ一度も実行していないので、そろそろちゃんと動かしてみようと思います。
それについても次回以降に回したいと思います。
ひたすらリファクタを繰り返してきましたが、今回でかなり理想的な形に近づいたと思います。
現時点でのコード全容を貼っておきます。
// ドメイン層
class UserId(val value: Int) extends AnyVal
class Email(val value: String) extends AnyVal
class User(val id: UserId, val email: Email):
def followed: List[User] =
List(
User(UserId(1), Email("hoge1@example.com")),
User(UserId(2), Email("hoge2@example.com"))
)
trait UserRepository:
def get(id: UserId) =
User(id, Email("hoge@example.com"))
def getByIds(ids: List[UserId]) =
ids.map(User(_, Email("hoge@example.com")))
class MeetingId(val value: Int) extends AnyVal
class Meeting(
val id: MeetingId,
val organizerId: UserId,
val participantIds: List[UserId],
val startAt: LocalDateTime,
val endAt: LocalDateTime,
val title: String,
val detail: String
):
def delete: MeetingDeleted =
MeetingDeleted(id, organizerId, participantIds)
object Meeting:
def create(
organizerId: UserId,
participantIds: List[UserId],
startAt: LocalDateTime,
endAt: LocalDateTime,
title: String,
detail: String
): (Meeting, MeetingCreated) =
val id = MeetingId(1)
val meeting =
Meeting(id, organizerId, participantIds, startAt, endAt, title, detail)
val event = MeetingCreated(
id,
organizerId,
participantIds,
title,
detail,
startAt,
endAt
)
(meeting, event)
trait MeetingRepository:
def save(meeting: Meeting): Unit
def get(meetingId: MeetingId): Meeting
def delete(meetingId: MeetingId, event: MeetingDeleted): Unit
trait DomainEvent:
val occuredOn = LocalDateTime.now()
var processedAt: Option[LocalDateTime] = None
def processCompleted(): Unit =
processedAt = Some(LocalDateTime.now())
trait DomainEventRepository:
def save[T <: DomainEvent](event: T): Unit =
println(event)
def getUnprocessed: List[DomainEvent] =
List(
MeetingCreated(
MeetingId(1),
UserId(1),
List(UserId(2), UserId(3)),
title = "ミーティング1",
detail = "全員集合",
startAt = LocalDateTime.now(),
endAt = LocalDateTime.now()
)
)
class MeetingCreated(
val meetingId: MeetingId,
val organizerId: UserId,
val participantIds: List[UserId],
val title: String,
val detail: String,
val startAt: LocalDateTime,
val endAt: LocalDateTime
) extends DomainEvent
class MeetingDeleted(
val meetingId: MeetingId,
val organizerId: UserId,
val participantIds: List[UserId]
) extends DomainEvent
trait DomainEventObserver:
def respondTo[T <: DomainEvent](event: T): Unit
class DomainEventPublisher(obs: DomainEventObserver*):
def publish[T <: DomainEvent](event: T): Unit =
obs.foreach(_.respondTo(event))
// インフラストラクチャ層
class EmailSender:
def send(to: Email, subject: String, body: String): Unit =
println(s"email send to $to")
class EmailReserver:
def reserve(
to: Email,
subject: String,
body: String,
when: LocalDateTime
): Unit =
println(s"email reserved sending to $to when $when")
// アプリケーション層
class EmailSendService(
userRepsitory: UserRepository,
sender: EmailSender
) extends DomainEventObserver:
override def respondTo[T <: DomainEvent](event: T): Unit =
event match
case e: MeetingCreated =>
val users =
userRepsitory.getByIds(e.organizerId :: e.participantIds)
users.foreach(u => sender.send(u.email, e.title, e.detail))
case e: MeetingDeleted =>
val users =
userRepsitory.getByIds(e.organizerId :: e.participantIds)
users.foreach(u =>
sender.send(u.email, "ミーティング削除のお知らせ", "ミーティングが削除されました")
)
class EmailReserveService(
userRepsitory: UserRepository,
reserver: EmailReserver
) extends DomainEventObserver:
override def respondTo[T <: DomainEvent](event: T): Unit =
event match
case e: MeetingCreated =>
val users =
userRepsitory.getByIds(e.organizerId :: e.participantIds)
users.foreach(u => {
reserver.reserve(u.email, e.title, e.detail, e.startAt)
reserver.reserve(u.email, e.title, e.detail, e.endAt)
})
class EventProcessService(
eventRepository: DomainEventRepository,
userRepsitory: UserRepository
):
val publisher = DomainEventPublisher(
EmailSendService(userRepsitory, EmailSender()),
EmailReserveService(userRepsitory, EmailReserver())
)
def process(): Unit =
val events = eventRepository.getUnprocessed
events.foreach(processEvent)
private def processEvent(event: DomainEvent): Unit =
publisher.publish(event)
event.processCompleted()
eventRepository.save(event)
class MeetingService(
meetingRepository: MeetingRepository,
userRepsitory: UserRepository,
eventRepository: DomainEventRepository
):
def reserve(
organizerId: UserId,
participantIds: List[UserId],
startAt: LocalDateTime,
endAt: LocalDateTime,
title: String,
detail: String
): Unit =
val (meeting, event) =
Meeting.create(
organizerId,
participantIds,
startAt,
endAt,
title,
detail
)
meetingRepository.save(meeting)
eventRepository.save(event)
def reserveWithFollowed(
organizerId: UserId,
startAt: LocalDateTime,
endAt: LocalDateTime,
title: String,
detail: String
): Unit =
val organizer = userRepsitory.get(organizerId)
val followers = organizer.followed
val (meeting, event) =
Meeting.create(
organizerId,
followers.map(_.id),
startAt,
endAt,
title,
detail
)
meetingRepository.save(meeting)
eventRepository.save(event)
def remove(meetingId: MeetingId): Unit =
val meeting = meetingRepository.get(meetingId)
val event = meeting.delete
meetingRepository.delete(meetingId, event)
eventRepository.save(event)