Spamworldpro Mini Shell
Spamworldpro


Server : Apache/2.4.52 (Ubuntu)
System : Linux webserver 6.8.0-49-generic #49~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Wed Nov 6 17:42:15 UTC 2 x86_64
User : www-data ( 33)
PHP Version : 8.1.2-1ubuntu2.21
Disable Function : NONE
Directory :  /lib/python3/dist-packages/twisted/conch/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/python3/dist-packages/twisted/conch/stdio.py
# -*- test-case-name: twisted.conch.test.test_manhole -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Asynchronous local terminal input handling

@author: Jp Calderone
"""

import os
import sys
import termios
import tty

from twisted.conch.insults.insults import ServerProtocol
from twisted.conch.manhole import ColoredManhole
from twisted.internet import defer, protocol, reactor, stdio
from twisted.python import failure, log, reflect


class UnexpectedOutputError(Exception):
    pass


class TerminalProcessProtocol(protocol.ProcessProtocol):
    def __init__(self, proto):
        self.proto = proto
        self.onConnection = defer.Deferred()

    def connectionMade(self):
        self.proto.makeConnection(self)
        self.onConnection.callback(None)
        self.onConnection = None

    def write(self, data):
        """
        Write to the terminal.

        @param data: Data to write.
        @type data: L{bytes}
        """
        self.transport.write(data)

    def outReceived(self, data):
        """
        Receive data from the terminal.

        @param data: Data received.
        @type data: L{bytes}
        """
        self.proto.dataReceived(data)

    def errReceived(self, data):
        """
        Report an error.

        @param data: Data to include in L{Failure}.
        @type data: L{bytes}
        """
        self.transport.loseConnection()
        if self.proto is not None:
            self.proto.connectionLost(failure.Failure(UnexpectedOutputError(data)))
            self.proto = None

    def childConnectionLost(self, childFD):
        if self.proto is not None:
            self.proto.childConnectionLost(childFD)

    def processEnded(self, reason):
        if self.proto is not None:
            self.proto.connectionLost(reason)
            self.proto = None


class ConsoleManhole(ColoredManhole):
    """
    A manhole protocol specifically for use with L{stdio.StandardIO}.
    """

    def connectionLost(self, reason):
        """
        When the connection is lost, there is nothing more to do.  Stop the
        reactor so that the process can exit.
        """
        reactor.stop()


def runWithProtocol(klass):
    fd = sys.__stdin__.fileno()
    oldSettings = termios.tcgetattr(fd)
    tty.setraw(fd)
    try:
        stdio.StandardIO(ServerProtocol(klass))
        reactor.run()
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, oldSettings)
        os.write(fd, b"\r\x1bc\r")


def main(argv=None):
    log.startLogging(open("child.log", "w"))

    if argv is None:
        argv = sys.argv[1:]
    if argv:
        klass = reflect.namedClass(argv[0])
    else:
        klass = ConsoleManhole
    runWithProtocol(klass)


if __name__ == "__main__":
    main()

Spamworldpro Mini