worker.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /**
  2. * Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
  3. *
  4. * This source code is licensed under the MIT license found in the
  5. * LICENSE file in the root directory of this source tree.
  6. *
  7. *
  8. */
  9. 'use strict';
  10. Object.defineProperty(exports, "__esModule", {
  11. value: true
  12. });
  13. var _child_process;
  14. function _load_child_process() {
  15. return _child_process = _interopRequireDefault(require('child_process'));
  16. }
  17. var _types;
  18. function _load_types() {
  19. return _types = require('./types');
  20. }
  21. function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
  22. /**
  23. * This class wraps the child process and provides a nice interface to
  24. * communicate with. It takes care of:
  25. *
  26. * - Re-spawning the process if it dies.
  27. * - Queues calls while the worker is busy.
  28. * - Re-sends the requests if the worker blew up.
  29. *
  30. * The reason for queueing them here (since childProcess.send also has an
  31. * internal queue) is because the worker could be doing asynchronous work, and
  32. * this would lead to the child process to read its receiving buffer and start a
  33. * second call. By queueing calls here, we don't send the next call to the
  34. * children until we receive the result of the previous one.
  35. *
  36. * As soon as a request starts to be processed by a worker, its "processed"
  37. * field is changed to "true", so that other workers which might encounter the
  38. * same call skip it.
  39. */
  40. exports.default = class {
  41. constructor(options) {
  42. this._options = options;
  43. this._queue = [];
  44. this._initialize();
  45. }
  46. getStdout() {
  47. return this._child.stdout;
  48. }
  49. getStderr() {
  50. return this._child.stderr;
  51. }
  52. send(request, callback) {
  53. this._queue.push({ callback, request });
  54. this._process();
  55. }
  56. _initialize() {
  57. const child = (_child_process || _load_child_process()).default.fork(require.resolve('./child'),
  58. // $FlowFixMe: Flow does not work well with Object.assign.
  59. Object.assign({
  60. cwd: process.cwd(),
  61. env: Object.assign({}, process.env, {
  62. JEST_WORKER_ID: this._options.workerId
  63. }),
  64. // suppress --debug / --inspect flags while preserving others (like --harmony)
  65. execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)),
  66. silent: true
  67. }, this._options.forkOptions));
  68. child.on('message', this._receive.bind(this));
  69. child.on('exit', this._exit.bind(this));
  70. // $FlowFixMe: wrong "ChildProcess.send" signature.
  71. child.send([(_types || _load_types()).CHILD_MESSAGE_INITIALIZE, false, this._options.workerPath]);
  72. this._retries++;
  73. this._child = child;
  74. this._busy = false;
  75. // If we exceeded the amount of retries, we will emulate an error reply
  76. // coming from the child. This avoids code duplication related with cleaning
  77. // the queue, and scheduling the next call.
  78. if (this._retries > this._options.maxRetries) {
  79. const error = new Error('Call retries were exceeded');
  80. this._receive([(_types || _load_types()).PARENT_MESSAGE_ERROR, error.name, error.message, error.stack, { type: 'WorkerError' }]);
  81. }
  82. }
  83. _process() {
  84. if (this._busy) {
  85. return;
  86. }
  87. const queue = this._queue;
  88. let skip = 0;
  89. // Calls in the queue might have already been processed by another worker,
  90. // so we have to skip them.
  91. while (queue.length > skip && queue[skip].request[1]) {
  92. skip++;
  93. }
  94. // Remove all pieces at once.
  95. queue.splice(0, skip);
  96. if (queue.length) {
  97. const call = queue[0];
  98. // Flag the call as processed, so that other workers know that they don't
  99. // have to process it as well.
  100. call.request[1] = true;
  101. this._retries = 0;
  102. this._busy = true;
  103. // $FlowFixMe: wrong "ChildProcess.send" signature.
  104. this._child.send(call.request);
  105. }
  106. }
  107. _receive(response /* Should be ParentMessage */) {
  108. const callback = this._queue[0].callback;
  109. this._busy = false;
  110. this._process();
  111. switch (response[0]) {
  112. case (_types || _load_types()).PARENT_MESSAGE_OK:
  113. callback.call(this, null, response[1]);
  114. break;
  115. case (_types || _load_types()).PARENT_MESSAGE_ERROR:
  116. let error = response[4];
  117. if (error != null && typeof error === 'object') {
  118. const extra = error;
  119. const NativeCtor = global[response[1]];
  120. const Ctor = typeof NativeCtor === 'function' ? NativeCtor : Error;
  121. error = new Ctor(response[2]);
  122. // $FlowFixMe: adding custom properties to errors.
  123. error.type = response[1];
  124. error.stack = response[3];
  125. for (const key in extra) {
  126. // $FlowFixMe: adding custom properties to errors.
  127. error[key] = extra[key];
  128. }
  129. }
  130. callback.call(this, error, null);
  131. break;
  132. default:
  133. throw new TypeError('Unexpected response from worker: ' + response[0]);
  134. }
  135. }
  136. _exit(exitCode) {
  137. if (exitCode !== 0) {
  138. this._initialize();
  139. }
  140. }
  141. };