other
(in-package "ACL2")
other
; A parallelism-piece is one unit of work placed on *work-queue*.  The slot
; descriptions below reflect how the functions in this file use each slot.
(defstruct parallelism-piece
  ; Array shared by sibling pieces; the thread that picks up a piece stores
  ; itself at ARRAY-INDEX (see pop-work-and-set-thread).
  (thread-array nil)
  ; Array shared by sibling pieces; eval-and-save-result stores
  ; (cons t <value>) at ARRAY-INDEX.
  (result-array nil)
  ; Position of this piece in THREAD-ARRAY and RESULT-ARRAY.
  (array-index -1)
  ; Semaphore signaled once the piece has been evaluated or removed from the
  ; work queue.
  (semaphore-to-signal-as-last-act nil)
  ; Function of no arguments whose call produces the piece's value.
  (closure nil)
  ; When non-nil, a function called on this piece's result-array entry after
  ; evaluation.
  (throw-siblings-when-function nil))
other
; Global state for the work-queue based parallelism primitives.  All of these
; are (re)initialized by reset-parallelism-variables.
(progn
  ; Incremented before a worker thread is started and when a worker finishes a
  ; piece; decremented when a worker takes a piece or exits.
  (define-atomically-modifiable-counter *idle-thread-count* 0)
  ; Decremented when a worker claims a core or a parent resumes; incremented
  ; when a worker finishes its turn or a parent starts waiting for children.
  (define-atomically-modifiable-counter *idle-core-count* 0)
  ; Compared against *unassigned-and-active-work-count-limit* by
  ; pargs-parallelism-buffer-has-space-available.
  (define-atomically-modifiable-counter *unassigned-and-active-work-count* 0)
  ; Compared against the state global total-parallelism-work-limit by
  ; not-too-many-pieces-of-parallelism-work-already-in-existence.
  (define-atomically-modifiable-counter *total-work-count* 0)
  ; List of parallelism-piece structures; modified only while holding
  ; *work-queue-lock*.
  (defvar *work-queue*)
  (deflock *work-queue-lock*)
  ; Semaphore that worker threads wait on before looking for work.
  (defvar *check-work-and-core-availability*)
  ; Semaphore that wait-for-resumptive-parallelism-resources waits on.
  (defvar *check-core-availability-for-resuming*)
  ; Incremented once per piece in the cleanup code of
  ; consume-work-on-work-queue-when-there.
  (defvar *total-parallelism-piece-historical-count*))
other
; Set back to nil at the end of reset-parallelism-variables.
; NOTE(review): presumably set to non-nil elsewhere to request a reset --
; the code that reads it is not visible in this section; confirm.
(defparameter *reset-parallelism-variables* nil)
other
; When non-nil, reset-parallelism-variables also recomputes *core-count* (via
; core-count-raw) and sets *unassigned-and-active-work-count-limit* to four
; times that value.
(defparameter *reset-core-count-too* t)
reset-parallelism-variablesfunction
; Put the parallelism system back into its initial state: call
; send-die-to-worker-threads, optionally recompute the core count and the
; derived work-count limit, re-create every counter and semaphore, empty the
; work queue, reset its lock, and clear the bookkeeping variables.  Always
; returns t.
(defun reset-parallelism-variables ()
  (send-die-to-worker-threads)
  (when *reset-core-count-too*
    ;; SETF assigns its pairs left to right, so the limit sees the new count.
    (setf *core-count* (core-count-raw)
          *unassigned-and-active-work-count-limit* (* 4 *core-count*)))
  (setf *idle-thread-count* (make-atomically-modifiable-counter 0)
        *idle-core-count* (make-atomically-modifiable-counter
                           (1- *core-count*))
        *unassigned-and-active-work-count* (make-atomically-modifiable-counter
                                            1)
        *total-work-count* (make-atomically-modifiable-counter 1)
        *work-queue* nil)
  (reset-lock *work-queue-lock*)
  (setf *check-work-and-core-availability* (make-semaphore)
        *check-core-availability-for-resuming* (make-semaphore)
        *throwable-worker-thread* nil
        *total-parallelism-piece-historical-count* 0
        *reset-parallelism-variables* nil)
  t)
eval-and-save-resultfunction
; Call WORK's closure and store (cons t <value>) in WORK's slot of its result
; array.  The stored cons is returned.
(defun eval-and-save-result (work)
  (assert work)
  (let* ((results (parallelism-piece-result-array work))
         (slot (parallelism-piece-array-index work))
         (thunk (parallelism-piece-closure work))
         ;; All three slots are read before the closure runs, as before.
         (entry (cons t (funcall thunk))))
    (setf (aref results slot) entry)))
pop-work-and-set-threadfunction
; Pop the first piece (if any) from *work-queue* while holding
; *work-queue-lock*, record the current thread in that piece's thread array at
; the piece's index, and return the piece.  Returns nil when the queue holds
; no piece.
(defun pop-work-and-set-thread ()
  (let* ((work (with-lock *work-queue-lock*
                 (and (consp *work-queue*)
                      (pop *work-queue*))))
         (thread (current-thread)))
    (unless (null work)
      (assert thread)
      (assert (parallelism-piece-thread-array work))
      (let ((owners (parallelism-piece-thread-array work))
            (slot (parallelism-piece-array-index work)))
        (setf (aref owners slot) thread)))
    work))
consume-work-on-work-queue-when-therefunction
; Body of a worker thread (started by spawn-worker-threads-if-needed).  Loops
; forever: wait until there is queued work and an idle core, claim a core, pop
; one piece, evaluate it, and run the piece's bookkeeping.  The thread leaves
; the loop only via a throw to :worker-thread-no-longer-needed, after which it
; decrements *idle-thread-count*.
(defun consume-work-on-work-queue-when-there nil
  (catch :worker-thread-no-longer-needed
    (let ((*throwable-worker-thread* t)
          (*default-hs* nil))
      (declare (special *default-hs*))
      (loop
        ; Wait until the queue is non-empty and at least one core is idle.
        ; When wait-on-semaphore returns nil (NOTE(review): presumably the
        ; 15-unit timeout expired -- confirm against wait-on-semaphore), this
        ; worker gives up and exits.
        (loop while (not (and *work-queue*
                              (< 0 (atomically-modifiable-counter-read
                                    *idle-core-count*))))
              do (when (not (wait-on-semaphore
                             *check-work-and-core-availability*
                             :timeout 15))
                   (throw :worker-thread-no-longer-needed nil)))
        (unwind-protect-disable-interrupts-during-cleanup
          ; Claim a core; only proceed if the count did not drop below zero.
          (when (<= 0 (atomic-decf *idle-core-count*))
            ; throw-threads-in-array interrupts this thread with a throw to
            ; this tag when the piece's result is no longer wanted.
            (catch :result-no-longer-needed
              (let ((work nil))
                (unwind-protect-disable-interrupts-during-cleanup
                  (progn
                    ; Taking the piece and leaving the idle-thread pool happen
                    ; together, without interrupts.
                    (without-interrupts
                      (setq work (pop-work-and-set-thread))
                      (when work (atomic-decf *idle-thread-count*)))
                    (when work
                      (eval-and-save-result work)
                      (let* ((thread-array
                              (parallelism-piece-thread-array work))
                             (result-array
                              (parallelism-piece-result-array work))
                             (array-index
                              (parallelism-piece-array-index work))
                             (throw-siblings-when-function
                              (parallelism-piece-throw-siblings-when-function
                               work)))
                        ; Clear our thread-array entry first: the interrupt
                        ; closure built by throw-threads-in-array only throws
                        ; when that entry is still non-nil.
                        (setf (aref thread-array array-index) nil)
                        (when throw-siblings-when-function
                          (funcall throw-siblings-when-function
                                   (aref result-array array-index))))))
                  ; Cleanup for the piece: runs whether evaluation finished or
                  ; was thrown out of.
                  (when work
                    (let* ((semaphore-to-signal-as-last-act
                            (parallelism-piece-semaphore-to-signal-as-last-act
                             work))
                           (thread-array
                            (parallelism-piece-thread-array work))
                           (array-index
                            (parallelism-piece-array-index work)))
                      (incf *total-parallelism-piece-historical-count*)
                      (setf (aref thread-array array-index) nil)
                      (atomic-incf *idle-thread-count*)
                      (atomic-decf *total-work-count*)
                      (atomic-decf *unassigned-and-active-work-count*)
                      (assert (semaphorep semaphore-to-signal-as-last-act))
                      ; Signaling the semaphore is the last act for the piece.
                      (signal-semaphore semaphore-to-signal-as-last-act)))))))
          ; Cleanup for the core claim: give the core back (this also undoes
          ; a decrement that went below zero) and wake waiters on both
          ; semaphores.
          (atomic-incf *idle-core-count*)
          (signal-semaphore *check-work-and-core-availability*)
          (signal-semaphore *check-core-availability-for-resuming*)))))
  ; Reached only after the throw above: this worker is no longer idle-and-alive.
  (atomic-decf *idle-thread-count*))
spawn-worker-threads-if-neededfunction
; Start worker threads until *idle-thread-count* reaches
; *max-idle-thread-count*.  The counter is incremented before each thread is
; started.  Returns nil.
(defun spawn-worker-threads-if-needed ()
  (loop until (>= (atomically-modifiable-counter-read *idle-thread-count*)
                  *max-idle-thread-count*)
        do (atomic-incf *idle-thread-count*)
           (run-thread "Worker thread"
                       'consume-work-on-work-queue-when-there)))
add-work-list-to-queuefunction
; Destructively append WORK-LIST to the end of *work-queue* (under
; *work-queue-lock*), add its length to both work counters, and signal
; *check-work-and-core-availability* once per added piece.  Returns nil.
(defun add-work-list-to-queue (work-list)
  ;; The length is taken before the list is spliced into the queue.
  (let ((piece-count (length work-list)))
    (with-lock *work-queue-lock*
      (setq *work-queue* (nconc *work-queue* work-list)))
    (atomic-incf-multiple *total-work-count* piece-count)
    (atomic-incf-multiple *unassigned-and-active-work-count* piece-count)
    (loop repeat piece-count
          do (signal-semaphore *check-work-and-core-availability*))))
combine-array-results-into-listfunction
; Walk RESULT-ARRAY from index CURRENT-POSITION down to 0, pushing the cdr of
; each entry onto ACC, and return the resulting list.  The returned list
; therefore holds the values for indices 0..CURRENT-POSITION in ascending
; order, followed by the original ACC.  A negative CURRENT-POSITION returns
; ACC unchanged.
(defun combine-array-results-into-list (result-array current-position acc)
  (loop for index from current-position downto 0
        do (push (cdr (aref result-array index)) acc))
  acc)
remove-thread-array-from-work-queue-recfunction
; Return WORK-QUEUE with every piece whose thread array is THREAD-ARRAY (by
; eq) removed.  For each removed piece, decrement *total-work-count* and
; *unassigned-and-active-work-count* and signal the piece's
; semaphore-to-signal-as-last-act, in queue order.
;
; ARRAY-POSITIONS-LEFT is the number of matching pieces that may still be in
; the queue; the scan stops as soon as it reaches 0 and the unscanned tail of
; WORK-QUEUE is shared with the result.  It is decremented only when a piece
; is actually removed.  (Previously it was also decremented for non-matching
; pieces, so the scan gave up after (length thread-array) queue entries and
; left sibling pieces queued behind unrelated work.)
;
; The only caller in this file, remove-thread-array-from-work-queue, invokes
; this with *work-queue-lock* held and interrupts disabled.
;
; Implemented as a loop so that scanning a long queue does not deepen the
; stack.
(defun remove-thread-array-from-work-queue-rec
    (work-queue thread-array array-positions-left)
  (let ((kept nil))                     ; non-matching pieces, newest first
    (loop
      (cond ((eql array-positions-left 0)
             ;; Nothing left to find: keep the rest of the queue as is.
             (return (nreconc kept work-queue)))
            ((atom work-queue)
             (return (nreconc kept nil)))
            ((eq thread-array
                 (parallelism-piece-thread-array (car work-queue)))
             (atomic-decf *total-work-count*)
             (atomic-decf *unassigned-and-active-work-count*)
             ;; The waiting parent must be told this child is accounted for.
             (assert
              (semaphorep (parallelism-piece-semaphore-to-signal-as-last-act
                           (car work-queue))))
             (signal-semaphore
              (parallelism-piece-semaphore-to-signal-as-last-act
               (car work-queue)))
             (decf array-positions-left))
            (t (push (car work-queue) kept)))
      (setq work-queue (cdr work-queue)))))
remove-thread-array-from-work-queuefunction
; With interrupts disabled and *work-queue-lock* held, replace *work-queue* by
; the result of remove-thread-array-from-work-queue-rec for THREAD-ARRAY,
; passing the length of THREAD-ARRAY as the number of positions to look for.
(defun remove-thread-array-from-work-queue (thread-array)
  (without-interrupts
    (with-lock *work-queue-lock*
      (let ((pruned-queue (remove-thread-array-from-work-queue-rec
                           *work-queue*
                           thread-array
                           (length thread-array))))
        (setq *work-queue* pruned-queue)))))
terminate-siblingsfunction
; Remove the still-queued pieces that share THREAD-ARRAY from the work queue,
; then call throw-threads-in-array on every index of THREAD-ARRAY.
(defun terminate-siblings (thread-array)
  (remove-thread-array-from-work-queue thread-array)
  (let ((last-index (1- (length thread-array))))
    (throw-threads-in-array thread-array last-index)))
generate-work-list-from-closure-list-recfunction
; Build one parallelism-piece per element of CLOSURE-LIST, in order, numbering
; them from CURRENT-POSITION upward.  All pieces share THREAD-ARRAY,
; RESULT-ARRAY, CHILDREN-DONE-SEMAPHORE and THROW-SIBLINGS-WHEN-FUNCTION.
; After the last piece, assert that the final position equals the length of
; THREAD-ARRAY.  Returns the list of pieces.
(defun generate-work-list-from-closure-list-rec
    (thread-array result-array children-done-semaphore closure-list
     current-position &optional throw-siblings-when-function)
  (let ((pieces
         (loop until (atom closure-list)
               collect (make-parallelism-piece
                        :thread-array thread-array
                        :result-array result-array
                        :array-index current-position
                        :semaphore-to-signal-as-last-act
                        children-done-semaphore
                        :closure (car closure-list)
                        :throw-siblings-when-function
                        throw-siblings-when-function)
               do (setq closure-list (cdr closure-list))
                  (incf current-position))))
    (assert (equal current-position (length thread-array)))
    pieces))
generate-work-list-from-closure-listfunction
; Allocate a thread array and a result array (one nil-initialized slot per
; closure) plus a fresh semaphore, and build the list of pieces for
; CLOSURE-LIST.  When TERMINATE-EARLY-FUNCTION is supplied, each piece carries
; a function that applies it to the cdr of a result entry and, if it returns
; non-nil, calls terminate-siblings on the thread array.
;
; Returns (mv work-list thread-array result-array children-done-semaphore).
(defun generate-work-list-from-closure-list
    (closure-list &optional terminate-early-function)
  (let* ((closure-count (length closure-list))
         (thread-array (make-array closure-count :initial-element nil))
         (result-array (make-array closure-count :initial-element nil))
         (children-done-semaphore (make-semaphore))
         (sibling-killer
          (and terminate-early-function
               (lambda (x)
                 (when (funcall terminate-early-function (cdr x))
                   (terminate-siblings thread-array))))))
    (assert (semaphorep children-done-semaphore))
    (mv (generate-work-list-from-closure-list-rec
         thread-array
         result-array
         children-done-semaphore
         closure-list
         0
         sibling-killer)
        thread-array
        result-array
        children-done-semaphore)))
pargs-parallelism-buffer-has-space-availablefunction
; True when the current value of *unassigned-and-active-work-count* is
; strictly below *unassigned-and-active-work-count-limit*.
(defun pargs-parallelism-buffer-has-space-available ()
  (let ((in-flight (atomically-modifiable-counter-read
                    *unassigned-and-active-work-count*)))
    (< in-flight *unassigned-and-active-work-count-limit*)))
not-too-many-pieces-of-parallelism-work-already-in-existencefunction
; True when the current value of *total-work-count* is strictly below the
; state global total-parallelism-work-limit.
(defun not-too-many-pieces-of-parallelism-work-already-in-existence ()
  (let ((existing (atomically-modifiable-counter-read *total-work-count*)))
    (< existing
       (f-get-global 'total-parallelism-work-limit *the-live-state*))))
parallelism-resources-availablefunction
; Decide whether new parallel work may be created.  The three tests are tried
; in order and evaluation stops at the first that fails: the state global
; parallel-execution-enabled, room in the unassigned-and-active buffer, and
; the total-work limit.  Returns the value of the last test when all hold.
(defun parallelism-resources-available ()
  (cond ((not (f-get-global 'parallel-execution-enabled *the-live-state*))
         nil)
        ((not (pargs-parallelism-buffer-has-space-available))
         nil)
        (t (not-too-many-pieces-of-parallelism-work-already-in-existence))))
throw-threads-in-arrayfunction
; For each index from CURRENT-POSITION down to 0 whose THREAD-ARRAY entry is
; non-nil, interrupt that thread with a function that throws to
; :result-no-longer-needed -- but only if the entry is still non-nil when the
; interrupt function runs.  Returns nil.
(defun throw-threads-in-array (thread-array current-position)
  (assert thread-array)
  (loop for index from current-position downto 0
        do ;; Fresh bindings per iteration: the closure below must capture
           ;; this iteration's index, not the loop variable.
           (let ((position index)
                 (target (aref thread-array index)))
             (when target
               (interrupt-thread
                target
                (lambda ()
                  (when (aref thread-array position)
                    (throw :result-no-longer-needed nil))))))))
decrement-children-leftfunction
; If SEMAPHORE-NOTIFICATION-OBJ has its status set, decrement the count held
; in element 0 of CHILDREN-LEFT-PTR and clear the status; otherwise do nothing
; and return nil.
(defun decrement-children-left (children-left-ptr semaphore-notification-obj)
  (cond ((semaphore-notification-status semaphore-notification-obj)
         (decf (aref children-left-ptr 0))
         (clear-semaphore-notification-status semaphore-notification-obj))
        (t nil)))
wait-for-children-to-finishfunction
; Wait on SEMAPHORE repeatedly until the count in element 0 of
; CHILDREN-LEFT-PTR drops below 1.  Each wait passes
; SEMAPHORE-NOTIFICATION-OBJ as the :notification argument, and the count is
; updated by decrement-children-left in cleanup code, so it runs even if the
; wait is exited non-locally.  Returns nil.
(defun wait-for-children-to-finish
    (semaphore children-left-ptr semaphore-notification-obj)
  (assert children-left-ptr)
  (loop until (< (aref children-left-ptr 0) 1)
        do (assert (not (semaphore-notification-status
                         semaphore-notification-obj)))
           (unwind-protect-disable-interrupts-during-cleanup
            (wait-on-semaphore semaphore
                               :notification semaphore-notification-obj)
            (decrement-children-left children-left-ptr
                                     semaphore-notification-obj))))
wait-for-resumptive-parallelism-resourcesfunction
; Block on *check-core-availability-for-resuming* until *idle-core-count* is
; strictly greater than the negation of *core-count*, then increment
; *unassigned-and-active-work-count* and decrement *idle-core-count*.  These
; are the inverse counter updates of prepare-to-wait-for-children.
(defun wait-for-resumptive-parallelism-resources ()
  (loop until (> (atomically-modifiable-counter-read *idle-core-count*)
                 (- *core-count*))
        do (wait-on-semaphore *check-core-availability-for-resuming*))
  (atomic-incf *unassigned-and-active-work-count*)
  (atomic-decf *idle-core-count*))
early-terminate-children-and-rewaitfunction
; If any children are still outstanding (element 0 of CHILDREN-LEFT-PTR is
; positive), remove the queued ones from the work queue, interrupt the running
; ones via throw-threads-in-array, and wait until all are accounted for.
; Does nothing when no children are left.
(defun early-terminate-children-and-rewait
    (children-done-semaphore children-left-ptr semaphore-notification-obj
     thread-array)
  (unless (<= (aref children-left-ptr 0) 0)
    (remove-thread-array-from-work-queue thread-array)
    (let ((last-index (1- (length thread-array))))
      (throw-threads-in-array thread-array last-index))
    (wait-for-children-to-finish children-done-semaphore
                                 children-left-ptr
                                 semaphore-notification-obj)))
prepare-to-wait-for-childrenfunction
; Called by parallelize-closure-list right after its children are queued.
; Gives up the caller's core (increments *idle-core-count*), signals both
; availability semaphores so waiting workers and resuming parents re-check,
; and decrements *unassigned-and-active-work-count*.
; wait-for-resumptive-parallelism-resources performs the inverse counter
; updates when the parent resumes.
(defun prepare-to-wait-for-children nil
  (atomic-incf *idle-core-count*)
  (signal-semaphore *check-work-and-core-availability*)
  (signal-semaphore *check-core-availability-for-resuming*)
  (atomic-decf *unassigned-and-active-work-count*))
parallelize-closure-listfunction
; Evaluate the closures in CLOSURE-LIST (at least two are required) as queued
; pieces of work and return the list of their values in the order of
; CLOSURE-LIST.  TERMINATE-EARLY-FUNCTION, when supplied, is passed to
; generate-work-list-from-closure-list.
(defun parallelize-closure-list (closure-list &optional terminate-early-function)
  (assert (and (consp closure-list) (cdr closure-list)))
  (let (; Becomes t only once the work has actually been queued; the cleanup
        ; code below is skipped otherwise.
        (work-list-setup-p nil)
        (semaphore-notification-obj (make-semaphore-notification))
        ; One-element array used as a mutable count of unfinished children.
        (children-left-ptr (make-array 1 :initial-element (length closure-list))))
    (mv-let (work-list thread-array result-array children-done-semaphore)
      (generate-work-list-from-closure-list closure-list terminate-early-function)
      (assert (semaphorep children-done-semaphore))
      (unwind-protect-disable-interrupts-during-cleanup
        (progn
          ; Spawning workers, queueing the work and releasing our core are
          ; done without interrupts.
          (without-interrupts
            (spawn-worker-threads-if-needed)
            (setq work-list-setup-p (progn (add-work-list-to-queue work-list) t))
            (prepare-to-wait-for-children))
          (wait-for-children-to-finish children-done-semaphore
                                       children-left-ptr
                                       semaphore-notification-obj))
        ; Cleanup: if we exit while children are outstanding, terminate them
        ; and wait for them; then reacquire resources before continuing.
        (progn
          (when work-list-setup-p
            (early-terminate-children-and-rewait children-done-semaphore
                                                 children-left-ptr
                                                 semaphore-notification-obj
                                                 thread-array)
            (wait-for-resumptive-parallelism-resources)
            (assert (eq (aref children-left-ptr 0) 0)))))
      (combine-array-results-into-list result-array
                                       (1- (length result-array))
                                       nil))))
parallelize-fnfunction
; Evaluate ARG-CLOSURES (at least two) via parallelize-closure-list and hand
; the resulting list of values to PARENT-FUN-NAME.  For AND-LIST and OR-LIST
; the list is passed as a single argument; for any other function the values
; are spread as separate arguments.
(defun parallelize-fn
    (parent-fun-name arg-closures &optional terminate-early-function)
  (assert (cdr arg-closures))
  (let ((values-list (parallelize-closure-list arg-closures
                                               terminate-early-function)))
    (case parent-fun-name
      ((and-list or-list) (funcall parent-fun-name values-list))
      (otherwise (apply parent-fun-name values-list)))))
closure-for-expressionmacro
; Expands to whatever make-closure-expr-with-acl2-bindings returns for the
; (unevaluated) expression X.
; NOTE(review): presumably that is a form producing a closure that evaluates
; X -- make-closure-expr-with-acl2-bindings is defined elsewhere; confirm.
(defmacro closure-for-expression (x)
  (make-closure-expr-with-acl2-bindings x))
closure-list-for-expression-listmacro
; X is an unevaluated list of expressions.  Expands to nil for an empty list;
; otherwise to a CONS form whose car is (closure-for-expression <first>) and
; whose cdr is a recursive use of this macro on the remaining expressions.
(defmacro closure-list-for-expression-list (x)
  (cond ((atom x) nil)
        (t (list 'cons
                 (list 'closure-for-expression (car x))
                 (list 'closure-list-for-expression-list (cdr x))))))
parallelism-conditionfunction
; Build the test form used by the parallelism macros: just
; (parallelism-resources-available) when no granularity form exists,
; otherwise the conjunction of that call and GRAN-FORM.
(defun parallelism-condition (gran-form-exists gran-form)
  (let ((resources-test (list 'parallelism-resources-available)))
    (if (not gran-form-exists)
        resources-test
        (list 'and resources-test gran-form))))
pargsmacro
; (pargs [granularity-form] (f arg1 ... argn))
;
; After check-and-parse-for-granularity-form has split off an optional
; granularity form, expands a function call with fewer than two arguments to
; the call itself.  Otherwise expands to an IF that, when the parallelism
; condition holds, evaluates the argument expressions through parallelize-fn,
; and otherwise makes the original call.
(defmacro pargs (&rest forms)
  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (let* ((function-call (car remainder-forms))
           (fn (car function-call))
           (args (cdr function-call)))
      (cond ((null (cdr args)) function-call)
            (t `(if ,(parallelism-condition gran-form-exists gran-form)
                    (parallelize-fn ',fn
                                    (closure-list-for-expression-list ,args))
                    ,function-call))))))
plet-doubletsfunction
; For each binding in BINDINGS, produce (<var> (nth <i> BSYM)), where <var> is
; the binding's variable and <i> counts up from N.
(defun plet-doublets (bindings bsym n)
  (loop for binding in bindings
        for index from n
        collect (list (car binding) (list 'nth index bsym))))
make-closuresfunction
; For each binding (<var> <expr>) in BINDINGS, produce the form
; (function (lambda nil <expr>)).
(defun make-closures (bindings)
  (loop for binding in bindings
        collect (list 'function
                      (list 'lambda nil (cadr binding)))))
identity-listfunction
; Return the arguments as a list.  Used by PLET as the function that
; parallelize-fn applies to the evaluated binding values.
(defun identity-list (&rest rst) rst)
make-list-until-non-declarefunction
; Move leading elements of REMAINING-LIST onto ACC for as long as
; caar-is-declarep holds of the remaining list.  Returns
; (mv <reverse of ACC> <rest of REMAINING-LIST>).
(defun make-list-until-non-declare (remaining-list acc)
  (loop while (caar-is-declarep remaining-list)
        do (push (car remaining-list) acc)
           (setq remaining-list (cdr remaining-list)))
  (mv (reverse acc) remaining-list))
parse-additional-declare-forms-for-letfunction
; X has the shape (<bindings> <declare-form>* . <body>).  Returns
; (mv <bindings> <declare-forms> <body>), splitting with
; make-list-until-non-declare.
(defun parse-additional-declare-forms-for-let (x)
  (let ((bindings (car x)))
    (mv-let (declare-forms body)
      (make-list-until-non-declare (cdr x) nil)
      (mv bindings declare-forms body))))
pletmacro
; (plet [granularity-form] (<binding>*) <declare-form>* <body>*)
;
; With fewer than two bindings this is an ordinary LET.  Otherwise it expands
; to an IF: when the parallelism condition holds, the binding expressions are
; wrapped as closures and evaluated through parallelize-fn with
; identity-list, the resulting list is bound to a fresh symbol, and each
; variable is bound to the matching NTH element; when it does not hold, the
; ordinary LET is used.
(defmacro plet (&rest forms)
  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (mv-let (bindings declare-forms body)
      (parse-additional-declare-forms-for-let remainder-forms)
      (let ((serial-form `(let ,bindings ,@declare-forms ,@body)))
        (if (null (cdr bindings))
            serial-form
            ;; The temporary is generated only on this branch.
            (let ((bsym (acl2-gentemp "plet")))
              `(if ,(parallelism-condition gran-form-exists gran-form)
                   (let ((,bsym (parallelize-fn
                                 'identity-list
                                 (list ,@(make-closures bindings)))))
                     (let ,(plet-doublets bindings bsym 0)
                       ,@declare-forms
                       ,@body))
                   ,serial-form)))))))
pandmacro
; (pand [granularity-form] <form>*)
;
; With at most one form, expands to (if <form> t nil).  Otherwise expands to
; an IF: when the parallelism condition holds, the forms are evaluated
; through parallelize-fn with AND-LIST and an early-termination test that
; fires on a nil value; when it does not hold, to (if (and <form>*) t nil).
(defmacro pand (&rest forms)
  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (cond ((null (cdr remainder-forms))
           `(if ,(car remainder-forms) t nil))
          (t
           `(if ,(parallelism-condition gran-form-exists gran-form)
                (parallelize-fn 'and-list
                                (closure-list-for-expression-list
                                 ,remainder-forms)
                                (lambda (x) (null x)))
                (if (and ,@remainder-forms) t nil))))))
pormacro
; (por [granularity-form] <form>*)
;
; With at most one form, expands to (if <form> t nil).  Otherwise expands to
; an IF: when the parallelism condition holds, the forms are evaluated
; through parallelize-fn with OR-LIST and an early-termination test that
; fires on a non-nil value; when it does not hold, to (if (or <form>*) t nil).
(defmacro por (&rest forms)
  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (cond ((null (cdr remainder-forms))
           `(if ,(car remainder-forms) t nil))
          (t
           `(if ,(parallelism-condition gran-form-exists gran-form)
                (parallelize-fn 'or-list
                                (closure-list-for-expression-list
                                 ,remainder-forms)
                                (lambda (x) x))
                (if (or ,@remainder-forms) t nil))))))
signal-semaphoresfunction
; Signal every semaphore in the list SEMS, first to last.  Returns nil.
(defun signal-semaphores (sems)
  (dolist (sem sems)
    (signal-semaphore sem)))
spec-mv-letmacro
; (spec-mv-let <outer-vars> <computation>
;   (<inner-let> <inner-vars> <inner-body>
;     (if <test> <true-branch> <false-branch>)))
;
; Expands to code that starts <computation> as a future, performs the inner
; binding, and then either reads the future and binds <outer-vars> to its
; values for <true-branch> (when <test> holds), or aborts the future and runs
; <false-branch>.  A hard error is raised when BODY does not have the shape
; above, when <inner-let> is not one of mv-let, mv?-let or mv-let@par, or when
; the two variable lists are not disjoint lists of symbols.
(defmacro spec-mv-let (&whole spec-mv-let-form outer-vars computation body)
  (case-match body
    ((inner-let inner-vars inner-body ('if test true-branch false-branch))
     (cond
      ; The inner binder must be one of the supported multiple-value binders.
      ((not (member inner-let '(mv-let mv?-let mv-let@par) :test 'eq))
       (er hard! 'spec-mv-let "Illegal form (expected inner let to bind with one of ~v0): ~x1. ~ ~ See :doc spec-mv-let." '(mv-let mv?-let mv-let@par) spec-mv-let-form))
      ; Both variable lists must be symbol lists with no variable in common.
      ((or (not (symbol-listp outer-vars))
           (not (symbol-listp inner-vars))
           (intersectp inner-vars outer-vars :test 'eq))
       (er hard! 'spec-mv-let "Illegal spec-mv-let form: ~x0. The two bound variable lists ~ must be disjoint true lists of variables, unlike ~x1 and ~x2. ~ See :doc spec-mv-let." spec-mv-let-form inner-vars outer-vars))
      (t `(let ((the-very-obscure-feature (future ,COMPUTATION)))
            (,INNER-LET ,INNER-VARS ,INNER-BODY
              (cond (,TEST (mv?-let ,OUTER-VARS
                             (future-read the-very-obscure-feature)
                             ,TRUE-BRANCH))
                    (t (future-abort the-very-obscure-feature)
                       ,FALSE-BRANCH)))))))
    ; Any other shape of BODY is rejected.
    (& (er hard! 'spec-mv-let "Illegal form, ~x0. See :doc spec-mv-let." spec-mv-let-form))))
number-of-active-threads-auxfunction
; Stub: ignores both arguments and always returns 0.
(defun number-of-active-threads-aux (threads acc)
  (declare (ignore threads acc))
  0)
number-of-active-threadsfunction
; Calls number-of-active-threads-aux on the result of (all-threads) with an
; accumulator of 0.  As that helper currently ignores its arguments, the
; result is always 0.
(defun number-of-active-threads nil
  (number-of-active-threads-aux (all-threads) 0))
number-of-threads-waiting-on-a-child-auxfunction
; Stub: ignores both arguments and always returns 0.
(defun number-of-threads-waiting-on-a-child-aux (threads acc)
  (declare (ignore threads acc))
  0)
number-of-threads-waiting-on-a-childfunction
; Calls number-of-threads-waiting-on-a-child-aux on the result of
; (all-threads) with an accumulator of 0.  As that helper currently ignores
; its arguments, the result is always 0.
(defun number-of-threads-waiting-on-a-child nil
  (number-of-threads-waiting-on-a-child-aux (all-threads) 0))
future-queue-lengthfunction
; Returns (*last-slot-saved* - *last-slot-taken*) plus
; *threads-waiting-for-starting-core*.
(defun future-queue-length ()
  (let ((slots-outstanding (- *last-slot-saved* *last-slot-taken*)))
    (+ slots-outstanding *threads-waiting-for-starting-core*)))
total-number-of-threadsfunction
; Returns the number of elements in the list returned by (all-threads).
(defun total-number-of-threads nil
  (length (all-threads)))
other
; Incremented on every call of print-interesting-parallelism-variables-str and
; included at the end of the text that function builds.
(defvar *refresh-rate-indicator* 0)
value-of-symbolmacro
; (value-of-symbol <symbol>) expands to a FORMAT call returning a one-line
; description of <symbol>.  The kind of line is chosen at macroexpansion time:
;   - a constant:                " Constant <name> is <value>"
;   - an fbound symbol:          " Stat <name> is <result of calling it>"
;   - a bound ACL2 state global: " Stat <name> is <value>"
;   - anything else:             " Variable <name> is <value>"
;
; VAR must be a symbol, because SYMBOL-NAME is applied to it in every branch.
; The argument check therefore tests SYMBOLP only.  The previous check called
; FBOUNDP on non-symbols, which signals a type error for arguments that are
; not function names, so the intended error message below was never shown for
; them.
;
; NOTE(review): in the state-global branch, F-GET-GLOBAL is evaluated during
; macroexpansion and its result is spliced into the expansion as a form.  That
; behavior is unchanged here; confirm it is intended.
(defmacro value-of-symbol (var)
  (unless (symbolp var)
    (error "value-of-symbol requires a symbol or function name as its argument"))
  (cond ((constantp var)
         `(format nil " Constant ~s is ~s~% " ,(symbol-name var) ,var))
        ((fboundp var)
         `(format nil " Stat ~s is ~s~% " ,(symbol-name var) (,var)))
        ((boundp-global var *the-live-state*)
         `(format nil " Stat ~s is ~s~% "
                  ,(symbol-name var)
                  ,(f-get-global var *the-live-state*)))
        (t
         `(format nil " Variable ~s is ~s~% " ,(symbol-name var) ,var))))
acl2p-sum-list1function
; Add every element of the list LST to ACC, front to back, and return the
; total.
(defun acl2p-sum-list1 (lst acc)
  (dolist (item lst acc)
    (setq acc (+ item acc))))
acl2p-sum-listfunction
; Return the sum of the elements of the list LST (0 for the empty list), by
; calling acl2p-sum-list1 with an initial accumulator of 0.
(defun acl2p-sum-list (lst)
  (acl2p-sum-list1 lst 0))
average-future-queue-sizefunction
; Return, as a float, the mean of the values in *future-queue-length-history*.
;
; Returns 0.0 when the history is empty; previously that case divided by zero.
; The history is read once into a local so that the sum and the length are
; computed from the same list.
(defun average-future-queue-size ()
  (let ((history *future-queue-length-history*))
    (if (null history)
        0.0
        (* 1.0 (/ (acl2p-sum-list history)
                  (length history))))))
print-interesting-parallelism-variables-strfunction
; Build and return a string of parallelism statistics, one value-of-symbol
; line per item, grouped by blank lines.  As side effects, increments
; *refresh-rate-indicator* and pushes the current future-queue-length onto
; *future-queue-length-history* (so the history is non-empty by the time
; average-future-queue-size is reported below).
(defun print-interesting-parallelism-variables-str nil
  (incf *refresh-rate-indicator*)
  (setf *future-queue-length-history*
        (cons (future-queue-length) *future-queue-length-history*))
  (concatenate 'string
    (format nil " Printing stats related to executing proofs in parallel.~% ")
    ; Cores and threads.
    (value-of-symbol *idle-future-core-count*)
    (value-of-symbol *idle-future-resumptive-core-count*)
    (value-of-symbol *idle-future-thread-count*)
    (value-of-symbol *threads-waiting-for-starting-core*)
    (value-of-symbol number-of-idle-threads-and-threads-waiting-for-a-starting-core)
    (value-of-symbol total-number-of-threads)
    (format nil "~% ")
    ; Work counts and their limits.
    (value-of-symbol *unassigned-and-active-future-count*)
    (value-of-symbol *unassigned-and-active-work-count-limit*)
    (value-of-symbol *total-future-count*)
    (value-of-symbol total-parallelism-work-limit)
    (format nil "~% ")
    (value-of-symbol number-of-active-threads)
    (value-of-symbol number-of-threads-waiting-on-a-child)
    (format nil "~% ")
    ; Future queue.
    (value-of-symbol *last-slot-taken*)
    (value-of-symbol *last-slot-saved*)
    (value-of-symbol future-queue-length)
    (value-of-symbol average-future-queue-size)
    (format nil "~% ")
    ; Parallelization/serialization decision counters.
    (value-of-symbol *resource-based-parallelizations*)
    (value-of-symbol *resource-based-serializations*)
    (value-of-symbol *resource-and-timing-based-parallelizations*)
    (value-of-symbol *resource-and-timing-based-serializations*)
    (value-of-symbol *futures-resources-available-count*)
    (value-of-symbol *futures-resources-unavailable-count*)
    (format nil "~% ")
    (format nil " Printing stats related to aborting futures.~% ")
    (value-of-symbol *aborted-futures-total*)
    (value-of-symbol *aborted-futures-via-throw*)
    (value-of-symbol *aborted-futures-via-flag*)
    (value-of-symbol *almost-aborted-future-count*)
    (format nil "~% ")
    (value-of-symbol *refresh-rate-indicator*)))
print-interesting-parallelism-variablesfunction
; Print the text built by print-interesting-parallelism-variables-str to
; standard output.  Returns nil.
;
; The text is printed through the "~a" directive rather than being used as the
; FORMAT control string itself: it contains printed values, and any tilde in
; one of them would otherwise be interpreted as a format directive.
(defun print-interesting-parallelism-variables ()
  (format t "~a" (print-interesting-parallelism-variables-str)))