2016-04-14 6 views
0

Я пытаюсь реализовать обратный HTTP-прокси с помощью Spray/Akka, но возникает проблема. Я обнаружил, что в некоторых случаях мой прокси-сервер будет продолжать получать данные с восходящего сервера даже после того, как клиент отключится.Сброс обратного прокси: сохранить передачу данных после того, как клиент отключился

Вот как я реализую мой Spray прокси директивы (только немного модификации для bthuillier's implementation):

trait ProxyDirectives { 

    private def sending(f: RequestContext ⇒ HttpRequest)(implicit system: ActorSystem): Route = { 
    val transport = IO(Http)(system) 
    ctx ⇒ transport.tell(f(ctx), ctx.responder) 
    } 

    /** 
    * Re-shape the original request, to match the destination server. 
    */ 
    private def reShapeRequest(req: HttpRequest, uri: Uri): HttpRequest = { 
    req.copy(
     uri = uri, 
     headers = req.headers.map { 
     case x: HttpHeaders.Host => HttpHeaders.Host(uri.authority.host.address, uri.authority.port) 
     case x => x 
     } 
    ) 
    } 

    /** 
    * proxy the request to the specified uri 
    * 
    */ 
    def proxyTo(uri: Uri)(implicit system: ActorSystem): Route = { 
    sending(ctx => reShapeRequest(ctx.request, uri)) 
    } 
} 

Этот обратный прокси-сервер будет работать хорошо, если я ставлю один прокси слой между клиентом и сервером (то есть, клиент < -> proxyTo < -> сервер), но у него возникнут проблемы, если я поставлю два слоя между клиентом и сервером. Например, если у меня есть следующий простой Python HTTP-сервер:

import socket 
from threading import Thread, Semaphore 
import time 

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer 
from SocketServer import ThreadingMixIn 


class MyHTTPHandler(BaseHTTPRequestHandler): 
    protocol_version = 'HTTP/1.1' 

    def do_GET(self): 
     self.send_response(200) 
     self.send_header('Transfer-Encoding', 'chunked') 
     self.end_headers() 

     for i in range(100): 
      data = ('%s\n' % i).encode('utf-8') 
      self.wfile.write(hex(len(data))[2:].encode('utf-8')) 
      self.wfile.write(b'\r\n') 
      self.wfile.write(data) 
      self.wfile.write(b'\r\n') 
      time.sleep(1) 
     self.wfile.write(b'0\r\n\r\n') 


class MyServer(ThreadingMixIn, HTTPServer): 
    def server_bind(self): 
     HTTPServer.server_bind(self) 
     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

    def server_close(self): 
     HTTPServer.server_close(self) 


if __name__ == '__main__': 
    server = MyServer(('127.0.0.1', 8080), MyHTTPHandler) 
    server.serve_forever() 

Что в принципе ничего не делает, но открыть Chunked ответ (для долгосрочного бега, так что мы можем экзамен вопросы). И если я приковать два слоя прокси следующим образом:

class TestActor(val target: String)(implicit val system: ActorSystem) extends Actor 
    with HttpService 
    with ProxyDirectives 
{ 
    // we use the enclosing ActorContext's or ActorSystem's dispatcher for our Futures and Scheduler 
    implicit private def executionContext = actorRefFactory.dispatcher 

    // the HttpService trait defines only one abstract member, which 
    // connects the services environment to the enclosing actor or test 
    def actorRefFactory = context 

    val serviceRoute: Route = { 
    get { 
     proxyTo(target) 
    } 
    } 

    // runs the service routes. 
    def receive = runRoute(serviceRoute) orElse handleTimeouts 

    private def handleTimeouts: Receive = { 
    case Timedout(x: HttpRequest) => 
     sender ! HttpResponse(StatusCodes.InternalServerError, "Request timed out.") 
    } 
} 

object DebugMain extends App { 
    val actorName = "TestActor" 
    implicit val system = ActorSystem(actorName) 

    // create and start our service actor 
    val service = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8080") }, 
    s"${actorName}Service" 
) 
    val service2 = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8081") }, 
    s"${actorName}2Service" 
) 

    IO(Http) ! Http.Bind(service, "::0", port = 8081) 
    IO(Http) ! Http.Bind(service2, "::0", port = 8082) 
} 

Использование curl http://localhost:8082 для подключения к прокси-серверу, и вы увидите, что система Akka продолжает передавать данные даже после того, как ротор был убит (вы можете включить журналы уровня DEBUG, чтобы увидеть подробности).

Как я могу справиться с этой проблемой? Благодарю.

ответ

0

Ну, это, оказывается, очень сложная проблема, в то время как мое решение принимает почти 100 строк кодов.

На самом деле проблема возникает не только тогда, когда я складываю два слоя прокси. Когда я использую прокси-сервер одного уровня, проблема существует, но журнал не печатается, поэтому я не знал об этой проблеме раньше.

Основная проблема заключается в том, что, хотя мы используем IO(Http) ! HttpRequest, на самом деле это API-интерфейс хост-уровня от распылителя. Соединения API-интерфейсов уровня хоста управляются с помощью Spray HttpManager, который недоступен нашему коду. Таким образом, мы ничего не можем с этим соединением, если мы не отправим Http.CloseAll в IO(Http), что приведет к закрытию всех восходящих соединений.

(Если кто знает, как получить соединение от HttpManager, пожалуйста, сообщите мне).

Мы должны использовать API-интерфейсы уровня соединения от распылительной банки, чтобы служить для этой ситуации. Так что я придумал что-то вроде этого:

/** 
    * Proxy to upstream server, where the server response may be a long connection. 
    * 
    * @param uri Target URI, where to proxy to. 
    * @param system Akka actor system. 
    */ 
def proxyToLongConnection(uri: Uri)(implicit system: ActorSystem): Route = { 
    val io = IO(Http)(system) 

    ctx => { 
    val request = reShapeRequest(ctx.request, uri) 

    // We've successfully opened a connection to upstream server, now start proxying data. 
    actorRefFactory.actorOf { 
     Props { 
     new Actor with ActorLogging { 
      private var upstream: ActorRef = null 
      private val upstreamClosed = new AtomicBoolean(false) 
      private val clientClosed = new AtomicBoolean(false) 
      private val contextStopped = new AtomicBoolean(false) 

      // Connect to the upstream server. 
      { 
      implicit val timeout = Timeout(FiniteDuration(10, TimeUnit.SECONDS)) 
      io ! Http.Connect(
       request.uri.authority.host.toString, 
       request.uri.effectivePort, 
       sslEncryption = request.uri.scheme == "https" 
      ) 
      context.become(connecting) 
      } 

      def connecting: Receive = { 
      case _: Http.Connected => 
       upstream = sender() 
       upstream ! request 
       context.unbecome() // Restore the context to [[receive]] 

      case Http.CommandFailed(Http.Connect(address, _, _, _, _)) => 
       log.warning("Could not connect to {}", address) 
       complete(StatusCodes.GatewayTimeout)(ctx) 
       closeBothSide() 

      case x: Http.ConnectionClosed => 
       closeBothSide() 
      } 

      override def receive: Receive = { 
      case x: HttpResponse => 
       ctx.responder ! x.withAck(ContinueSend(0)) 

      case x: ChunkedMessageEnd => 
       ctx.responder ! x.withAck(ContinueSend(0)) 

      case x: ContinueSend => 
       closeBothSide() 

      case x: Failure => 
       closeBothSide() 

      case x: Http.ConnectionClosed => 
       closeBothSide() 

      case x => 
       // Proxy everything else from server to the client. 
       ctx.responder ! x 
      } 

      private def closeBothSide(): Unit = { 
      if (upstream != null) { 
       if (!upstreamClosed.getAndSet(true)) { 
       upstream ! Http.Close 
       } 
      } 
      if (!clientClosed.getAndSet(true)) { 
       ctx.responder ! Http.Close 
      } 
      if (!contextStopped.getAndSet(true)) { 
       context.stop(self) 
      } 
      } 
     } // new Actor 
     } // Props 
    } // actorOf 
    } // (ctx: RequestContext) => Unit 
} 

кода является немного долго, и я сомневаюсь, что там должно быть немного более чистой и простой реализации (на самом деле я не знаком с Акки). Тем не менее, этот код работает, поэтому я поставил это решение здесь. Вы можете опубликовать свое решение по этой проблеме свободно, если вы нашли несколько лучших.

 Смежные вопросы

  • Нет связанных вопросов^_^