other
(in-package "ACL2")
st-futuremacro
(defmacro st-future (x) `(let ((st-future (make-st-future))) (setf (st-future-closure st-future) (lambda nil ,X)) (setf (st-future-valid st-future) nil) st-future))
st-future-readfunction
(defun st-future-read (st-future) (assert (st-future-p st-future)) (if (st-future-valid st-future) (values-list (st-future-value st-future)) (progn (setf (st-future-value st-future) (multiple-value-list (funcall (st-future-closure st-future)))) (setf (st-future-valid st-future) t) (values-list (st-future-value st-future)))))
st-future-abortfunction
(defun st-future-abort (st-future) (assert (st-future-p st-future)) (setf (st-future-aborted st-future) t) (setf (st-future-closure st-future) nil) st-future)
other
(push :skip-resource-availability-test *features*)
other
(defstruct atomic-notification (value nil))
other
(defstruct barrier (value nil) (lock (make-lock)) (wait-count 0) (sem (make-semaphore)))
broadcast-barrierfunction
(defun broadcast-barrier (barrier) (without-interrupts (setf (barrier-value barrier) t) (with-lock (barrier-lock barrier) (let ((count (barrier-wait-count barrier))) (loop for i from 0 to count do (signal-semaphore (barrier-sem barrier)) (decf (barrier-wait-count barrier)))))))
wait-on-barrierfunction
(defun wait-on-barrier (barrier) (if (barrier-value barrier) t (progn (with-lock (barrier-lock barrier) (incf (barrier-wait-count barrier))) (when (not (barrier-value barrier)) (wait-on-semaphore (barrier-sem barrier))))))
other
(defstruct mt-future (index nil) (value nil) (valid (make-barrier)) (closure nil) (aborted nil) (thrown-tag nil))
other
(define-atomically-modifiable-counter *last-slot-saved* 0)
other
(define-atomically-modifiable-counter *last-slot-taken* 0)
other
(defvar *future-array*)
other
(defvar *thread-array*)
other
(defvar *future-dependencies*)
other
(defparameter *future-queue-length-history* nil)
other
(defvar *current-thread-index* 0)
other
(defconstant *starting-core* 'start)
other
(defconstant *resumptive-core* 'resumptive)
other
(defvar *allocated-core* *resumptive-core*)
other
(defvar *decremented-idle-future-thread-count* nil)
other
(defvar *idle-future-core-count* (make-atomically-modifiable-counter *core-count*))
other
(defvar *idle-future-resumptive-core-count* (make-atomically-modifiable-counter (1- *core-count*)))
other
(defvar *idle-core* (make-semaphore))
other
(define-atomically-modifiable-counter *idle-future-thread-count*
0)
other
(defvar *future-added* (make-semaphore))
other
(defvar *idle-resumptive-core* (make-semaphore))
other
(defvar *threads-spawned* 0)
other
(define-atomically-modifiable-counter *unassigned-and-active-future-count*
1)
other
(define-atomically-modifiable-counter *total-future-count*
0)
other
(defconstant *future-array-size* 200000)
farefmacro
(defmacro faref (array subscript) `(aref ,ARRAY (if (equal 0 ,SUBSCRIPT) 0 (1+ (mod ,SUBSCRIPT (1- *future-array-size*))))))
other
(defvar *resource-and-timing-based-parallelizations* 0 "Tracks the number of times that we parallelize execution when waterfall-parallelism is set to :resource-and-timing-based")
other
(defvar *resource-and-timing-based-serializations* 0 "Tracks the number of times that we do not parallelize execution when waterfall-parallelism is set to :resource-and-timing-based")
other
(defvar *resource-based-parallelizations* 0 "Tracks the number of times that we parallelize execution when waterfall-parallelism is set to :resource-based")
other
(defvar *resource-based-serializations* 0 "Tracks the number of times that we do not parallelize execution when waterfall-parallelism is set to :resource-based")
reset-future-queue-length-historyfunction
(defun reset-future-queue-length-history nil (setf *future-queue-length-history* nil))
reset-future-parallelism-variablesfunction
(defun reset-future-parallelism-variables nil (setf *thread-array* (make-array *future-array-size* :initial-element nil)) (setf *future-array* (make-array *future-array-size* :initial-element nil)) (setf *future-dependencies* (make-array *future-array-size* :initial-element nil)) (setf *future-added* (make-semaphore)) (setf *idle-future-core-count* (make-atomically-modifiable-counter *core-count*)) (setf *idle-future-resumptive-core-count* (make-atomically-modifiable-counter (1- *core-count*))) (setf *idle-core* (make-semaphore)) (setf *idle-resumptive-core* (make-semaphore)) (dotimes (i *core-count*) (signal-semaphore *idle-core*)) (dotimes (i (1- *core-count*)) (signal-semaphore *idle-resumptive-core*)) (setf *last-slot-taken* (make-atomically-modifiable-counter 0)) (setf *last-slot-saved* (make-atomically-modifiable-counter 0)) (setf *threads-spawned* 0) (setf *total-future-count* (make-atomically-modifiable-counter 0)) (setf *unassigned-and-active-future-count* (make-atomically-modifiable-counter 1)) (setf *idle-future-thread-count* (make-atomically-modifiable-counter 0)) (setf *resource-and-timing-based-parallelizations* 0) (setf *resource-and-timing-based-serializations* 0) (setf *resource-based-parallelizations* 0) (setf *resource-based-serializations* 0) (reset-future-queue-length-history) t)
reset-all-parallelism-variablesfunction
(defun reset-all-parallelism-variables nil (format t "Resetting parallelism and futures variables. This may take a ~ few seconds (often either~% 0 or 15).~%") (reset-parallelism-variables) (reset-future-parallelism-variables) (format t "Done resetting parallelism and futures variables.~%"))
futures-parallelism-buffer-has-space-availablefunction
(defun futures-parallelism-buffer-has-space-available nil (< (atomically-modifiable-counter-read *unassigned-and-active-future-count*) *unassigned-and-active-work-count-limit*))
not-too-many-futures-already-in-existencefunction
(defun not-too-many-futures-already-in-existence nil (let ((total-parallelism-work-limit (f-get-global 'total-parallelism-work-limit *the-live-state*))) (cond ((equal total-parallelism-work-limit :none) t) ((< (atomically-modifiable-counter-read *total-future-count*) total-parallelism-work-limit) t) (t (let ((total-parallelism-work-limit-error (f-get-global 'total-parallelism-work-limit-error *the-live-state*))) (cond ((equal total-parallelism-work-limit-error t) (er hard 'not-too-many-futures-already-in-existence "The system has encountered the limit placed upon the ~ total amount of parallelism work allowed. Either ~ the limit must be increased, or this error must be ~ disabled. See :DOC set-total-parallelism-work-limit ~ and :DOC set-total-parallelism-work-limit-error for ~ more information.")) ((null total-parallelism-work-limit-error) nil) (t (er hard 'not-too-many-futures-already-in-existence "The value for global variable ~ total-parallelism-work-limit-error must be one of ~ t or nil. Please change the value of this global ~ variable to either of those values."))))))))
futures-resources-availablefunction
(defun futures-resources-available nil (and (f-get-global 'parallel-execution-enabled *the-live-state*) (futures-parallelism-buffer-has-space-available) (not-too-many-futures-already-in-existence)))
other
(define-atomically-modifiable-counter *threads-waiting-for-starting-core*
0)
claim-starting-corefunction
(defun claim-starting-core nil (atomic-incf *threads-waiting-for-starting-core*) (let ((notification (make-semaphore-notification))) (unwind-protect-disable-interrupts-during-cleanup (wait-on-semaphore *idle-core* :notification notification) (progn (when (semaphore-notification-status notification) (setf *allocated-core* *starting-core*) (atomic-decf *idle-future-core-count*) (setf *decremented-idle-future-thread-count* t) (atomic-decf *idle-future-thread-count*)) (atomic-decf *threads-waiting-for-starting-core*)))))
claim-resumptive-corefunction
(defun claim-resumptive-core nil (let ((notification (make-semaphore-notification))) (unwind-protect-disable-interrupts-during-cleanup (wait-on-semaphore *idle-resumptive-core* :notification notification) (when (semaphore-notification-status notification) (setf *allocated-core* *resumptive-core*) (atomic-decf *idle-future-resumptive-core-count*)))))
free-allocated-corefunction
(defun free-allocated-core nil (without-interrupts (cond ((eq *allocated-core* *starting-core*) (atomic-incf *idle-future-core-count*) (signal-semaphore *idle-core*) (setf *allocated-core* nil)) ((eq *allocated-core* *resumptive-core*) (atomic-incf *idle-future-resumptive-core-count*) (signal-semaphore *idle-resumptive-core*) (setf *allocated-core* nil)) (t nil)) t))
early-terminate-childrenfunction
(defun early-terminate-children (index) (abort-future-indices (faref *future-dependencies* index)) (setf (faref *future-dependencies* index) nil))
other
(defvar *aborted-futures-via-flag* 0)
other
(defvar *aborted-futures-total* 0)
other
(defvar *futures-resources-available-count* 0)
other
(defvar *futures-resources-unavailable-count* 0)
set-thread-check-for-abort-and-funcallfunction
(defun set-thread-check-for-abort-and-funcall (future) (let* ((index (mt-future-index future)) (closure (mt-future-closure future)) (*allocated-core* nil) (*current-thread-index* index) (*decremented-idle-future-thread-count* nil) (early-terminated t)) (unwind-protect-disable-interrupts-during-cleanup (progn (claim-starting-core) (setf (faref *thread-array* index) (current-thread)) (if (mt-future-aborted future) (incf *aborted-futures-via-flag*) (progn (setf (mt-future-value future) (multiple-value-list (funcall closure))) (setq early-terminated nil) (broadcast-barrier (mt-future-valid future))))) (progn (setf (faref *thread-array* index) nil) (when early-terminated (early-terminate-children index)) (setf (faref *future-dependencies* index) nil) (when *decremented-idle-future-thread-count* (atomic-incf *idle-future-thread-count*)) (free-allocated-core) (setf (faref *future-array* index) nil)))))
other
(defvar *throwable-future-worker-thread* nil)
wait-for-a-closurefunction
(defun wait-for-a-closure nil (loop while (>= (atomically-modifiable-counter-read *last-slot-taken*) (atomically-modifiable-counter-read *last-slot-saved*)) do (let ((random-amount-of-time (+ 10 (random 110.0)))) (when (not (wait-on-semaphore *future-added* :timeout random-amount-of-time)) (throw :worker-thread-no-longer-needed nil)))))
other
(defvar *busy-wait-var* 0)
other
(defvar *current-waiting-thread* nil)
other
(defvar *fresh-waiting-threads* 0)
make-tclet-thrown-symbol1function
(defun make-tclet-thrown-symbol1 (tags first-tag) (if (endp tags) "" (concatenate 'string (if first-tag "" "-OR-") (symbol-name (car tags)) "-THROWN" (make-tclet-thrown-symbol1 (cdr tags) nil))))
make-tclet-thrown-symbolfunction
(defun make-tclet-thrown-symbol (tags) (intern (make-tclet-thrown-symbol1 tags t) "ACL2"))
make-tclet-bindings1function
(defun make-tclet-bindings1 (tags) (if (endp tags) nil (cons (list (make-tclet-thrown-symbol (reverse tags)) t) (make-tclet-bindings1 (cdr tags)))))
make-tclet-bindingsfunction
(defun make-tclet-bindings (tags) (reverse (make-tclet-bindings1 (reverse tags))))
make-tclet-thrown-tags1function
(defun make-tclet-thrown-tags1 (tags) (if (endp tags) nil (cons (make-tclet-thrown-symbol (reverse tags)) (make-tclet-thrown-tags1 (cdr tags)))))
make-tclet-thrown-tagsfunction
(defun make-tclet-thrown-tags (tags) (reverse (make-tclet-thrown-tags1 (reverse tags))))
make-tclet-catchesfunction
(defun make-tclet-catches (rtags body thrown-tag-bindings) (if (endp rtags) body (list 'catch (list 'quote (car rtags)) (list 'prog1 (make-tclet-catches (cdr rtags) body (cdr thrown-tag-bindings)) `(setq ,(CAR THROWN-TAG-BINDINGS) nil)))))
make-tclet-cleanupsfunction
(defun make-tclet-cleanups (thrown-tags cleanups) (if (endp thrown-tags) '((t nil)) (cons (list (car thrown-tags) (car cleanups)) (make-tclet-cleanups (cdr thrown-tags) (cdr cleanups)))))
throw-catch-letmacro
(defmacro throw-catch-let (tags body cleanups) (let* ((thrown-tags (make-tclet-thrown-tags tags))) `(let ,(MAKE-TCLET-BINDINGS TAGS) (let ((tclet-result ,(MAKE-TCLET-CATCHES TAGS BODY THROWN-TAGS))) (prog2 (cond ,@(MAKE-TCLET-CLEANUPS THROWN-TAGS CLEANUPS)) tclet-result)))))
eval-a-closurefunction
(defun eval-a-closure nil (let* ((index (atomic-incf *last-slot-taken*)) (*current-thread-index* index) (thrown-tag nil) (thrown-val nil) (future nil)) (loop while (not (faref *future-array* index)) do (incf *busy-wait-var*) (when (not (equal (current-thread) *current-waiting-thread*)) (setf *current-waiting-thread* (current-thread)) (incf *fresh-waiting-threads*))) (throw-catch-let (raw-ev-fncall local-top-level time-limit5-tag step-limit-tag) (catch :result-no-longer-needed (let ((*throwable-future-worker-thread* t)) (progn (setq future (faref *future-array* index)) (set-thread-check-for-abort-and-funcall future)))) ((progn (setf thrown-tag 'raw-ev-fncall) (setf thrown-val tclet-result)) (progn (setf thrown-tag 'local-top-level) (setf thrown-val tclet-result)) (progn (setf thrown-tag 'time-limit5-tag) (setf thrown-val tclet-result)) (progn (setf thrown-tag 'step-limit-tag) (setf thrown-val tclet-result)))) (atomic-decf *unassigned-and-active-future-count*) (atomic-decf *total-future-count*) (when thrown-tag (setf (mt-future-thrown-tag future) (cons thrown-tag thrown-val)) (broadcast-barrier (mt-future-valid future)))))
eval-closuresfunction
(defun eval-closures nil (catch :worker-thread-no-longer-needed (let ((*throwable-worker-thread* t) (*default-hs* nil)) (declare (special *default-hs*)) (loop (wait-for-a-closure) (eval-a-closure)))) (atomic-decf *idle-future-thread-count*))
number-of-idle-threads-and-threads-waiting-for-a-starting-corefunction
(defun number-of-idle-threads-and-threads-waiting-for-a-starting-core nil (+ (atomically-modifiable-counter-read *idle-future-thread-count*) (atomically-modifiable-counter-read *threads-waiting-for-starting-core*)))
spawn-closure-consumersfunction
(defun spawn-closure-consumers nil (without-interrupts (loop while (< (number-of-idle-threads-and-threads-waiting-for-a-starting-core) *max-idle-thread-count*) do (progn (atomic-incf *idle-future-thread-count*) (incf *threads-spawned*) (run-thread "Worker thread" 'eval-closures)))))
make-future-with-closurefunction
(defun make-future-with-closure (closure) (let ((future (make-mt-future)) (index (atomic-incf *last-slot-saved*))) (assert (not (faref *thread-array* index))) (assert (not (faref *future-array* index))) (assert (not (faref *future-dependencies* index))) (setf (mt-future-index future) index) (setf (faref *future-dependencies* *current-thread-index*) (cons index (faref *future-dependencies* *current-thread-index*))) (setf (mt-future-closure future) closure) future))
add-future-to-queuefunction
(defun add-future-to-queue (future) (setf (faref *future-array* (mt-future-index future)) future) (atomic-incf *total-future-count*) (atomic-incf *unassigned-and-active-future-count*) (spawn-closure-consumers) (signal-semaphore *future-added*) future)
make-closure-expr-with-acl2-bindingsfunction
(defun make-closure-expr-with-acl2-bindings (body) (let ((ld-level-sym (gensym)) (ld-level-state-sym (gensym)) (wormholep-sym (gensym)) (local-safe-mode-sym (gensym)) (local-gc-on-sym (gensym))) `(let* ((,LD-LEVEL-SYM *ld-level*) (,LD-LEVEL-STATE-SYM (assert$ (equal *ld-level* (f-get-global 'ld-level *the-live-state*)) (f-get-global 'ld-level *the-live-state*))) (acl2-unwind-protect-stack *acl2-unwind-protect-stack*) (,WORMHOLEP-SYM *wormholep*) (,LOCAL-SAFE-MODE-SYM (f-get-global 'safe-mode *the-live-state*)) (,LOCAL-GC-ON-SYM (f-get-global 'guard-checking-on *the-live-state*))) (lambda nil (let ((*ld-level* ,LD-LEVEL-SYM) (*acl2-unwind-protect-stack* acl2-unwind-protect-stack) (*wormholep* ,WORMHOLEP-SYM)) (state-free-global-let* ((ld-level ,LD-LEVEL-STATE-SYM) (safe-mode ,LOCAL-SAFE-MODE-SYM) (guard-checking-on ,LOCAL-GC-ON-SYM)) ,BODY))))))
mt-futuremacro
(defmacro mt-future (x) `(cond ((not (futures-resources-available)) (incf *futures-resources-unavailable-count*) (st-future ,X)) (t (incf *futures-resources-available-count*) (without-interrupts (let ((future (make-future-with-closure ,(MAKE-CLOSURE-EXPR-WITH-ACL2-BINDINGS X)))) (without-interrupts (add-future-to-queue future)) future)))))
mt-future-readfunction
(defun mt-future-read (future) (cond ((st-future-p future) (st-future-read future)) ((mt-future-p future) (when (not (barrier-value (mt-future-valid future))) (let ((notif nil)) (unwind-protect-disable-interrupts-during-cleanup (progn (without-interrupts (free-allocated-core) (atomic-decf *unassigned-and-active-future-count*) (setq notif t)) (wait-on-barrier (mt-future-valid future)) (claim-resumptive-core)) (when notif (atomic-incf *unassigned-and-active-future-count*))))) (when (mt-future-thrown-tag future) (throw (car (mt-future-thrown-tag future)) (cdr (mt-future-thrown-tag future)))) (values-list (mt-future-value future))) (t (error "future-read was given a non-future argument"))))
other
(defvar *aborted-futures-via-throw* 0)
other
(defvar *almost-aborted-future-count* 0)
mt-future-abortfunction
(defun mt-future-abort (future) (incf *aborted-futures-total*) (cond ((st-future-p future) (st-future-abort future)) ((mt-future-p future) (without-interrupts (let ((index (mt-future-index future))) (assert index) (setf (mt-future-aborted future) t) (let ((thread (faref *thread-array* index))) (when thread (interrupt-thread thread (lambda nil (if (equal (mt-future-index future) *current-thread-index*) (when *throwable-future-worker-thread* (incf *aborted-futures-via-throw*) (throw :result-no-longer-needed nil)) (incf *almost-aborted-future-count*))))))))) ((null future) t) (t (error "future-abort was given a non-future argument"))) future)
abort-future-indicesfunction
(defun abort-future-indices (indices) (if (endp indices) t (progn (mt-future-abort (faref *future-array* (car indices))) (abort-future-indices (cdr indices)))))
print-non-nils-in-arrayfunction
(defun print-non-nils-in-array (array n) (if (equal n (length array)) "end" (if (null (faref array n)) (print-non-nils-in-array array (1+ n)) (progn (print n) (print (faref array n)) (print-non-nils-in-array array (1+ n))))))
futures-still-in-flightfunction
(defun futures-still-in-flight nil (< 1 (atomically-modifiable-counter-read *unassigned-and-active-future-count*)))
future-readfunction
(defun future-read (x) (mt-future-read x))
future-abortfunction
(defun future-abort (x) (mt-future-abort x))
abort-futuresfunction
(defun abort-futures (futures) (cond ((endp futures) t) (t (future-abort (car futures)) (abort-futures (cdr futures)))))