Filtering...

parallel-raw

parallel-raw
other
(in-package "ACL2")
other
; A parallelism-piece is one unit of work that is placed on *work-queue*.
; Slots, as used by the functions in this file:
;   thread-array  - array shared by sibling pieces; pop-work-and-set-thread
;                   stores the executing thread at array-index.
;   result-array  - array shared by sibling pieces; eval-and-save-result
;                   stores (cons t <value>) at array-index.
;   array-index   - this piece's position in both arrays (default -1).
;   semaphore-to-signal-as-last-act
;                 - signaled once the piece has been executed or has been
;                   removed from the work queue.
;   closure       - function of no arguments that is funcall'ed to produce
;                   the piece's value.
;   throw-siblings-when-function
;                 - optional function called with this piece's result-array
;                   entry after evaluation.
(defstruct parallelism-piece
  (thread-array nil)
  (result-array nil)
  (array-index -1)
  (semaphore-to-signal-as-last-act nil)
  (closure nil)
  (throw-siblings-when-function nil))
other
; Global state of the work-queue based parallelism implementation.  The four
; counters are created with value 0 here; reset-parallelism-variables replaces
; them (and gives the remaining variables their values).
(progn (define-atomically-modifiable-counter *idle-thread-count* 0)
  (define-atomically-modifiable-counter *idle-core-count* 0)
  (define-atomically-modifiable-counter *unassigned-and-active-work-count*
    0)
  (define-atomically-modifiable-counter *total-work-count* 0)
  ;; List of parallelism-piece structures; accessed under *work-queue-lock*.
  (defvar *work-queue*)
  (deflock *work-queue-lock*)
  ;; Semaphore signaled whenever work is added or a core becomes idle.
  (defvar *check-work-and-core-availability*)
  ;; Semaphore waited on by wait-for-resumptive-parallelism-resources.
  (defvar *check-core-availability-for-resuming*)
  ;; Incremented by a worker each time it finishes handling a piece.
  (defvar *total-parallelism-piece-historical-count*))
other
; Flag that reset-parallelism-variables sets back to nil as its last step.
; NOTE(review): presumably set to t elsewhere to request a reset -- confirm.
(defparameter *reset-parallelism-variables* nil)
other
; When non-nil, reset-parallelism-variables also recomputes *core-count* (via
; core-count-raw) and *unassigned-and-active-work-count-limit*.
(defparameter *reset-core-count-too* t)
reset-parallelism-variablesfunction
; Reinitialize all global parallelism state: ask worker threads to die,
; optionally recompute the core count, replace the counters, empty the work
; queue, reset its lock, and create fresh semaphores.  Returns t.
(defun reset-parallelism-variables
  nil
  ;; Done first, before the queue and semaphores are replaced.
  (send-die-to-worker-threads)
  (when *reset-core-count-too*
    (setf *core-count* (core-count-raw))
    ;; The work buffer limit is four times the core count.
    (setf *unassigned-and-active-work-count-limit*
      (* 4 *core-count*)))
  (setf *idle-thread-count*
    (make-atomically-modifiable-counter 0))
  ;; NOTE(review): one less than *core-count*, presumably because the calling
  ;; thread occupies a core -- confirm.
  (setf *idle-core-count*
    (make-atomically-modifiable-counter (1- *core-count*)))
  ;; NOTE(review): both work counts start at 1, presumably accounting for the
  ;; calling thread's own work -- confirm.
  (setf *unassigned-and-active-work-count*
    (make-atomically-modifiable-counter 1))
  (setf *total-work-count*
    (make-atomically-modifiable-counter 1))
  (setf *work-queue* nil)
  (reset-lock *work-queue-lock*)
  (setf *check-work-and-core-availability* (make-semaphore))
  (setf *check-core-availability-for-resuming*
    (make-semaphore))
  (setf *throwable-worker-thread* nil)
  (setf *total-parallelism-piece-historical-count* 0)
  ;; Clear the reset-request flag now that the reset is complete.
  (setf *reset-parallelism-variables* nil)
  t)
eval-and-save-resultfunction
(defun eval-and-save-result (work)

; Run the closure of the parallelism piece WORK and store (cons t <value>) in
; the piece's slot of its result array.  The stored cons is returned.

  (assert work)
  (setf (aref (parallelism-piece-result-array work)
              (parallelism-piece-array-index work))
        (cons t
              (funcall (parallelism-piece-closure work)))))
pop-work-and-set-threadfunction
(defun pop-work-and-set-thread ()

; Take the first piece off *work-queue* (under *work-queue-lock*), if there is
; one, and record the current thread in that piece's slot of its thread
; array.  Returns the piece, or nil when the queue had no work.

  (let* ((piece (with-lock *work-queue-lock*
                  (and (consp *work-queue*)
                       (pop *work-queue*))))
         (self (current-thread)))
    (cond (piece
           (assert self)
           (assert (parallelism-piece-thread-array piece))
           (setf (aref (parallelism-piece-thread-array piece)
                       (parallelism-piece-array-index piece))
                 self)
           piece)
          (t nil))))
consume-work-on-work-queue-when-therefunction
; Main loop of a worker thread.  Repeatedly: wait until the work queue is
; non-empty and an idle core is available, claim a core, pop one piece of
; work, evaluate it, and signal the piece's semaphore.  The loop is left by a
; throw to :worker-thread-no-longer-needed; afterwards *idle-thread-count* is
; decremented.
(defun consume-work-on-work-queue-when-there
  nil
  (catch :worker-thread-no-longer-needed (let ((*throwable-worker-thread* t) (*default-hs* nil))
      (declare (special *default-hs*))
      ;; Inner loop: block on the semaphore (15 timeout) until there is both
      ;; work and an idle core.  When wait-on-semaphore returns nil the worker
      ;; gives up and exits (presumably this means the wait timed out).
      (loop (loop while
          (not (and *work-queue*
              (< 0 (atomically-modifiable-counter-read *idle-core-count*))))
          do
          (when (not (wait-on-semaphore *check-work-and-core-availability*
                :timeout 15))
            (throw :worker-thread-no-longer-needed nil)))
        ;; Try to claim a core.  Work is attempted only if the decremented
        ;; count is still non-negative; the forms at the end of this
        ;; unwind-protect always give the core back.
        (unwind-protect-disable-interrupts-during-cleanup (when (<= 0 (atomic-decf *idle-core-count*))
            ;; throw-threads-in-array interrupts a thread with a throw to
            ;; this tag to abandon the evaluation of the current piece.
            (catch :result-no-longer-needed (let ((work nil))
                (unwind-protect-disable-interrupts-during-cleanup (progn (without-interrupts (setq work (pop-work-and-set-thread))
                      (when work (atomic-decf *idle-thread-count*)))
                    (when work
                      (eval-and-save-result work)
                      (let* ((thread-array (parallelism-piece-thread-array work)) (result-array (parallelism-piece-result-array work))
                          (array-index (parallelism-piece-array-index work))
                          (throw-siblings-when-function (parallelism-piece-throw-siblings-when-function work)))
                        ;; Clear our thread-array slot so the interrupt lambda
                        ;; in throw-threads-in-array no longer throws for us.
                        (setf (aref thread-array array-index) nil)
                        (when throw-siblings-when-function
                          (funcall throw-siblings-when-function
                            (aref result-array array-index))))))
                  ;; Bookkeeping for a popped piece, whether it finished or
                  ;; was aborted; signaling the semaphore is the last act.
                  (when work
                    (let* ((semaphore-to-signal-as-last-act (parallelism-piece-semaphore-to-signal-as-last-act work)) (thread-array (parallelism-piece-thread-array work))
                        (array-index (parallelism-piece-array-index work)))
                      (incf *total-parallelism-piece-historical-count*)
                      (setf (aref thread-array array-index) nil)
                      (atomic-incf *idle-thread-count*)
                      (atomic-decf *total-work-count*)
                      (atomic-decf *unassigned-and-active-work-count*)
                      (assert (semaphorep semaphore-to-signal-as-last-act))
                      (signal-semaphore semaphore-to-signal-as-last-act)))))))
          ;; Release the core and wake threads waiting for work or a core.
          (atomic-incf *idle-core-count*)
          (signal-semaphore *check-work-and-core-availability*)
          (signal-semaphore *check-core-availability-for-resuming*)))))
  ;; Balances the increment done by spawn-worker-threads-if-needed.
  (atomic-decf *idle-thread-count*))
spawn-worker-threads-if-neededfunction
(defun spawn-worker-threads-if-needed ()

; Start worker threads until the idle-thread counter reaches
; *max-idle-thread-count*.  The counter is incremented before each thread is
; started.  Returns nil.

  (loop
    (unless (< (atomically-modifiable-counter-read *idle-thread-count*)
               *max-idle-thread-count*)
      (return nil))
    (atomic-incf *idle-thread-count*)
    (run-thread "Worker thread"
                'consume-work-on-work-queue-when-there)))
add-work-list-to-queuefunction
(defun add-work-list-to-queue (work-list)

; Destructively append WORK-LIST to the end of *work-queue* (holding
; *work-queue-lock*), add its length to both work counters, and signal
; *check-work-and-core-availability* once per added piece.

  (let ((piece-count (length work-list)))
    (with-lock *work-queue-lock*
      (setq *work-queue* (nconc *work-queue* work-list)))
    (atomic-incf-multiple *total-work-count* piece-count)
    (atomic-incf-multiple *unassigned-and-active-work-count* piece-count)
    (loop repeat piece-count
          do (signal-semaphore *check-work-and-core-availability*))))
combine-array-results-into-listfunction
(defun combine-array-results-into-list (result-array current-position acc)

; Walk RESULT-ARRAY from index CURRENT-POSITION down to 0 and push the cdr of
; each entry onto ACC, so that the values end up in array order in front of
; the original ACC.  A negative CURRENT-POSITION returns ACC unchanged.

  (loop for index from current-position downto 0
        do (push (cdr (aref result-array index)) acc))
  acc)
remove-thread-array-from-work-queue-recfunction
(defun remove-thread-array-from-work-queue-rec
  (work-queue thread-array array-positions-left)

; Return WORK-QUEUE without the pieces whose thread array is (eq) THREAD-ARRAY.
; For every piece removed, both work counters are decremented and the piece's
; semaphore is signaled, exactly as if the piece had been executed.

; ARRAY-POSITIONS-LEFT is an upper bound on the number of matching pieces that
; can still be in the queue; the scan stops early once it reaches 0.  It is
; decremented only when a matching piece is removed.  (Decrementing it for
; non-matching pieces as well would end the scan after that many queue
; entries and could leave siblings in the queue behind unrelated work.)

; remove-thread-array-from-work-queue calls this while holding
; *work-queue-lock* with interrupts disabled.

  (cond ((eql array-positions-left 0) work-queue)
        ((atom work-queue) nil)
        ((eq thread-array
             (parallelism-piece-thread-array (car work-queue)))
         (let ((semaphore (parallelism-piece-semaphore-to-signal-as-last-act
                           (car work-queue))))
           (atomic-decf *total-work-count*)
           (atomic-decf *unassigned-and-active-work-count*)
           ;; The parent counts one signal per child, so removed pieces must
           ;; signal too.
           (assert (semaphorep semaphore))
           (signal-semaphore semaphore)
           (remove-thread-array-from-work-queue-rec (cdr work-queue)
                                                    thread-array
                                                    (1- array-positions-left))))
        (t (cons (car work-queue)
                 ;; Not a sibling: keep it and keep looking for the same
                 ;; number of siblings.
                 (remove-thread-array-from-work-queue-rec (cdr work-queue)
                                                          thread-array
                                                          array-positions-left)))))
remove-thread-array-from-work-queuefunction
(defun remove-thread-array-from-work-queue (thread-array)

; With interrupts disabled and *work-queue-lock* held, replace *work-queue* by
; the result of remove-thread-array-from-work-queue-rec for THREAD-ARRAY,
; passing the array's length as the initial count.

  (without-interrupts
    (with-lock *work-queue-lock*
      (let ((position-count (length thread-array)))
        (setq *work-queue*
              (remove-thread-array-from-work-queue-rec *work-queue*
                                                       thread-array
                                                       position-count))))))
terminate-siblingsfunction
(defun terminate-siblings (thread-array)

; Remove the queued pieces that share THREAD-ARRAY from the work queue, then
; interrupt every thread recorded in THREAD-ARRAY, starting at the last index.

  (let ((last-index (1- (length thread-array))))
    (remove-thread-array-from-work-queue thread-array)
    (throw-threads-in-array thread-array last-index)))
generate-work-list-from-closure-list-recfunction
(defun generate-work-list-from-closure-list-rec
  (thread-array result-array
    children-done-semaphore
    closure-list
    current-position
    &optional
    throw-siblings-when-function)

; Build one parallelism-piece per element of CLOSURE-LIST, in order.  The
; pieces share THREAD-ARRAY, RESULT-ARRAY, CHILDREN-DONE-SEMAPHORE and
; THROW-SIBLINGS-WHEN-FUNCTION, and get consecutive array indices starting at
; CURRENT-POSITION.  Once all pieces are built, the next free index is
; asserted to equal the length of THREAD-ARRAY.

  (let ((pieces nil)
        (position current-position))
    (loop for tail = closure-list then (cdr tail)
          until (atom tail)
          do (push (make-parallelism-piece
                    :thread-array thread-array
                    :result-array result-array
                    :array-index position
                    :semaphore-to-signal-as-last-act children-done-semaphore
                    :closure (car tail)
                    :throw-siblings-when-function throw-siblings-when-function)
                   pieces)
             (setq position (1+ position)))
    (assert (equal position (length thread-array)))
    (nreverse pieces)))
generate-work-list-from-closure-listfunction
(defun generate-work-list-from-closure-list
  (closure-list &optional terminate-early-function)

; Allocate a thread array, a result array (both nil-filled, one slot per
; closure) and a semaphore, and build the list of parallelism pieces for
; CLOSURE-LIST.  Returns (mv work-list thread-array result-array semaphore).

; When TERMINATE-EARLY-FUNCTION is supplied, every piece gets a function that
; applies it to the cdr of a result entry and calls terminate-siblings on the
; thread array when it returns non-nil.

  (let* ((piece-count (length closure-list))
         (thread-array (make-array piece-count :initial-element nil))
         (result-array (make-array piece-count :initial-element nil))
         (children-done-semaphore (make-semaphore))
         (sibling-terminator
          (and terminate-early-function
               (lambda (entry)
                 (when (funcall terminate-early-function (cdr entry))
                   (terminate-siblings thread-array))))))
    (assert (semaphorep children-done-semaphore))
    (mv (generate-work-list-from-closure-list-rec thread-array
                                                  result-array
                                                  children-done-semaphore
                                                  closure-list
                                                  0
                                                  sibling-terminator)
        thread-array
        result-array
        children-done-semaphore)))
pargs-parallelism-buffer-has-space-availablefunction
(defun pargs-parallelism-buffer-has-space-available ()

; True when the unassigned-and-active work counter is strictly below
; *unassigned-and-active-work-count-limit*.

  (let ((buffered-work
         (atomically-modifiable-counter-read
          *unassigned-and-active-work-count*)))
    (< buffered-work *unassigned-and-active-work-count-limit*)))
not-too-many-pieces-of-parallelism-work-already-in-existencefunction
(defun not-too-many-pieces-of-parallelism-work-already-in-existence ()

; True when the total work counter is strictly below the state global
; total-parallelism-work-limit.

  (let ((existing-work
         (atomically-modifiable-counter-read *total-work-count*)))
    (< existing-work
       (f-get-global 'total-parallelism-work-limit *the-live-state*))))
parallelism-resources-availablefunction
(defun parallelism-resources-available ()

; Parallel execution may be used only when it is enabled in the state, the
; work buffer has room, and the total amount of work is below its limit.  The
; checks run in that order and stop at the first failure.

  (when (f-get-global 'parallel-execution-enabled *the-live-state*)
    (when (pargs-parallelism-buffer-has-space-available)
      (not-too-many-pieces-of-parallelism-work-already-in-existence))))
throw-threads-in-arrayfunction
(defun throw-threads-in-array (thread-array current-position)

; For each index from CURRENT-POSITION down to 0, interrupt the thread stored
; in THREAD-ARRAY at that index (if any) with a function that throws to
; :result-no-longer-needed, provided the slot is still non-nil when the
; interrupt runs.  Returns nil.

  (assert thread-array)
  (loop for index from current-position downto 0
        do (let* ((slot index) ; fresh binding for the closure to capture
                  (victim (aref thread-array slot)))
             (when victim
               (interrupt-thread
                victim
                (lambda ()
                  (when (aref thread-array slot)
                    (throw :result-no-longer-needed nil))))))))
decrement-children-leftfunction
(defun decrement-children-left (children-left-ptr semaphore-notification-obj)

; If the notification object reports a status, count one child as done
; (element 0 of CHILDREN-LEFT-PTR) and clear the notification status.

  (when (semaphore-notification-status semaphore-notification-obj)
    (setf (aref children-left-ptr 0)
          (1- (aref children-left-ptr 0)))
    (clear-semaphore-notification-status semaphore-notification-obj)))
wait-for-children-to-finishfunction
(defun wait-for-children-to-finish
  (semaphore children-left-ptr semaphore-notification-obj)

; Wait on SEMAPHORE until element 0 of CHILDREN-LEFT-PTR has dropped below 1.
; After each wait, decrement-children-left runs as the cleanup form and
; updates the count from the notification object.  Returns nil.

  (assert children-left-ptr)
  (loop
    (when (< (aref children-left-ptr 0) 1)
      (return nil))
    (assert (not (semaphore-notification-status semaphore-notification-obj)))
    (unwind-protect-disable-interrupts-during-cleanup
      (wait-on-semaphore semaphore
                         :notification semaphore-notification-obj)
      (decrement-children-left children-left-ptr
                               semaphore-notification-obj))))
wait-for-resumptive-parallelism-resourcesfunction
; Block until *idle-core-count* is greater than (- *core-count*), waiting on
; *check-core-availability-for-resuming* between checks; then count this
; thread as active work again and take a core.  This reverses the counter
; changes made by prepare-to-wait-for-children.
(defun wait-for-resumptive-parallelism-resources
  nil
  (loop while
    ;; NOTE(review): the threshold is the NEGATED core count, so the idle-core
    ;; counter may go below zero before a resuming thread has to wait --
    ;; confirm this is the intended bound.
    (<= (atomically-modifiable-counter-read *idle-core-count*)
      (- *core-count*))
    do
    (wait-on-semaphore *check-core-availability-for-resuming*))
  (atomic-incf *unassigned-and-active-work-count*)
  (atomic-decf *idle-core-count*))
early-terminate-children-and-rewaitfunction
(defun early-terminate-children-and-rewait
  (children-done-semaphore children-left-ptr
    semaphore-notification-obj
    thread-array)

; If some children are still outstanding: drop their queued pieces, interrupt
; the threads recorded in THREAD-ARRAY, and wait again until every child has
; signaled.  Does nothing when no children are left.

  (when (plusp (aref children-left-ptr 0))
    (remove-thread-array-from-work-queue thread-array)
    (let ((last-index (1- (length thread-array))))
      (throw-threads-in-array thread-array last-index))
    (wait-for-children-to-finish children-done-semaphore
                                 children-left-ptr
                                 semaphore-notification-obj)))
prepare-to-wait-for-childrenfunction
; Called by parallelize-closure-list just before the parent blocks waiting for
; its children: give this thread's core back, wake threads waiting for work or
; for a core, and stop counting the parent as active work.
; wait-for-resumptive-parallelism-resources reverses both counter changes.
(defun prepare-to-wait-for-children
  nil
  (atomic-incf *idle-core-count*)
  (signal-semaphore *check-work-and-core-availability*)
  (signal-semaphore *check-core-availability-for-resuming*)
  (atomic-decf *unassigned-and-active-work-count*))
parallelize-closure-listfunction
; Evaluate the closures in CLOSURE-LIST (at least two) in parallel and return
; the list of their values in order.  TERMINATE-EARLY-FUNCTION, if supplied,
; is passed on to generate-work-list-from-closure-list.
(defun parallelize-closure-list
  (closure-list &optional terminate-early-function)
  (assert (and (consp closure-list) (cdr closure-list)))
  ;; children-left-ptr is a one-element array holding the number of children
  ;; that have not yet signaled; it is decremented by decrement-children-left.
  (let ((work-list-setup-p nil) (semaphore-notification-obj (make-semaphore-notification))
      (children-left-ptr (make-array 1 :initial-element (length closure-list))))
    (mv-let (work-list thread-array
        result-array
        children-done-semaphore)
      (generate-work-list-from-closure-list closure-list
        terminate-early-function)
      (assert (semaphorep children-done-semaphore))
      ;; Body: spawn workers, queue the work and release this thread's core
      ;; (all with interrupts disabled), then wait for the children.
      (unwind-protect-disable-interrupts-during-cleanup (progn (without-interrupts (spawn-worker-threads-if-needed)
            (setq work-list-setup-p
              (progn (add-work-list-to-queue work-list) t))
            (prepare-to-wait-for-children))
          (wait-for-children-to-finish children-done-semaphore
            children-left-ptr
            semaphore-notification-obj))
        ;; Cleanup: only if the work was actually queued, terminate any
        ;; children still outstanding, wait for them, and re-acquire a core.
        (progn (when work-list-setup-p
            (early-terminate-children-and-rewait children-done-semaphore
              children-left-ptr
              semaphore-notification-obj
              thread-array)
            (wait-for-resumptive-parallelism-resources)
            (assert (eq (aref children-left-ptr 0) 0)))))
      (combine-array-results-into-list result-array
        (1- (length result-array))
        nil))))
parallelize-fnfunction
(defun parallelize-fn
  (parent-fun-name arg-closures
    &optional
    terminate-early-function)

; Evaluate ARG-CLOSURES in parallel and pass the values to PARENT-FUN-NAME.
; and-list and or-list receive the whole list of values as a single argument;
; every other function is applied to the values as separate arguments.

  (assert (cdr arg-closures))
  (let ((values-list (parallelize-closure-list arg-closures
                                               terminate-early-function)))
    (cond ((member parent-fun-name '(and-list or-list) :test #'equal)
           (funcall parent-fun-name values-list))
          (t (apply parent-fun-name values-list)))))
closure-for-expressionmacro
(defmacro closure-for-expression (x)

; Expands to whatever make-closure-expr-with-acl2-bindings returns for the
; (unevaluated) expression X.

  (let ((expansion (make-closure-expr-with-acl2-bindings x)))
    expansion))
closure-list-for-expression-listmacro
(defmacro closure-list-for-expression-list (x)

; X is an (unevaluated) list of expressions.  The expansion conses the closure
; for the first expression onto the closure list for the remaining ones; an
; atom expands to nil.

  (cond ((atom x) nil)
        (t (list 'cons
                 (list 'closure-for-expression (car x))
                 (list 'closure-list-for-expression-list (cdr x))))))
parallelism-conditionfunction
(defun parallelism-condition (gran-form-exists gran-form)

; Return the form that decides at run time whether to parallelize: the
; resource check alone, or the resource check conjoined with GRAN-FORM when
; GRAN-FORM-EXISTS is non-nil.

  (cond (gran-form-exists
         (list 'and '(parallelism-resources-available) gran-form))
        (t '(parallelism-resources-available))))
pargsmacro
(defmacro pargs (&rest forms)

; (pargs [granularity-decl] (fn arg1 ... argn)).  A call with fewer than two
; arguments expands to the call itself.  Otherwise the expansion tests the
; parallelism condition at run time and either evaluates the arguments in
; parallel via parallelize-fn or falls back to the plain call.

  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (let* ((call (car remainder-forms))
           (fn (car call))
           (args (cdr call)))
      (cond ((null (cdr args)) call)
            (t `(if ,(parallelism-condition gran-form-exists gran-form)
                  (parallelize-fn ',fn
                                  (closure-list-for-expression-list ,args))
                  ,call))))))
plet-doubletsfunction
(defun plet-doublets (bindings bsym n)

; For each binding in BINDINGS produce (var (nth i BSYM)), where var is the
; binding's variable and i counts up from N.

  (loop for binding in bindings
        for index from n
        collect (list (car binding)
                      (list 'nth index bsym))))
make-closuresfunction
(defun make-closures (bindings)

; Turn each binding (var expr) into the form (function (lambda nil expr)).

  (loop for binding in bindings
        collect (list 'function
                      (list 'lambda nil (cadr binding)))))
identity-listfunction
(defun identity-list (&rest items)

; Return the arguments as a list.

  items)
make-list-until-non-declarefunction
(defun make-list-until-non-declare (remaining-list acc)

; Move leading elements of REMAINING-LIST onto ACC for as long as
; caar-is-declarep accepts the list.  Returns (mv collected rest), where
; collected is ACC reversed (so the moved elements are in original order).

  (loop while (caar-is-declarep remaining-list)
        do (push (car remaining-list) acc)
           (setq remaining-list (cdr remaining-list)))
  (mv (reverse acc) remaining-list))
parse-additional-declare-forms-for-letfunction
(defun parse-additional-declare-forms-for-let (x)

; X has the shape (bindings decl* . body).  Returns
; (mv bindings declare-forms body).

  (let ((bindings (car x)))
    (mv-let (leading-declares rest-of-body)
      (make-list-until-non-declare (cdr x) nil)
      (mv bindings leading-declares rest-of-body))))
pletmacro
(defmacro plet (&rest forms)

; (plet [granularity-decl] bindings decl* body...).  With fewer than two
; bindings the expansion is an ordinary let.  Otherwise the expansion tests
; the parallelism condition at run time: when it holds, the binding values are
; computed in parallel into a list held by a generated variable, from which
; the variables are bound with nth; otherwise the ordinary let is used.

  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (mv-let (bindings declare-forms body)
      (parse-additional-declare-forms-for-let remainder-forms)
      (let ((serial-form `(let ,bindings
                            ,@declare-forms
                            ,@body)))
        (if (null (cdr bindings))
          serial-form
          (let ((bsym (acl2-gentemp "plet")))
            `(if ,(parallelism-condition gran-form-exists gran-form)
               (let ((,bsym (parallelize-fn 'identity-list
                                            (list ,@(make-closures bindings)))))
                 (let ,(plet-doublets bindings bsym 0)
                   ,@declare-forms
                   ,@body))
               ,serial-form)))))))
pandmacro
(defmacro pand (&rest forms)

; (pand [granularity-decl] form1 ... formn) is a parallel version of AND that
; returns a Boolean (t or nil) rather than the value of the last form.

; Expansion:
;   no forms   -> t, agreeing with (and) and with the serial fallback below,
;                 which yields (if (and) t nil).  Without this case the
;                 one-form shortcut would turn (pand) into (if nil t nil).
;   one form   -> (if form t nil); nothing to parallelize.
;   otherwise  -> test the parallelism condition at run time; when it holds,
;                 evaluate the forms in parallel and combine them with
;                 and-list, allowing early termination as soon as one value
;                 is nil; otherwise use a serial AND.

  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (cond ((null remainder-forms) t)
          ((null (cdr remainder-forms))
           (list 'if (car remainder-forms) t nil))
          (t (let ((and-early-termination-function '(lambda (x) (null x))))
               (list 'if
                     (parallelism-condition gran-form-exists gran-form)
                     (list 'parallelize-fn
                           ''and-list
                           (list 'closure-list-for-expression-list
                                 remainder-forms)
                           and-early-termination-function)
                     (list 'if (cons 'and remainder-forms) t nil)))))))
pormacro
(defmacro por (&rest forms)

; (por [granularity-decl] form1 ... formn) is a parallel version of OR that
; returns a Boolean.  With at most one form the expansion is
; (if form t nil).  Otherwise the parallelism condition is tested at run
; time: when it holds, the forms are evaluated in parallel and combined with
; or-list, with early termination as soon as one value is non-nil; otherwise
; a serial OR is used.

  (mv-let (erp msg gran-form-exists gran-form remainder-forms)
    (check-and-parse-for-granularity-form forms)
    (declare (ignore msg))
    (assert (not erp))
    (cond ((null (cdr remainder-forms))
           `(if ,(car remainder-forms) t nil))
          (t `(if ,(parallelism-condition gran-form-exists gran-form)
                (parallelize-fn 'or-list
                                (closure-list-for-expression-list
                                 ,remainder-forms)
                                (lambda (x) x))
                (if (or ,@remainder-forms) t nil))))))
signal-semaphoresfunction
(defun signal-semaphores (sems)

; Signal every semaphore in the list SEMS, in order.  Returns nil.

  (dolist (sem sems nil)
    (signal-semaphore sem)))
spec-mv-letmacro
; (spec-mv-let outer-vars computation
;    (inner-let inner-vars inner-body (if test true-branch false-branch)))
;
; Starts COMPUTATION as a future, then evaluates INNER-BODY with INNER-VARS
; bound by INNER-LET (which must be one of mv-let, mv?-let, mv-let@par).  If
; TEST holds, the future is read, its values are bound to OUTER-VARS and
; TRUE-BRANCH is evaluated; otherwise the future is aborted and FALSE-BRANCH
; is evaluated.  A hard error is raised when BODY does not have this shape,
; when INNER-LET is not one of the allowed binders, or when the two variable
; lists are not disjoint lists of symbols.
(defmacro spec-mv-let
  (&whole spec-mv-let-form outer-vars computation body)
  (case-match body
    ((inner-let inner-vars
       inner-body
       ('if test true-branch false-branch)) (cond ((not (member inner-let '(mv-let mv?-let mv-let@par) :test 'eq)) (er hard!
            'spec-mv-let
            "Illegal form (expected inner let to bind with one of ~v0): ~x1. ~ ~
            See :doc spec-mv-let."
            '(mv-let mv?-let mv-let@par)
            spec-mv-let-form))
        ((or (not (symbol-listp outer-vars))
           (not (symbol-listp inner-vars))
           (intersectp inner-vars outer-vars :test 'eq)) (er hard!
            'spec-mv-let
            "Illegal spec-mv-let form: ~x0.  The two bound variable lists ~
            must be disjoint true lists of variables, unlike ~x1 and ~x2.  ~
            See :doc spec-mv-let."
            spec-mv-let-form
            inner-vars
            outer-vars))
        ;; The variable holding the future is deliberately given an unlikely
        ;; name; it is not a gensym, so user code must not use this name.
        (t `(let ((the-very-obscure-feature (future ,COMPUTATION)))
            (,INNER-LET ,INNER-VARS
              ,INNER-BODY
              (cond (,TEST (mv?-let ,OUTER-VARS
                    (future-read the-very-obscure-feature)
                    ,TRUE-BRANCH))
                (t (future-abort the-very-obscure-feature) ,FALSE-BRANCH)))))))
    (& (er hard!
        'spec-mv-let
        "Illegal form, ~x0.  See :doc spec-mv-let."
        spec-mv-let-form))))
number-of-active-threads-auxfunction
(defun number-of-active-threads-aux (threads acc)

; This definition ignores both arguments and always returns 0.

  (declare (ignore threads acc))
  0)
number-of-active-threadsfunction
(defun number-of-active-threads ()

; Apply number-of-active-threads-aux to the list of all threads, starting the
; accumulator at 0.

  (let ((threads (all-threads)))
    (number-of-active-threads-aux threads 0)))
number-of-threads-waiting-on-a-child-auxfunction
(defun number-of-threads-waiting-on-a-child-aux (threads acc)

; This definition ignores both arguments and always returns 0.

  (declare (ignore threads acc))
  0)
number-of-threads-waiting-on-a-childfunction
(defun number-of-threads-waiting-on-a-child ()

; Apply number-of-threads-waiting-on-a-child-aux to the list of all threads,
; starting the accumulator at 0.

  (let ((threads (all-threads)))
    (number-of-threads-waiting-on-a-child-aux threads 0)))
future-queue-lengthfunction
(defun future-queue-length ()

; The number of slots saved but not yet taken, plus the number of threads
; waiting for a starting core.

  (let ((untaken-slots (- *last-slot-saved* *last-slot-taken*)))
    (+ untaken-slots *threads-waiting-for-starting-core*)))
total-number-of-threadsfunction
(defun total-number-of-threads ()

; The length of the list returned by all-threads.

  (let ((threads (all-threads)))
    (length threads)))
other
; Incremented on every call of print-interesting-parallelism-variables-str and
; included as the last line of its output.
(defvar *refresh-rate-indicator* 0)
value-of-symbolmacro
(defmacro value-of-symbol (var)

; Expands to a form that returns a one-line string describing VAR, which must
; be a symbol (it is not evaluated).  The kind of line is chosen at
; macroexpansion time, in this order:
;   - a constant:               " Constant <name> is <value>"
;   - a defined function:       " Stat     <name> is <result of calling it>"
;   - a bound ACL2 state global: " Stat     <name> is <its value>"
;   - otherwise, a variable:    " Variable <name> is <value>"

; The symbol check comes first and stands alone: fboundp signals its own type
; error on most non-symbol arguments, which used to hide the message below,
; and symbol-name (used in every branch) requires a symbol anyway.

  (unless (symbolp var)
    (error "value-of-symbol requires a symbol or function name as its argument"))
  (cond ((constantp var)
         `(format nil " Constant ~s is ~s~% " ,(symbol-name var) ,var))
        ((fboundp var)
         `(format nil
                  " Stat     ~s is ~s~% "
                  ,(symbol-name var)
                  (,var)))
        ((boundp-global var *the-live-state*)
         ;; NOTE(review): f-get-global is evaluated here, inside the macro
         ;; body, so the state global is read when the macro is expanded and
         ;; its value is spliced into the expansion -- confirm this is
         ;; intended.
         `(format nil
                  " Stat     ~s is ~s~% "
                  ,(symbol-name var)
                  ,(f-get-global var *the-live-state*)))
        (t `(format nil " Variable ~s is ~s~% " ,(symbol-name var) ,var))))
acl2p-sum-list1function
(defun acl2p-sum-list1 (lst acc)

; Add the elements of LST, one at a time from the front, onto ACC and return
; the total.

  (dolist (item lst acc)
    (setq acc (+ item acc))))
acl2p-sum-listfunction
(defun acl2p-sum-list (lst)

; Sum of the numbers in LST; 0 for the empty list.

  (let ((total 0))
    (dolist (item lst total)
      (setq total (+ item total)))))
average-future-queue-sizefunction
(defun average-future-queue-size ()

; Average of the values in *future-queue-length-history*, as a float.
; Returns 0.0 when no history has been recorded yet, instead of signaling a
; division-by-zero error.

; The history is read once into a local so that the sum and the length are
; taken from the same list even if the variable is updated meanwhile.

  (let ((history *future-queue-length-history*))
    (if (null history)
      0.0
      (* 1.0
         (/ (reduce #'+ history)
            (length history))))))
print-interesting-parallelism-variables-strfunction
; Build and return a multi-line string of parallelism statistics.  As side
; effects, each call increments *refresh-rate-indicator* and pushes the
; current future-queue-length onto *future-queue-length-history*.  Each
; value-of-symbol form contributes one line; starred names are read as
; variables, and unstarred names are handled according to the rules in
; value-of-symbol (constant, function, state global, or variable).
(defun print-interesting-parallelism-variables-str
  nil
  (incf *refresh-rate-indicator*)
  ;; NOTE(review): the history list grows by one element per call and is
  ;; never trimmed in this function.
  (setf *future-queue-length-history*
    (cons (future-queue-length) *future-queue-length-history*))
  (concatenate 'string
    (format nil
      "  Printing stats related to executing proofs in parallel.~% ")
    (value-of-symbol *idle-future-core-count*)
    (value-of-symbol *idle-future-resumptive-core-count*)
    (value-of-symbol *idle-future-thread-count*)
    (value-of-symbol *threads-waiting-for-starting-core*)
    (value-of-symbol number-of-idle-threads-and-threads-waiting-for-a-starting-core)
    (value-of-symbol total-number-of-threads)
    (format nil "~% ")
    (value-of-symbol *unassigned-and-active-future-count*)
    (value-of-symbol *unassigned-and-active-work-count-limit*)
    (value-of-symbol *total-future-count*)
    (value-of-symbol total-parallelism-work-limit)
    (format nil "~% ")
    (value-of-symbol number-of-active-threads)
    (value-of-symbol number-of-threads-waiting-on-a-child)
    (format nil "~% ")
    (value-of-symbol *last-slot-taken*)
    (value-of-symbol *last-slot-saved*)
    (value-of-symbol future-queue-length)
    (value-of-symbol average-future-queue-size)
    (format nil "~% ")
    (value-of-symbol *resource-based-parallelizations*)
    (value-of-symbol *resource-based-serializations*)
    (value-of-symbol *resource-and-timing-based-parallelizations*)
    (value-of-symbol *resource-and-timing-based-serializations*)
    (value-of-symbol *futures-resources-available-count*)
    (value-of-symbol *futures-resources-unavailable-count*)
    (format nil "~% ")
    (format nil
      " Printing stats related to aborting futures.~% ")
    (value-of-symbol *aborted-futures-total*)
    (value-of-symbol *aborted-futures-via-throw*)
    (value-of-symbol *aborted-futures-via-flag*)
    (value-of-symbol *almost-aborted-future-count*)
    (format nil "~% ")
    (value-of-symbol *refresh-rate-indicator*)))
print-interesting-parallelism-variablesfunction
(defun print-interesting-parallelism-variables ()

; Print the statistics string built by
; print-interesting-parallelism-variables-str to standard output.  Returns
; nil.

; The string is printed through an explicit "~a" directive rather than being
; used as the format control string itself: the string contains printed
; values, and any tilde among them would otherwise be read as a format
; directive.

  (format t "~a" (print-interesting-parallelism-variables-str)))