Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.scalajs.linker.interface.ModuleSplitStyle

import scala.sys.process.*

lazy val projectVersion = "2.4.2"
lazy val projectVersion = "2.4.3"
lazy val organizationName = "ru.trett"
lazy val scala3Version = "3.7.4"
lazy val circeVersion = "0.14.15"
Expand Down Expand Up @@ -120,6 +120,7 @@ lazy val server = project
).map(_ % doobieVersion),
libraryDependencies += "org.jsoup" % "jsoup" % "1.21.2",
libraryDependencies += "com.github.blemale" %% "scaffeine" % "5.3.0",
libraryDependencies += "io.circe" %% "circe-fs2" % "0.14.1",
libraryDependencies += "org.flywaydb" % "flyway-database-postgresql" % "11.17.2" % "runtime",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.19" % Test,
libraryDependencies += "org.scalamock" %% "scalamock" % "7.5.2" % Test,
Expand Down
18 changes: 18 additions & 0 deletions client/src/main/scala/client/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ object Decoders:
}
given Decoder[SummaryResponse] = deriveDecoder

import SummaryEvent.*
given Decoder[Content] = deriveDecoder
given Decoder[Metadata] = deriveDecoder
given Decoder[FunFact] = deriveDecoder
given Decoder[Error] = deriveDecoder

given Decoder[SummaryEvent] = Decoder.instance { cursor =>
cursor.downField("type").as[String].flatMap {
case "content" => cursor.as[Content]
case "metadata" => cursor.as[Metadata]
case "funFact" => cursor.as[FunFact]
case "error" => cursor.as[Error]
case "done" => Right(Done)
case other =>
Left(io.circe.DecodingFailure(s"Unknown SummaryEvent type: $other", cursor.history))
}
}

final class Model:
val feedVar: Var[FeedItemList] = Var(List())
val channelVar: Var[ChannelList] = Var(List())
Expand Down
16 changes: 16 additions & 0 deletions client/src/main/scala/client/NetworkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import ru.trett.rss.models.UserSettings
import ru.trett.rss.models.SummaryEvent

object NetworkUtils {

Expand Down Expand Up @@ -68,4 +69,19 @@ object NetworkUtils {

def logout(): EventStream[Unit] =
FetchStream.post("/api/logout", _.body("")).mapTo(())

def streamSummary(url: String): (EventStream[Try[SummaryEvent]], () => Unit) =
val bus = new EventBus[Try[SummaryEvent]]
val source = new dom.EventSource(url)

source.onmessage = msg =>
decode[SummaryEvent](msg.data.toString) match
case Right(event) => bus.emit(Success(event))
case Left(err) => bus.emit(Failure(err))

source.onerror = _ =>
bus.emit(Failure(new RuntimeException("Stream error")))
source.close()

(bus.events, () => source.close())
}
97 changes: 57 additions & 40 deletions client/src/main/scala/client/SummaryPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import be.doeraene.webcomponents.ui5.*
import be.doeraene.webcomponents.ui5.configkeys.*
import client.NetworkUtils.*
import com.raquo.laminar.api.L.*
import ru.trett.rss.models.{SummaryResponse, SummarySuccess, SummaryError}
import ru.trett.rss.models.SummaryEvent

import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}

object SummaryPage:

import Decoders.given

private val model = AppState.model

private case class PageState(
Expand All @@ -27,52 +25,71 @@ object SummaryPage:
private val stateSignal = stateVar.signal
private val loadMoreBus: EventBus[Unit] = new EventBus

private var currentSubscription: Option[Subscription] = None
private var currentClose: Option[() => Unit] = None

private def resetState(): Unit = stateVar.set(PageState())

private def fetchSummaryBatch(): EventStream[Try[Option[SummaryResponse]]] =
FetchStream
.withDecoder(responseDecoder[SummaryResponse])
.get("/api/summarize")

private val batchObserver: Observer[Try[Option[SummaryResponse]]] = Observer {
case Success(Some(resp)) if resp.funFact.isDefined =>
stateVar.update(_.copy(isLoading = false, hasMore = false, funFact = resp.funFact))

case Success(Some(resp)) if resp.feedsProcessed > 0 =>
val (newContent, isError) = resp.result match
case SummarySuccess(html) => (html, false)
case SummaryError(message) => (message, true)
stateVar.update(s =>
s.copy(
isLoading = false,
summaries = s.summaries :+ newContent,
hasError = isError,
totalProcessed = s.totalProcessed + resp.feedsProcessed,
hasMore = resp.hasMore
)
private def cleanup(): Unit =
currentSubscription.foreach(_.kill())
currentClose.foreach(_())
currentSubscription = None
currentClose = None

private def startStreaming(offset: Int): Unit =
cleanup()

stateVar.update(s =>
s.copy(
isLoading = true,
hasError = false,
summaries = if offset > 0 then s.summaries :+ "" else s.summaries
)
Home.refreshUnreadCountBus.emit(())
)

val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset")
currentClose = Some(close)

currentSubscription = Some(stream.foreach {
case Success(SummaryEvent.Content(text)) =>
stateVar.update(s =>
val newSummaries =
if s.summaries.isEmpty then List(text)
else s.summaries.init :+ (s.summaries.last + text)
s.copy(summaries = newSummaries)
)

case Success(SummaryEvent.Metadata(processed, remaining, more)) =>
stateVar.update(s =>
s.copy(totalProcessed = s.totalProcessed + processed, hasMore = more)
)
Home.refreshUnreadCountBus.emit(())

case Success(SummaryEvent.FunFact(text)) =>
stateVar.update(_.copy(funFact = Some(text), isLoading = false))

case Success(SummaryEvent.Error(msg)) =>
stateVar.update(_.copy(hasError = true, isLoading = false))
client.NotifyComponent.errorMessage(new RuntimeException(msg))

case Success(_) =>
stateVar.update(_.copy(isLoading = false, hasError = true))
case Success(SummaryEvent.Done) =>
stateVar.update(_.copy(isLoading = false))
cleanup()

case Failure(err) =>
stateVar.update(_.copy(isLoading = false, hasError = true))
handleError(err)
}
case Failure(err) =>
stateVar.update(_.copy(hasError = true, isLoading = false))
cleanup()
handleError(err)
}(unsafeWindowOwner))

def render: Element =
resetState()
val initialFetch = fetchSummaryBatch()
div(
cls := "main-content",
initialFetch --> batchObserver,
onMountBind { ctx =>
loadMoreBus.events.flatMapSwitch { _ =>
stateVar.update(_.copy(isLoading = true))
fetchSummaryBatch()
} --> batchObserver
},
onMountUnmountCallback(mount = _ => startStreaming(0), unmount = _ => cleanup()),
loadMoreBus.events.map(_ => stateVar.now().totalProcessed) --> (offset =>
startStreaming(offset)
),
Card(
_.slots.header := CardHeader(
_.titleText := "AI Summary",
Expand Down
2 changes: 1 addition & 1 deletion scripts/local-docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
- host.docker.internal:host-gateway

server:
image: server:2.4.2
image: server:2.4.3
container_name: rss_server
restart: always
depends_on:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package ru.trett.rss.server.codecs
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.*
import io.circe.syntax.*
import ru.trett.rss.models.{SummaryResult, SummarySuccess, SummaryError, SummaryResponse}
import ru.trett.rss.models.{
SummaryResult,
SummarySuccess,
SummaryError,
SummaryResponse,
SummaryEvent
}

object SummaryCodecs:
given Encoder[SummarySuccess] = deriveEncoder
Expand Down Expand Up @@ -34,3 +40,17 @@ object SummaryCodecs:

given Encoder[SummaryResponse] = deriveEncoder
given Decoder[SummaryResponse] = deriveDecoder

import SummaryEvent.*
given Encoder[Content] = deriveEncoder
given Encoder[Metadata] = deriveEncoder
given Encoder[FunFact] = deriveEncoder
given Encoder[Error] = deriveEncoder

given Encoder[SummaryEvent] = Encoder.instance {
case c: Content => c.asJson.mapObject(_.add("type", "content".asJson))
case m: Metadata => m.asJson.mapObject(_.add("type", "metadata".asJson))
case f: FunFact => f.asJson.mapObject(_.add("type", "funFact".asJson))
case e: Error => e.asJson.mapObject(_.add("type", "error".asJson))
case Done => io.circe.Json.obj("type" -> "done".asJson)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package ru.trett.rss.server.controllers

import cats.effect.IO
import org.http4s.AuthedRoutes
import org.http4s.circe.CirceEntityEncoder.*
import org.http4s.ServerSentEvent
import org.http4s.dsl.io.*
import io.circe.syntax.*
import ru.trett.rss.server.models.User
import ru.trett.rss.server.services.SummarizeService
import ru.trett.rss.server.codecs.SummaryCodecs.given
Expand All @@ -15,8 +16,8 @@ object SummarizeController:
def routes(summarizeService: SummarizeService): AuthedRoutes[User, IO] =
AuthedRoutes.of[User, IO] {
case GET -> Root / "api" / "summarize" :? OffsetQueryParamMatcher(offset) as user =>
for
summary <- summarizeService.getSummary(user, offset.getOrElse(0))
response <- Ok(summary)
yield response
val stream = summarizeService
.streamSummary(user, offset.getOrElse(0))
.map(event => ServerSentEvent(data = Some(event.asJson.noSpaces)))
Ok(stream)
}
Loading