[security][CVE-2022-27809] Builtins should always take int64_t, not int
[hiphop-php.git] / hphp / runtime / ext / watchman / ext_watchman.cpp
blobfdc08e924c56bdd242f64955381c14b868d13806
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-2015 Facebook, Inc. (http://www.facebook.com) |
6 | Copyright (c) 1997-2010 The PHP Group |
7 +----------------------------------------------------------------------+
8 | This source file is subject to version 3.01 of the PHP license, |
9 | that is bundled with this package in the file LICENSE, and is |
10 | available through the world-wide-web at the following url: |
11 | http://www.php.net/license/3_01.txt |
12 | If you did not receive a copy of the PHP license and are unable to |
13 | obtain it through the world-wide-web, please send a note to |
14 | license@php.net so we can mail you a copy immediately. |
15 +----------------------------------------------------------------------+
18 #include <atomic>
19 #include <chrono>
20 #include <deque>
21 #include <exception>
22 #include <map>
23 #include <memory>
24 #include <mutex>
25 #include <stdexcept>
26 #include <string>
27 #include <thread>
28 #include <tuple>
29 #include <unordered_map>
30 #include <utility>
31 #include <vector>
33 #include <folly/dynamic.h>
34 #include <folly/ExceptionWrapper.h>
35 #include <folly/Executor.h>
36 #include <folly/Format.h>
37 #include <folly/futures/Future.h>
38 #include <folly/io/async/EventBase.h>
39 #include <folly/io/async/ScopedEventBaseThread.h>
40 #include <folly/json.h>
42 #include <watchman/cppclient/WatchmanClient.h>
44 #include "hphp/runtime/base/array-init.h"
45 #include "hphp/runtime/base/builtin-functions.h"
46 #include "hphp/runtime/base/execution-context.h"
47 #include "hphp/runtime/base/program-functions.h"
48 #include "hphp/runtime/base/tv-refcount.h"
49 #include "hphp/runtime/base/unit-cache.h"
50 #include "hphp/runtime/ext/asio/asio-external-thread-event.h"
51 #include "hphp/runtime/ext/asio/socket-event.h"
52 #include "hphp/runtime/ext/extension.h"
53 #include "hphp/runtime/vm/vm-regs.h"
54 #include "hphp/runtime/vm/treadmill.h"
55 #include "hphp/util/async-func.h"
56 #include "hphp/util/logger.h"
58 /* Threading, Locking, and Data-Sharing Notes
60 * As there is a bunch of data shared between all these contexts we follow a few
61 * rules to keep things sane:
63 * - All shared data are covered by a single mutex: s_sharedDataMutex.
65 * - Unless explicitly noted in comments, entry points from asynchronous sources
66 * (HHVM calls, callbacks, etc.) immediately lock the global mutex and hold it
67 * for their lifetime. Therefore all other functions/methods assume by default
68 * the lock is held apart from some explicit exceptions.
70 * - All IO/Watchman operations are run asynchronously and for this reason most
71 * of the HHVM visible API returns Awaitables.
73 * - All IO/Watchman asynchronous call-backs are handled on a single thread +
74 * event-base managed by a singleton of WatchmanThreadEventBase. This
75 * automatically locks the global mutex around all ASYNC callbacks and
76 * guarantees serial execution in FIFO order.
78 * - The user specified PHP-callback on subsription update is executed in a
79 * fresh AsyncFunc thread. This avoids running this potentially slow operation
80 * in the IO/Watchamn thread.
82 * Code in this module may be executed in one of 4 possible thread contexts:
84 * - Code called from arbitrary PHP-user requests (PHP).
86 * - In an AsyncFunc thread (PHP-CALLBACK).
88 * - In a folly event base thread managed by WatchmanThreadEventBase (ASYNC).
90 * - HHVM initialization (INIT) - musn't use PHP exception throwing!
92 * All methods, functions, and lambdas indicate which of these contexts they are
93 * executed from and whether they are an entry-point from that context. If code
94 * is an entry-point it should either grab a lock immediately or explain why
95 * not. Non entry-points assume they have a lock unless explicitly stated.
97 * All data is worked on using STL/folly types. Arguments coming in from PHP are
98 * immediately converted to their STL equivelants. All data going out are
99 * converted from STL types as late as possible.
102 namespace HPHP {
103 namespace {
105 // Lock covering all Watchman global data - this needs to be grabbed in
106 // any direct entries from HHVM.
107 std::mutex s_sharedDataMutex;
109 // Class to execute short-run Watchman related callbacks in serial order. Long-
110 // running Watchman subscription callbacks are run as AsyncFunc threads
111 // initiated from this thread.
112 struct WatchmanThreadEventBase : folly::Executor {
113 // (PHP)
114 WatchmanThreadEventBase() :
115 m_eventThread([this] {
116 // We don't have/need a lock here as we don't use any shared data.
117 m_eventBase.loopForever();
121 // (INIT)
122 ~WatchmanThreadEventBase() override {
123 drain();
124 m_eventBase.terminateLoopSoon();
125 m_eventThread.join();
128 // (PHP)
129 void add(folly::Func f) override {
130 m_eventBase.add([f = std::move(f)] () mutable { // (ASYNC entry-point)
131 std::lock_guard<std::mutex> g(s_sharedDataMutex);
132 f();
136 // (INIT)
137 void drain() {
138 // assume EventBases execute queue in FIFO order
139 getEventBase().runInEventBaseThreadAndWait([] {});
142 // (PHP)
143 folly::EventBase& getEventBase() {
144 return m_eventBase;
147 // (PHP / INIT) Methods for singleton functionality. Folly's Singleton
148 // implementation doesn't seem to fit here as it's built around shared_ptrs,
149 // but there is no "shared ownership" of the instance. It's instead owned by
150 // the whole process but only after it's created.
151 static WatchmanThreadEventBase* Get() {
152 if (!s_wmTEB) {
153 s_wmTEB = new WatchmanThreadEventBase();
155 return s_wmTEB;
158 // (INIT)
159 static void Free() {
160 delete s_wmTEB;
161 s_wmTEB = nullptr;
164 // (INIT)
165 static bool Exists() {
166 return !!s_wmTEB;
169 private:
170 folly::EventBase m_eventBase;
171 std::thread m_eventThread;
173 static WatchmanThreadEventBase* s_wmTEB;
176 WatchmanThreadEventBase* WatchmanThreadEventBase::s_wmTEB{nullptr};
178 struct ActiveSubscription {
179 // There should only be exaclty one instance of a given ActiveSubscription
180 // and this should live in s_activeSubscriptions.
181 ActiveSubscription& operator=(const ActiveSubscription&) = delete;
182 ActiveSubscription(const ActiveSubscription&) = delete;
183 ActiveSubscription&& operator=(ActiveSubscription&&) = delete;
184 ActiveSubscription(ActiveSubscription&&) = delete;
186 // (PHP)
187 ActiveSubscription(
188 const std::string& socket_path,
189 const std::string& path,
190 const std::string& query,
191 const std::string& callback_func,
192 const std::string& callback_file,
193 const std::string& name
195 m_name(name),
196 m_socketPath(socket_path),
197 m_path(path),
198 m_query(query),
199 m_callbackFunc(callback_func),
200 m_callbackFile(callback_file)
203 // (PHP)
204 folly::Future<folly::Unit> subscribe(
205 std::shared_ptr<watchman::WatchmanClient> client
207 // We hold onto the WatchmanClient pointer so it doesn't get killed before
208 // we are unsubscribed.
209 m_watchmanClient = client;
211 auto dynamic_query = folly::parseJson(m_query);
213 return client->subscribe(
214 dynamic_query,
215 m_path,
216 WatchmanThreadEventBase::Get(),
217 [this] (const folly::Try<folly::dynamic>&& data) { // (ASYNC)
218 if (m_unsubcribeInProgress) {
219 return;
221 if (data.hasException()) {
222 folly::dynamic error_data = folly::dynamic::object;
223 error_data["connection_error"] = data.exception().what().c_str();
224 m_unprocessedCallbackData.emplace_front(std::move(error_data));
225 } else {
226 m_unprocessedCallbackData.emplace_front(std::move(*data));
228 // Existing callbacks will drain the data queue
229 if (!m_callbackInProgress) {
230 processNextUpdate();
233 .via(WatchmanThreadEventBase::Get())
234 .thenValue([this](watchman::SubscriptionPtr wm_sub) { // (ASYNC)
235 m_subscriptionPtr = wm_sub;
236 return folly::unit;
240 // (PHP / INIT)
241 folly::Future<std::string> unsubscribe() {
242 if (m_unsubcribeInProgress) {
243 throw std::runtime_error(folly::sformat(
244 "Unsubscribe operation already in progress for this subscription. "
245 "Make sure to await on the unsubscribe result - query:{}, path:{}, "
246 "name:{}, socket_path:{}",
247 m_query, m_path, m_name, m_socketPath));
249 if (!m_subscriptionPtr) {
250 throw std::runtime_error(folly::sformat(
251 "Subscription still initializing. Make sure to await on the "
252 "subscribe result - query:{}, path:{}, name:{}, socket_path:{}",
253 m_query, m_path, m_name, m_socketPath));
256 m_unprocessedCallbackData.clear();
257 m_unsubcribeInProgress = true;
259 folly::Future<bool> unsubscribe_future{false};
260 // If the connection is alive we must perform an actual unsubscribe. If not,
261 // we just need to sync to make sure all the outstanding threads complete.
262 if (checkConnection()) {
263 unsubscribe_future = m_watchmanClient->unsubscribe(m_subscriptionPtr)
264 .via(WatchmanThreadEventBase::Get())
265 .thenValue([this] (const folly::dynamic& result) { // (ASYNC)
266 m_unsubscribeData = toJson(result).data();
267 return sync(std::chrono::milliseconds::zero());
269 } else {
270 m_unsubscribeData = "Watchman connection dead.";
271 unsubscribe_future = sync(std::chrono::milliseconds::zero());
273 return unsubscribe_future
274 // All ASYNC calls run with the lock held and in some cases the lock held
275 // by the sync() above will not be released before executing the following
276 // then(). This is a problem because the lock may need to be released to
277 // allow an AsyncFunc() thread to complete before the waitForEnd() calls.
278 // So, we use an explicit via() below to force the lock to be released and
279 // only re-acquired after the sync promise is fulfilled in the AsyncFunc()
280 // thread.
281 .via(WatchmanThreadEventBase::Get())
282 .thenValue([this](auto&&){ // (ASYNC)
283 // These should be finished by now due to the syncing above
284 if (m_oldCallbackExecThread) {
285 m_oldCallbackExecThread->waitForEnd();
287 if (m_callbackExecThread) {
288 m_callbackExecThread->waitForEnd();
290 m_unsubscribePromise.setValue(m_unsubscribeData);
291 return m_unsubscribePromise.getFuture();
295 // (PHP-CALLBACK entry-point) This manually gets a lock where needed but
296 // avoids holding one most of the time as this can be a quite slow operation.
297 void runCallback() {
298 hphp_session_init(Treadmill::SessionKind::Watchman);
299 auto context = g_context.getNoCheck();
300 SCOPE_EXIT {
301 hphp_context_exit();
302 hphp_session_exit();
304 std::lock_guard<std::mutex> g(s_sharedDataMutex);
305 processNextUpdate();
308 try {
309 std::string json_data;
311 std::lock_guard<std::mutex> g(s_sharedDataMutex);
312 if (m_unprocessedCallbackData.empty()) {
313 return;
315 auto& data = m_unprocessedCallbackData.back();
316 json_data = toJson(data);
317 m_unprocessedCallbackData.pop_back();
319 bool initial;
320 auto unit = lookupUnit(
321 String(m_callbackFile.c_str()).get(),
323 &initial,
324 Native::s_noNativeFuncs,
325 false);
326 if (!unit) {
327 throw std::runtime_error(
328 folly::sformat("Unit '{}' no longer exists.", m_callbackFile));
330 if (!RuntimeOption::EvalPreludePath.empty()) {
331 auto const doc = unit->filepath()->data();
332 invoke_prelude_script(
333 m_path.c_str(),
334 doc,
335 RuntimeOption::EvalPreludePath,
336 m_path.c_str());
338 auto unit_result = Variant::attach(context->invokeUnit(unit));
339 auto func = Func::load(String(m_callbackFunc.c_str()).get());
340 if (!func) {
341 throw std::runtime_error(
342 folly::sformat("Callback '{}' no longer exists", m_callbackFunc));
344 String str_path(m_path.c_str());
345 String str_query(m_query.c_str());
346 String str_name(m_name.c_str());
347 String str_json_data(json_data.c_str());
348 String str_socket_path(m_socketPath.c_str());
349 TypedValue args[] = {
350 str_path.asTypedValue(),
351 str_query.asTypedValue(),
352 str_name.asTypedValue(),
353 str_json_data.asTypedValue(),
354 str_socket_path.asTypedValue(),
356 tvDecRefGen(
357 context->invokeFuncFew(
358 func,
359 nullptr, // thisOrCls
360 5, // argc
361 args,
362 RuntimeCoeffects::fixme(),
363 true, // dynamic
364 true // allowDynCallNoPointer
367 } catch(Exception& e) {
368 if (m_error.empty()) {
369 m_error = e.getMessage();
371 } catch(Object& e) {
372 if (m_error.empty()) {
373 try {
374 m_error = throwable_to_string(e.get()).data();
375 } catch(...) {
376 m_error = "PHP exception which cannot be turned into a string";
379 } catch(const std::exception& e) {
380 if (m_error.empty()) {
381 m_error = folly::exceptionStr(e).toStdString();
383 } catch(...) {
384 if (m_error.empty()) {
385 m_error = "Unknown error (non std::exception)";
390 // (PHP)
391 std::string getAndClearError() {
392 std::string result;
393 std::swap(result, m_error);
394 return result;
397 // (PHP / INIT)
398 bool checkConnection() {
399 if (!m_alive) { return false; }
400 if (m_watchmanClient.get() && m_watchmanClient->isDead()) {
401 m_alive = false;
403 // Send our WatchmanClient reference to the EventBaseThread to be
404 // destroyed there.
406 // `m_watchmanClient`'s destructor always runs some work on the EventBase
407 // thread, because every `folly::AsyncSocket` must be destroyed on its
408 // EventBase thread. Trying to destroy `m_watchmanClient` on this thread
409 // will just block us until the EventBase thread's work is done.
411 // Because we're currently holding `s_sharedDataMutex`, and we sometimes
412 // ask the EventBase thread to acquire `s_sharedDataMutex`, we sometimes
413 // deadlocked when we blocked on the EventBase thread finishing all of
414 // its work. See D32157097 or https://github.com/facebook/hhvm/pull/8932
415 // for a description of the deadlock that sometimes occurred when we
416 // would synchronously destroy `m_watchmanClient`.
417 WatchmanThreadEventBase::Get()->getEventBase()
418 .runInEventBaseThread([deleteMe = std::move(m_watchmanClient)] {
421 return m_alive;
424 // (PHP / PHP-CALLBACK)
425 void processNextUpdate() {
426 if (m_oldCallbackExecThread) {
427 // Old thread will be complete by now
428 m_oldCallbackExecThread->waitForEnd();
429 m_oldCallbackExecThread.reset();
431 if (m_callbackExecThread) {
432 // Can't wait for previous thread to end here as we may still be in it
433 m_oldCallbackExecThread.swap(m_callbackExecThread);
436 if (m_unprocessedCallbackData.empty()) {
437 m_callbackInProgress = false;
438 for (auto& promise : m_syncPromises) {
439 promise.setValue(true);
441 m_syncPromises.clear();
442 } else {
443 m_callbackInProgress = true;
444 m_callbackExecThread = std::make_unique<AsyncFunc<ActiveSubscription>>(
445 this,
446 &ActiveSubscription::runCallback);
447 m_callbackExecThread->start();
451 // (PHP)
452 folly::Future<folly::Optional<folly::dynamic>> watchmanFlush(
453 std::chrono::milliseconds timeout
455 return !checkConnection() || m_unsubcribeInProgress || !m_subscriptionPtr
456 ? folly::makeFuture(folly::Optional<folly::dynamic>())
457 : m_watchmanClient->flushSubscription(m_subscriptionPtr, timeout)
458 .via(WatchmanThreadEventBase::Get());
461 // (PHP / ASYNC)
462 folly::Future<bool> sync(std::chrono::milliseconds timeout) {
463 if (m_unprocessedCallbackData.size() == 0 && !m_callbackInProgress) {
464 return folly::makeFuture(true);
466 folly::Promise<bool> promise;
467 auto res_future = promise.getFuture();
468 if (timeout != std::chrono::milliseconds::zero()) {
469 res_future = std::move(res_future).within(timeout)
470 .thenError(
471 folly::tag_t<folly::FutureTimeout>{},
472 [](folly::FutureTimeout) {
473 return false;
476 m_syncPromises.emplace_back(std::move(promise));
477 return res_future;
480 const std::string m_name;
482 private:
483 const std::string m_socketPath;
484 const std::string m_path;
485 const std::string m_query;
486 const std::string m_callbackFunc;
487 const std::string m_callbackFile;
488 std::unique_ptr<AsyncFunc<ActiveSubscription>> m_oldCallbackExecThread;
489 std::unique_ptr<AsyncFunc<ActiveSubscription>> m_callbackExecThread;
490 watchman::SubscriptionPtr m_subscriptionPtr;
491 std::shared_ptr<watchman::WatchmanClient> m_watchmanClient;
492 std::string m_error;
493 bool m_alive{true};
494 std::deque<folly::dynamic> m_unprocessedCallbackData;
495 bool m_callbackInProgress{false};
496 bool m_unsubcribeInProgress{false};
497 std::string m_unsubscribeData;
498 folly::Promise<std::string> m_unsubscribePromise;
499 std::vector<folly::Promise<bool>> m_syncPromises;
502 std::unordered_map<std::string, ActiveSubscription> s_activeSubscriptions;
504 template <typename T> struct FutureEvent : AsioExternalThreadEvent {
505 // (PHP)
506 explicit FutureEvent(folly::Future<T>&& future)
508 std::move(future).thenTry([this] (folly::Try<T> result) { // (ASYNC)
509 if (result.hasException()) {
510 m_exception = result.exception();
511 } else {
512 m_result = result.value();
514 markAsFinished();
518 protected:
519 // (PHP entry-point) we do not get a lock here as this should only be called
520 // after the markAsFinished() above which is the only place mutating the state
521 // of the data used here. markAsFinished() can only be called once as it is
522 // only called (indirectly) from construction of this object instance.
523 void unserialize(TypedValue& result) override {
524 if (m_exception) {
525 SystemLib::throwInvalidOperationExceptionObject(
526 m_exception.what().c_str());
527 } else {
528 unserializeImpl(result);
532 private:
533 // (PHP) no lock
534 template<typename U = T>
535 typename std::enable_if<std::is_same<U, std::string>::value>::type
536 unserializeImpl(TypedValue& result)
538 tvCopy(make_tv<KindOfString>(StringData::Make(m_result)), result);
541 // (PHP) no lock
542 template<typename U = T>
543 typename std::enable_if<std::is_same<U, folly::Unit>::value>::type
544 unserializeImpl(TypedValue& result)
546 tvCopy(make_tv<KindOfNull>(), result);
549 // (PHP) no lock
550 template<typename U = T>
551 typename std::enable_if<std::is_same<U, bool>::value>::type
552 unserializeImpl(TypedValue& result)
554 tvCopy(make_tv<KindOfBoolean>(m_result), result);
557 T m_result;
558 folly::exception_wrapper m_exception;
561 // (PHP) Makes a new WatchmanClient.
562 folly::Future<std::shared_ptr<watchman::WatchmanClient>>
563 getWatchmanClientForSocket(const std::string& socket_path) {
564 auto socket =
565 socket_path.size() ? socket_path : folly::Optional<std::string>();
566 auto client = std::make_shared<watchman::WatchmanClient>(
567 &(WatchmanThreadEventBase::Get()->getEventBase()),
568 std::move(socket),
569 WatchmanThreadEventBase::Get(),
570 [](folly::exception_wrapper& /*ex*/) { /* (ASYNC) error handler */ }
572 return client->connect()
573 .via(WatchmanThreadEventBase::Get())
574 .thenValue([client](const folly::dynamic& /*connect_info*/) {
575 // (ASYNC)
576 return client;
580 // (PHP / INIT)
581 folly::Future<std::string> watchman_unsubscribe_impl(const std::string& name) {
582 auto entry = s_activeSubscriptions.find(name);
583 if (entry == s_activeSubscriptions.end()) {
584 throw std::runtime_error(folly::sformat("No subscription '{}'", name));
586 auto res_future = entry->second.unsubscribe()
587 // I assume the items queued on the event base are drained in FIFO
588 // order. So, after the unsubscribe should be safe to clean up.
589 .thenValue([] (std::string&& result) {
590 // (ASYNC)
591 return std::move(result);
593 .ensure([name] {
594 // (ASYNC)
595 s_activeSubscriptions.erase(name);
597 return res_future;
600 // (PHP entry-point)
601 Object HHVM_FUNCTION(HH_watchman_run,
602 const String& _json_query,
603 const Variant& _socket_path
605 std::lock_guard<std::mutex> g(s_sharedDataMutex);
607 auto json_query = _json_query.toCppString();
608 auto socket_path = _socket_path.isNull() ?
609 "" : _socket_path.toString().toCppString();
611 auto dynamic_query = folly::parseJson(json_query);
612 auto res_future = getWatchmanClientForSocket(socket_path)
613 .thenValue([dynamic_query] (std::shared_ptr<watchman::WatchmanClient> client) {
614 // (ASYNC)
615 return client->run(dynamic_query)
616 .via(WatchmanThreadEventBase::Get())
617 // pass client shared_ptr through to keep client alive
618 .thenValue([client] (const folly::dynamic& result) {
619 return std::string(toJson(result).data());
622 return Object{
623 (new FutureEvent<std::string>(std::move(res_future)))->getWaitHandle()
627 // (PHP entry-point)
628 Object HHVM_FUNCTION(HH_watchman_subscribe,
629 const String& _json_query,
630 const String& _path,
631 const String& _name,
632 const String& callback_function,
633 const Variant& _socket_path
635 std::lock_guard<std::mutex> g(s_sharedDataMutex);
637 auto json_query = _json_query.toCppString();
638 auto path = _path.toCppString();
639 auto name = _name.toCppString();
640 auto socket_path = _socket_path.isNull() ?
641 "" : _socket_path.toString().toCppString();
643 if (s_activeSubscriptions.count(name)) {
644 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
645 "Subscription with name '{}' already exists", name));
648 // Validate callback
649 const Func* f = Func::load(callback_function.get());
650 if (!f || f->hasVariadicCaptureParam() || f->numParams() != 5 ||
651 !f->fullName() || !f->filename())
653 SystemLib::throwInvalidOperationExceptionObject(
654 "Invalid callback parameter. Must reference a top-level function defined "
655 "in a PHP/HH file and have exactly 5 (string type) arguments.");
658 s_activeSubscriptions.emplace(
659 std::piecewise_construct,
660 std::forward_as_tuple(name),
661 std::forward_as_tuple(
662 socket_path,
663 path,
664 json_query,
665 f->fullName()->toCppString(),
666 f->filename()->toCppString(),
667 name));
668 try {
669 auto res_future = getWatchmanClientForSocket(socket_path)
670 .thenValue([name] (std::shared_ptr<watchman::WatchmanClient> client)
671 -> folly::Future<folly::Unit>
673 // (ASYNC)
674 auto sub_entry = s_activeSubscriptions.find(name);
675 if (sub_entry != s_activeSubscriptions.end()) {
676 return sub_entry->second.subscribe(client);
678 return folly::unit;
680 .thenError(
681 folly::tag_t<std::exception>{},
682 [name] (std::exception const& e) -> folly::Unit {
683 // (ASNYC) delete active subscription
684 s_activeSubscriptions.erase(name);
685 throw std::runtime_error(e.what());
687 return Object{
688 (new FutureEvent<folly::Unit>(std::move(res_future)))->getWaitHandle()
690 } catch(...) {
691 s_activeSubscriptions.erase(name);
692 throw;
694 not_reached();
697 // (PHP entry-point)
698 bool HHVM_FUNCTION(HH_watchman_check_sub, const String& _name) {
699 std::lock_guard<std::mutex> g(s_sharedDataMutex);
701 std::string name = _name.toCppString();
703 auto sub_entry = s_activeSubscriptions.find(name);
704 if (sub_entry == s_activeSubscriptions.end()) {
705 return false;
708 std::string error;
709 bool connection_alive = false;
710 try {
711 error = sub_entry->second.getAndClearError();
712 connection_alive = sub_entry->second.checkConnection();
713 } catch(const std::exception& e) {
714 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
715 "Error '{}' checking subscription named '{}'", e.what(), name));
716 } catch(...) {
717 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
718 "Unknown error checking subscription named '{}'", name));
720 if (!error.empty() || !connection_alive) {
721 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
722 "Problem(s) on subscription named '{}'. Connection {} alive.\n"
723 "First error:\n{}",
724 name,
725 connection_alive ? "IS" : "IS NOT",
726 error.empty() ? "(none)" : error));
728 return true;
731 // (PHP entry-point)
732 Object HHVM_FUNCTION(HH_watchman_sync_sub,
733 const String& _name,
734 int64_t timeout_ms)
736 std::lock_guard<std::mutex> g(s_sharedDataMutex);
738 std::string name = _name.toCppString();
740 auto sub_entry = s_activeSubscriptions.find(name);
741 if (sub_entry == s_activeSubscriptions.end()) {
742 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
743 "Unknown subscription '{}'", name));
745 std::chrono::milliseconds timeout(timeout_ms);
746 auto start_time = std::chrono::steady_clock::now();
747 try {
748 auto res_future = sub_entry->second.watchmanFlush(timeout)
749 .thenValue([timeout, start_time, name](folly::Optional<folly::dynamic> flush) {
750 // (ASYNC)
751 if (!flush.has_value()) {
752 // Subscription is broken - no updates to process.
753 return folly::makeFuture(true);
755 if (flush.value().find("error") != flush.value().items().end()) {
756 // Timeout
757 return folly::makeFuture(false);
759 // At this stage a flush may have caused an update which is still
760 // waiting to execute in the executor queue. So explicitly schedule
761 // our sync into the same queue causing it to execute after the updates
762 // have been processed.
763 folly::Promise<bool> sync_promise;
764 auto sync_future = sync_promise.getFuture();
765 WatchmanThreadEventBase::Get()->add(
766 [name, start_time, timeout, sync_promise = std::move(sync_promise)]
767 () mutable { // (ASYNC)
768 auto sub_entry = s_activeSubscriptions.find(name);
769 if (sub_entry == s_activeSubscriptions.end()) {
770 // Subscription went away - no updates to process.
771 sync_promise.setValue(true);
772 return;
774 auto elapsed_time = std::chrono::steady_clock::now() - start_time;
775 auto remaining_timeout = timeout -
776 std::chrono::duration_cast<std::chrono::milliseconds>(
777 elapsed_time);
778 if (timeout == timeout.zero()) {
779 remaining_timeout = timeout.zero();
780 } else {
781 if (remaining_timeout <= remaining_timeout.zero()) {
782 sync_promise.setValue(false);
783 return;
786 sub_entry->second.sync(remaining_timeout)
787 .thenTry(
788 [sync_promise= std::move(sync_promise)]
789 (folly::Try<bool> res) mutable { // (ASYNC)
790 sync_promise.setValue(res);
793 return sync_future;
795 return Object{
796 (new FutureEvent<bool>(std::move(res_future)))->getWaitHandle()
798 } catch(const std::exception& e) {
799 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
800 "Error '{}' on subscription named '{}'", e.what(), name));
801 } catch(...) {
802 SystemLib::throwInvalidOperationExceptionObject(folly::sformat(
803 "Unknown error on subscription named '{}'", name));
807 // (PHP entry-point)
808 Object HHVM_FUNCTION(HH_watchman_unsubscribe, const String& _name) {
809 try {
810 std::lock_guard<std::mutex> g(s_sharedDataMutex);
811 auto res_future = watchman_unsubscribe_impl(_name.toCppString());
812 return Object{
813 (new FutureEvent<std::string>(std::move(res_future)))->getWaitHandle()
815 } catch (const std::exception& e) {
816 // Several exceptions related to unsubscribe may be thrown but these will
817 // not be PHP-safe as they may be generated in non request contexts.
818 SystemLib::throwInvalidOperationExceptionObject(e.what());
822 struct WatchmanExtension final : Extension {
823 // See ext_watchman.php for details of version bumps.
824 // (INIT entry-point) no need for a lock
825 WatchmanExtension() : Extension("watchman", "1") { };
827 // (INIT entry-point) no need for a lock
828 void moduleInit() override {
829 if (m_enabled) {
830 HHVM_FALIAS(HH\\watchman_run, HH_watchman_run);
831 HHVM_FALIAS(HH\\watchman_subscribe, HH_watchman_subscribe);
832 HHVM_FALIAS(HH\\watchman_check_sub, HH_watchman_check_sub);
833 HHVM_FALIAS(HH\\watchman_sync_sub, HH_watchman_sync_sub);
834 HHVM_FALIAS(HH\\watchman_unsubscribe, HH_watchman_unsubscribe);
836 loadSystemlib();
840 // (INIT entry-point) this needs more fine grained control over locking as
841 // described inline.
842 void moduleShutdown() override {
843 std::vector<folly::Future<std::string>> unsub_futures;
845 std::lock_guard<std::mutex> g(s_sharedDataMutex);
846 if (!WatchmanThreadEventBase::Exists()) {
847 return;
849 for (auto& sub_entry : s_activeSubscriptions) {
850 try {
851 auto& sub_name = sub_entry.first;
852 unsub_futures.emplace_back(watchman_unsubscribe_impl(sub_name));
853 // Iteration still safe as we hold the shared data lock
855 // Absorb all exceptions here as there is no catch on shutdown.
856 // There shouldn't be any issues that would cause further problems
857 // with shutting down.
858 } catch(const std::exception& e) {
859 Logger::Error("Error on Watchman client shutdown: %s", e.what());
860 } catch(...) {
861 Logger::Error("Unknown errror on Watchman client shutdown");
865 // Don't hold the lock here or the futures won't be able to complete.
866 // As we're shutting down the module nothing external should be happening.
867 for (auto& unsub_future : unsub_futures) {
868 unsub_future.wait();
870 WatchmanThreadEventBase::Get()->drain();
871 s_activeSubscriptions.clear();
872 WatchmanThreadEventBase::Free();
875 // (INIT entry-point) no need for lock
876 bool moduleEnabled() const override {
877 return m_enabled;
880 // (INIT entry-point) no need for lock
881 void moduleLoad(const IniSetting::Map& ini, Hdf config) override {
882 m_enabled = Config::GetBool(ini, config, "watchman.enable", m_enabled);
885 private:
886 // Disabled by default to ensure we don't pay requestInit/shutdown cost of
887 // getting a lock unless we need to.
888 bool m_enabled{false};
891 WatchmanExtension s_watchman;
893 } // namespace
894 } // namespace HPHP