Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ARG CRYSTAL_VERSION=latest

FROM placeos/crystal:$CRYSTAL_VERSION as build
FROM placeos/crystal:$CRYSTAL_VERSION AS build
WORKDIR /app

# Set the commit via a build arg
Expand Down
4 changes: 4 additions & 0 deletions spec/publishing/publish_metadata_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ module PlaceOS::Source
def broadcast(message : Publisher::Message)
messages << message
end

# Mock implementation of the PublisherManager stats contract:
# always reports a single "mock" counter stuck at zero.
def stats : Hash(String, UInt64)
  counters = Hash(String, UInt64).new
  counters["mock"] = 0_u64
  counters
end
end

class Dummy
Expand Down
4 changes: 4 additions & 0 deletions src/source/publishing/influx_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ module PlaceOS::Source

delegate start, stop, to: publisher

# Reports how many events the underlying InfluxDB publisher has
# processed so far, keyed under "influx" for the stats output.
def stats : Hash(String, UInt64)
  counters = Hash(String, UInt64).new
  counters["influx"] = publisher.processed
  counters
end

def initialize(
@influx_host : String = INFLUX_HOST || abort("INFLUX_HOST unset"),
@influx_api_key : String = INFLUX_API_KEY || abort("INFLUX_API_KEY unset"),
Expand Down
12 changes: 11 additions & 1 deletion src/source/publishing/mqtt_broker_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ module PlaceOS::Source
end
end

# Collects the processed-message count for every registered MQTT
# publisher, keyed by "MQTT: <broker name>". Iterates inside
# `read_publishers` so the publisher map is read under the lock.
def stats : Hash(String, UInt64)
  counters = Hash(String, UInt64).new
  read_publishers do |publishers|
    publishers.each_value do |publisher|
      counters["MQTT: #{publisher.broker.name}"] = publisher.processed
    end
  end
  counters
end

def process_resource(action : Resource::Action, resource : Model::Broker) : Resource::Result
# Don't recreate the publisher if only "safe" attributes have changed
case action
Expand Down Expand Up @@ -131,7 +141,7 @@ module PlaceOS::Source
end
end

# Safe to update iff fields in SAFE_ATTRIBUTES changed
# Safe to update if fields in SAFE_ATTRIBUTES changed
#
def self.safe_update?(model : Model::Broker)
# Take the union of the changed fields and the safe fields
Expand Down
2 changes: 1 addition & 1 deletion src/source/publishing/mqtt_publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module PlaceOS::Source
Event.new(value, timestamp).to_json
end

private getter broker : PlaceOS::Model::Broker
getter broker : PlaceOS::Model::Broker
private getter broker_lock : RWLock = RWLock.new
protected getter client : ::MQTT::V3::Client

Expand Down
18 changes: 9 additions & 9 deletions src/source/publishing/publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ module PlaceOS::Source
timestamp : Time
)

getter message_queue : Channel(Message) = Channel(Message).new
getter message_queue : Channel(Message) = Channel(Message).new(StatusEvents::BATCH_SIZE)
getter processed : UInt64 = 0_u64

abstract def publish(message : Message)

def start
consume_messages
spawn { consume_messages }
end

def stop
message_queue.close
end

private def consume_messages
spawn do
while message = message_queue.receive?
begin
publish(message)
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
end
while message = message_queue.receive?
begin
publish(message)
@processed += 1_u64
rescue error
Log.warn(exception: error) { "publishing message: #{message}" }
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions src/source/publishing/publisher_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ module PlaceOS::Source
abstract def broadcast(message : Publisher::Message)
abstract def start
abstract def stop

abstract def stats : Hash(String, UInt64)
end
end
102 changes: 66 additions & 36 deletions src/source/status_events.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ module PlaceOS::Source
Log = ::Log.for(self)

STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"
MAX_CONTAINER_SIZE = 50_000
MAX_CONTAINER_SIZE = 40_000
BATCH_SIZE = 100
PROCESSING_INTERVAL = 100.milliseconds
PROCESSING_INTERVAL = 40.milliseconds
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8

private getter! redis : Redis
Expand Down Expand Up @@ -66,30 +66,51 @@ module PlaceOS::Source
redis.close
end

# Yields every Module row using keyset (cursor) pagination so the whole
# table is never materialised in memory at once.
#
# The cursor is the (created_at, id) pair of the last row of the previous
# page; each query fetches the next `batch_size` rows strictly after that
# cursor, matching the `ORDER BY created_at ASC, id ASC` ordering.
# NOTE(review): assumes (created_at, id) is unique per row so no module is
# skipped or repeated across pages — TODO confirm against the schema.
def paginate_modules(&)
# Rows fetched per query; a short page also signals the final page.
batch_size = 64
# Cursor start: epoch time + empty id sorts before every real row.
last_created_at = Time.unix(0)
last_id = ""

loop do
modules = PlaceOS::Model::Module
.where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id)
.order(created_at: :asc, id: :asc)
.limit(batch_size)
.to_a

# process
break if modules.empty?
modules.each do |mod|
yield mod
end
# A short page means the last rows were just consumed — stop without
# issuing one more (necessarily empty) query.
break if modules.size < batch_size

# Advance the cursor past the last row of this page.
last_created_at = modules.last.created_at
last_id = modules.last.id
end
end

def update_values
mods_mapped = 0_u64
status_updated = 0_u64
pattern = "initial_sync"
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size > MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error syncing #{mod.try(&.id)}" }
end
Log.info { {
message: "initial status sync complete",
Expand All @@ -109,26 +130,23 @@ module PlaceOS::Source
status_updated = 0_u64
pattern = "broker_resync"

PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end
paginate_modules do |mod|
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE // 2
until event_container.size < MAX_CONTAINER_SIZE // 4
sleep 10.milliseconds
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end

Log.info { {
Expand Down Expand Up @@ -169,6 +187,18 @@ module PlaceOS::Source
else
process_batch(batch)
Fiber.yield

# This outputs how many writes have occurred for each publisher
# stats = String.build do |io|
# io << "\n\n\nNEXT BATCH:\n"
# publisher_managers.each do |manager|
# manager.stats.each do |name, count|
# io << " * #{name} => #{count}"
# end
# end
# io << "\n\n"
# end
# puts stats
end
rescue error
Log.error(exception: error) { "error processing events" }
Expand Down
Loading