From 9bfdcc627704cd74445997d55f5bd7ca1cbf6851 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 20 Sep 2022 21:16:16 +0200 Subject: [PATCH] :sparkles: Make the task retry algorithm use better backoff values --- backend/src/app/emails.clj | 2 +- backend/src/app/worker.clj | 35 ++++++++++++++++++----------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/backend/src/app/emails.clj b/backend/src/app/emails.clj index e02e762dd9..fcbb29bcef 100644 --- a/backend/src/app/emails.clj +++ b/backend/src/app/emails.clj @@ -266,7 +266,7 @@ (wrk/submit! (assoc email ::wrk/task :sendmail ::wrk/delay 0 - ::wrk/max-retries 1 + ::wrk/max-retries 4 ::wrk/priority 200 ::wrk/conn conn)))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 52dd41ab78..a221345bd4 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -270,11 +270,6 @@ (s/keys :req [::task ::conn] :opt [::delay ::queue ::priority ::max-retries])) -(def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?) - returning id") - (defn- extract-props [options] (persistent! @@ -285,6 +280,11 @@ (transient {}) options))) +(def ^:private sql:insert-new-task + "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, now() + ?) + returning id") + (defn submit! [{:keys [::task ::delay ::queue ::priority ::max-retries ::conn] :or {delay 0 queue :default priority 100 max-retries 3} @@ -294,10 +294,13 @@ interval (db/interval duration) props (-> options extract-props db/tjson) id (uuid/next)] + (l/debug :action "submit task" :name (d/name task) :in duration) - (db/exec-one! conn [sql:insert-new-task id (d/name task) props (d/name queue) priority max-retries interval]) + + (db/exec-one! conn [sql:insert-new-task id (d/name task) props + (d/name queue) priority max-retries interval]) id)) ;; --- RUNNER @@ -305,22 +308,20 @@ (def ^:private sql:mark-as-retry "update task - set scheduled_at = clock_timestamp() + ?::interval, - modified_at = clock_timestamp(), + set scheduled_at = now() + ?::interval, + modified_at = now(), error = ?, status = 'retry', - retry_num = retry_num + ? + retry_num = ? where id = ?") -(def default-delay - (dt/duration {:seconds 10})) - (defn- mark-as-retry [conn {:keys [task error inc-by delay] - :or {inc-by 1 delay default-delay}}] + :or {inc-by 1 delay 1000}}] (let [explain (ex-message error) - delay (db/interval delay) - sqlv [sql:mark-as-retry delay explain inc-by (:id task)]] + nretry (+ (:retry-num task) inc-by) + delay (->> (iterate #(* % 2) delay) (take nretry) (last)) + sqlv [sql:mark-as-retry (db/interval delay) explain nretry (:id task)]] (db/exec-one! conn sqlv) nil)) @@ -430,8 +431,8 @@ (map deref) (run! (fn [res] (case (:status res) - :retry (mark-as-retry conn res) - :failed (mark-as-failed conn res) + :retry (mark-as-retry conn res) + :failed (mark-as-failed conn res) :completed (mark-as-completed conn res))))) ::handled)))))