From 0af30fefbc13fadf693cb2c469a57aa5751f1641 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Mon, 9 Feb 2026 23:38:19 +1100 Subject: [PATCH 1/3] feat(status_events): use DB pagination so we're not loading all modules in one request --- Dockerfile | 2 +- src/source/publishing/mqtt_broker_manager.cr | 2 +- src/source/status_events.cr | 28 +++++++++++++++++--- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9c8cde0..c61e124 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ ARG CRYSTAL_VERSION=latest -FROM placeos/crystal:$CRYSTAL_VERSION as build +FROM placeos/crystal:$CRYSTAL_VERSION AS build WORKDIR /app # Set the commit via a build arg diff --git a/src/source/publishing/mqtt_broker_manager.cr b/src/source/publishing/mqtt_broker_manager.cr index d4f8521..c310c39 100644 --- a/src/source/publishing/mqtt_broker_manager.cr +++ b/src/source/publishing/mqtt_broker_manager.cr @@ -131,7 +131,7 @@ module PlaceOS::Source end end - # Safe to update iff fields in SAFE_ATTRIBUTES changed + # Safe to update if fields in SAFE_ATTRIBUTES changed # def self.safe_update?(model : Model::Broker) # Take the union of the changed fields and the safe fields diff --git a/src/source/status_events.cr b/src/source/status_events.cr index 83fdaba..f1dd1eb 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -12,7 +12,7 @@ module PlaceOS::Source Log = ::Log.for(self) STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*" - MAX_CONTAINER_SIZE = 50_000 + MAX_CONTAINER_SIZE = 40_000 BATCH_SIZE = 100 PROCESSING_INTERVAL = 100.milliseconds CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8 @@ -66,11 +66,33 @@ module PlaceOS::Source redis.close end + def paginate_modules(&) + batch_size = 64 + last_created_at = Time.unix(0) + last_id = "" + + loop do + modules = PlaceOS::Model::Module + .where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id) + .order(created_at: :asc, id: :asc) + .limit(batch_size) + .to_a + + # process + break if modules.empty? + yield modules + break if modules.size < batch_size + + last_created_at = modules.last.created_at + last_id = modules.last.id + end + end + def update_values mods_mapped = 0_u64 status_updated = 0_u64 pattern = "initial_sync" - PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules| + paginate_modules do |modules| modules.each do |mod| next unless mod mods_mapped += 1_u64 @@ -109,7 +131,7 @@ module PlaceOS::Source status_updated = 0_u64 pattern = "broker_resync" - PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules| + paginate_modules do |modules| modules.each do |mod| next unless mod mods_mapped += 1_u64 From 6f6534cfdb9f2466b541c613496886a90b3d3583 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 11 Feb 2026 09:47:47 +1100 Subject: [PATCH 2/3] feat: add stats to understand processing speeds --- src/source/publishing/influx_manager.cr | 4 + src/source/publishing/mqtt_broker_manager.cr | 10 +++ src/source/publishing/mqtt_publisher.cr | 2 +- src/source/publishing/publisher.cr | 18 ++--- src/source/publishing/publisher_manager.cr | 2 + src/source/status_events.cr | 80 +++++++++++--------- 6 files changed, 70 insertions(+), 46 deletions(-) diff --git a/src/source/publishing/influx_manager.cr b/src/source/publishing/influx_manager.cr index b2d4800..3c26f8b 100644 --- a/src/source/publishing/influx_manager.cr +++ b/src/source/publishing/influx_manager.cr @@ -16,6 +16,10 @@ module PlaceOS::Source delegate start, stop, to: publisher + def stats : Hash(String, UInt64) + {"influx" => publisher.processed} + end + def initialize( @influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"), @influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"), diff --git a/src/source/publishing/mqtt_broker_manager.cr b/src/source/publishing/mqtt_broker_manager.cr index c310c39..d06bd3d 100644 --- a/src/source/publishing/mqtt_broker_manager.cr +++ b/src/source/publishing/mqtt_broker_manager.cr @@ -26,6 +26,16 @@ module PlaceOS::Source end end + def stats : Hash(String, UInt64) + hash = {} of String => UInt64 + read_publishers do |publishers| + publishers.values.each do |publisher| + hash["MQTT: #{publisher.broker.name}"] = publisher.processed + end + end + hash + end + def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result # Don't recreate the publisher if only "safe" attributes have changed case action diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 1ac8696..ecd514e 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -30,7 +30,7 @@ module PlaceOS::Source Event.new(value, timestamp).to_json end - private getter broker : PlaceOS::Model::Broker + getter broker : PlaceOS::Model::Broker private getter broker_lock : RWLock = RWLock.new protected getter client : ::MQTT::V3::Client diff --git a/src/source/publishing/publisher.cr b/src/source/publishing/publisher.cr index b8b871f..c31733a 100644 --- a/src/source/publishing/publisher.cr +++ b/src/source/publishing/publisher.cr @@ -11,12 +11,13 @@ module PlaceOS::Source timestamp : Time ) - getter message_queue : Channel(Message) = Channel(Message).new + getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE) + getter processed : UInt64 = 0_u64 abstract def publish(message : Message) def start - consume_messages + spawn { consume_messages } end def stop @@ -24,13 +25,12 @@ module PlaceOS::Source end private def consume_messages - spawn do - while message = message_queue.receive? - begin - publish(message) - rescue error - Log.warn(exception: error) { "publishing message: #{message}" } - end + while message = message_queue.receive? + begin + publish(message) + @processed += 1_u64 + rescue error + Log.warn(exception: error) { "publishing message: #{message}" } end end end diff --git a/src/source/publishing/publisher_manager.cr b/src/source/publishing/publisher_manager.cr index 9083dbc..76423cf 100644 --- a/src/source/publishing/publisher_manager.cr +++ b/src/source/publishing/publisher_manager.cr @@ -5,5 +5,7 @@ module PlaceOS::Source abstract def broadcast(message : Publisher::Message) abstract def start abstract def stop + + abstract def stats : Hash(String, UInt64) end end diff --git a/src/source/status_events.cr b/src/source/status_events.cr index f1dd1eb..fe518cc 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -14,7 +14,7 @@ module PlaceOS::Source STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*" MAX_CONTAINER_SIZE = 40_000 BATCH_SIZE = 100 - PROCESSING_INTERVAL = 100.milliseconds + PROCESSING_INTERVAL = 40.milliseconds CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8 private getter! redis : Redis @@ -80,7 +80,9 @@ module PlaceOS::Source # process break if modules.empty? - yield modules + modules.each do |mod| + yield mod + end break if modules.size < batch_size last_created_at = modules.last.created_at @@ -92,26 +94,23 @@ module PlaceOS::Source mods_mapped = 0_u64 status_updated = 0_u64 pattern = "initial_sync" - paginate_modules do |modules| - modules.each do |mod| - next unless mod - mods_mapped += 1_u64 - module_id = mod.id.to_s - store = PlaceOS::Driver::RedisStorage.new(module_id) - store.each do |key, value| - status_updated += 1_u64 - add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc}) - end + paginate_modules do |mod| + mods_mapped += 1_u64 + module_id = mod.id.to_s + store = PlaceOS::Driver::RedisStorage.new(module_id) + store.each do |key, value| + status_updated += 1_u64 + add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc}) + end - # Backpressure if event container is growing too fast - if event_container.size >= MAX_CONTAINER_SIZE / 2 - until event_container.size < MAX_CONTAINER_SIZE / 4 - sleep 10.milliseconds - end + # Backpressure if event container is growing too fast + if event_container.size > MAX_CONTAINER_SIZE // 2 + until event_container.size < MAX_CONTAINER_SIZE // 4 + sleep 10.milliseconds end - rescue error - Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" } end + rescue error + Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" } end Log.info { { message: "initial status sync complete", @@ -131,26 +130,23 @@ module PlaceOS::Source status_updated = 0_u64 pattern = "broker_resync" - paginate_modules do |modules| - modules.each do |mod| - next unless mod - mods_mapped += 1_u64 - module_id = mod.id.to_s - store = PlaceOS::Driver::RedisStorage.new(module_id) - store.each do |key, value| - status_updated += 1_u64 - add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc}) - end + paginate_modules do |mod| + mods_mapped += 1_u64 + module_id = mod.id.to_s + store = PlaceOS::Driver::RedisStorage.new(module_id) + store.each do |key, value| + status_updated += 1_u64 + add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc}) + end - # Backpressure if event container is growing too fast - if event_container.size >= MAX_CONTAINER_SIZE / 2 - until event_container.size < MAX_CONTAINER_SIZE / 4 - sleep 10.milliseconds - end + # Backpressure if event container is growing too fast + if event_container.size >= MAX_CONTAINER_SIZE // 2 + until event_container.size < MAX_CONTAINER_SIZE // 4 + sleep 10.milliseconds end - rescue error - Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" } end + rescue error + Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" } end Log.info { { @@ -191,6 +187,18 @@ module PlaceOS::Source else process_batch(batch) Fiber.yield + + # This outputs how many writes have occured for each publisher + # stats = String.build do |io| + # io << "\n\n\nNEXT BATCH:\n" + # publisher_managers.each do |manager| + # manager.stats.each do |name, count| + # io << " * #{name} => #{count}" + # end + # end + # io << "\n\n" + # end + # puts stats end rescue error Log.error(exception: error) { "error processing events" } From d1cfea93fbd2a23c4e16d6a8c8c37593e8aea14b Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 11 Feb 2026 09:57:49 +1100 Subject: [PATCH 3/3] fix: specs --- spec/publishing/publish_metadata_spec.cr | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spec/publishing/publish_metadata_spec.cr b/spec/publishing/publish_metadata_spec.cr index bc9df82..972cb9c 100644 --- a/spec/publishing/publish_metadata_spec.cr +++ b/spec/publishing/publish_metadata_spec.cr @@ -17,6 +17,10 @@ module PlaceOS::Source def broadcast(message : Publisher::Message) messages << message end + + def stats : Hash(String, UInt64) + {"mock" => 0_u64} + end end class Dummy