Source code for hookee.tunnel

__copyright__ = "Copyright (c) 2020-2024 Alex Laird"
__license__ = "MIT"

import threading
import time

from pyngrok import conf, ngrok
from pyngrok.conf import PyngrokConfig
from pyngrok.exception import PyngrokError


class Tunnel:
    """
    An object that manages a non-blocking ``pyngrok`` tunnel and thread.

    :var hookee_manager: Reference to the ``hookee`` Manager.
    :vartype hookee_manager: HookeeManager
    :var config: The configuration object taken from the ``hookee`` Manager.
    :var plugin_manager: Reference to the Plugin Manager.
    :vartype plugin_manager: PluginManager
    :var print_util: Reference to the PrintUtil.
    :vartype print_util: PrintUtil
    :var port: The server's port.
    :vartype port: int
    :var pyngrok_config: The ``pyngrok`` config.
    :vartype pyngrok_config: pyngrok.conf.PyngrokConfig
    :var public_url: The public URL of the tunnel.
    :vartype public_url: str
    :var ngrok_process: The ``ngrok`` process.
    :vartype ngrok_process: pyngrok.process.NgrokProcess
    """

    def __init__(self, hookee_manager):
        """
        Capture the Manager's collaborators and install the ``pyngrok`` default config.

        :param hookee_manager: Reference to the ``hookee`` Manager.
        :type hookee_manager: HookeeManager
        """
        self.hookee_manager = hookee_manager
        self.config = self.hookee_manager.config
        self.plugin_manager = self.hookee_manager.plugin_manager
        self.print_util = self.hookee_manager.print_util
        self.port = self.config.get("port")

        self.pyngrok_config = PyngrokConfig(auth_token=self.config.get("auth_token"),
                                            region=self.config.get("region"))
        # Side effect: this config becomes pyngrok's process-wide default.
        conf.set_default(self.pyngrok_config)

        self.public_url = None
        self.ngrok_process = None
        self._thread = None

    def _loop(self):
        """
        Thread target: open the tunnel, then idle until the current thread's
        ``alive`` flag is cleared, and finally tear the tunnel down.
        """
        current = threading.current_thread()

        try:
            self._start_tunnel()

            # The ``alive`` attribute on the thread object is the shutdown switch;
            # clearing it (from ``stop()`` or from outside) ends the loop.
            current.alive = True
            while current.alive:
                time.sleep(1)
        except PyngrokError:
            # pyngrok already logged this to the console for us
            pass

        current.alive = False

        # Only tear down when this thread still owns the tunnel. If ``stop()``
        # already ran, ``_thread`` is ``None`` (or a newer thread), and calling
        # ``stop()`` again here would kill a tunnel this thread did not open.
        if self._thread is current:
            self.stop()

    def start(self):
        """
        If one is not already running, start a tunnel in a new thread.

        Blocks until the tunnel's public URL is known, or until the worker thread
        exits without producing one (for example when ``pyngrok`` fails to connect).
        """
        if self._thread is not None:
            return

        self.print_util.print_open_header("Opening Tunnel")

        worker = threading.Thread(target=self._loop, daemon=True)
        self._thread = worker
        worker.start()

        # Waiting only on ``public_url`` would hang forever when the tunnel fails
        # to open, because ``_loop`` swallows the error and resets the state.
        # ``join`` with a timeout keeps the one second cadence but returns
        # immediately once the worker has died.
        while self.public_url is None and worker.is_alive():
            worker.join(timeout=1)

        if self.public_url is None:
            # Nothing to report; just close the header that was opened above.
            self.print_util.print_close_header()
            return

        self.print_close_header()

    def _build_options(self):
        """
        Translate the tunnel-related config values into ``ngrok.connect`` options.

        :return: The keyword options for ``ngrok.connect``.
        :rtype: dict
        """
        cfg = self.config
        options = {"schemes": ["https"]}

        subdomain = cfg.get("subdomain")
        # ``hostname`` and ``auth`` act as fallbacks for ``domain`` and
        # ``basic_auth``. ``or`` is used so the fallback also applies when the
        # primary key is present but empty, matching the truthiness checks below.
        domain = cfg.get("domain") or cfg.get("hostname")
        host_header = cfg.get("host_header")
        basic_auth = cfg.get("basic_auth") or cfg.get("auth")

        if subdomain:
            options["subdomain"] = subdomain
        if domain:
            options["domain"] = domain
        if host_header:
            options["host_header"] = host_header
        if basic_auth:
            # A single credential is wrapped so the option is always a list.
            options["basic_auth"] = basic_auth if isinstance(basic_auth, list) else [basic_auth]

        return options

    def _start_tunnel(self):
        """
        Open the ``ngrok`` tunnel to the configured port and record its public URL
        and the ``ngrok`` process.
        """
        options = self._build_options()

        self.public_url = ngrok.connect(self.port, **options).public_url
        self.ngrok_process = ngrok.get_ngrok_process()

    def stop(self):
        """
        If running, kill the tunnel and cleanup its thread.
        """
        worker = self._thread
        if worker:
            # Signal the keep-alive loop so the worker thread exits instead of
            # sleeping forever after the tunnel is gone.
            worker.alive = False

            ngrok.kill()

            self.public_url = None
            self.ngrok_process = None
            self._thread = None

    def print_close_header(self):
        """
        Print the tunnel's public URL and local address, then close the header.
        """
        self.print_util.print_basic(
            f" * Tunnel: {self.public_url} -> http://127.0.0.1:{self.port}",
            print_when_logging=True)
        self.print_util.print_close_header()