Add SQSServiceSensor for non-polling SQS sensor#91
Add SQSServiceSensor for non-polling SQS sensor#91grepory wants to merge 1 commit intoStackStorm-Exchange:masterfrom
Conversation
This adds a SQS Sensor with its own polling loop so that we can consume messages from one or more SQS queues as quickly as possible without relying on StackStorm to trigger a poll interval.
0651675 to
50d0d31
Compare
| # setting SQS ServiceResource object from the parameter of datastore or configuration file | ||
| self._may_setup_sqs() | ||
|
|
||
| while True: |
There was a problem hiding this comment.
I assume there is no yielding needed in this function (aka eventlet.sleep(0.01) at the end or similar) because _receive_messages performs a network operation which already needs to yield at some point even if there are no messages to be retrieved.
Otherwise if that's not the case and _receive_messages could immediately return this could cause CPU spikes and 100% CPU utilization by the sensor process since there is no yielding.
There was a problem hiding this comment.
boto3 receive_messages accepts a WaitTimeSeconds argument, which _receive_messages defaults to 2 seconds. That's what's keeping the loop from spinning too fast.
| --- | ||
| class_name: "AWSSQSServiceSensor" | ||
| entry_point: "sqs_service_sensor.py" | ||
| description: "Service Sensor which monitors a SQS queue for new messages" |
There was a problem hiding this comment.
Would also be good to document here in the description and also in README how this sensor differentiates from other one :)
| payload = {"queue": queue, "body": json.loads(msg.body)} | ||
| self._sensor_service.dispatch(trigger="aws.sqs_new_message", | ||
| payload=payload) | ||
| msg.delete() |
There was a problem hiding this comment.
Can this method throw?
If so, it probably wouldn't be a bad idea to wrap it in try / catch to avoid a scenario where the same message would always throw for some reason which would prevent sensor from continuing the processing since it would always crash on exception...
There was a problem hiding this comment.
If this throws an exception, it's not because SQS couldn't delete the message due to some unrecoverable error. It will be because of a misconfiguration on the client side or something like that. I don't think it is necessary to try/catch this.
| self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName) | ||
| return self.sqs_res.create_queue(QueueName=queueName) | ||
| elif e.response['Error']['Code'] == 'InvalidClientTokenId': | ||
| self._logger.warning("Cloudn't operate sqs because of invalid credential config") |
There was a problem hiding this comment.
Should we also throw (and abort sensor processing) on this error?
There was a problem hiding this comment.
I don't know why this was logged and not raised. It shouldn't be possible to get here if you have invalid credentials. We tested that scenario, and it raises an exception. I can't recall if it did so when getting the resource or creating the session.
|
@grepory Hey - I want to get this merged, so I am reviewing this PR and I have a few questions.
|
|
My desire to work on this is non-existent at this point. Our internal fork
is working fine for us, and I’ve moved on to another project.
On Fri, Feb 7, 2020 at 7:02 PM JP Bourget ***@***.***> wrote:
@grepory <https://github.com/grepory> Hey - I want to get this merged, so
I am reviewing this PR and I have a few questions.
1. What happens if someone adds a large amount of queues? Say 30? 100?
500?
2. Any system requirements recommendations? I can see this sensor
getting people into trouble without care consideration before turning it on.
3. What if you named the sensor SQSContinuousSensor or something like
that to better differentiate?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#91?email_source=notifications&email_token=AACXAXXSDKJ2HEX6SVDJQ3DRBYOCVA5CNFSM4J4ULUSKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOELFH4GY#issuecomment-583695899>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AACXAXRNGMVDTC23NVDQ5BLRBYOCVANCNFSM4J4ULUSA>
.
--
greg poirier
Old dancers never die, they just leap from barre to barre.
|
|
Greg Poirier seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
|
Based on @grepory last comment shouldnt this be closed? Can we implement |
|
I am working on a usecase that needs the SQS sensor to churn at a faster rate than 120 messages/minute, which is the limitation of current in-tree sensor. Considering that I would be interested in moving this PR forward. Also, the non-polling sensor could potentially benefit from an async mechanism to grab the sqs message. Would exploring the following https://aiobotocore.readthedocs.io/en/latest/examples/sqs/producer_consumer.html# for non polling sqs sensor help address the open items on the review esp regarding timeout interval? cc @Kami ^ since you were helping with the review |
This adds a SQS Sensor with its own polling loop so that we can
consume messages from one or more SQS queues as quickly as possible
without relying on StackStorm to trigger a poll interval.
Closes #90 cc @Kami
The SQSServiceSensor class name is meh, but I lack in creativity. This is what we're using right now to process anywhere between 30-100 messages per second from a single queue.