2016-07-21 1 views
1

Рассмотрят следующий код для прослушивания обновления с длинным опросом:Получить JAX-RS AsyncResponse, но приостановить позже

Map<String, List<AsyncResponse>> tagMap = new ConcurrentGoodStuff(); 

// This endpoint listens for notifications of the tag 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("listen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag, 
     @Suspended final AsyncResponse response) { 
    tagMap.get(tag).add(response); 
} 

// This endpoint is for push-style notifications 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@PUT 
@Path("update/{tag}/{value}") 
public Response updateTag(
     @PathParam("tag") final String tag, 
     @PathParam("value") final String value) { 
    for(AsyncResponse response : tagMap.get(tag)) { 
     // Resumes all previously suspended responses 
     response.resume(value); 
    } 
    return Response.ok("cool whatever").build(); 
} 

Клиент добавляет слушатель нормального клиента-Джерси AsyncInvoker, вызывает асинхронную задачу, а затем другая задача вызывает метод обновления.

Когда я тестирую это, я сталкиваюсь с состоянием гонки. Сразу после добавления асинхронного слушателя на listenForUpdates() я делаю обновление на конечной точке с помощью updateTag() синхронно. Но обновление запускается до добавления слушателя, и асинхронный ответ не возобновляется.

Решение этого метода - вызвать метод suspend() на ответ после, добавив его слушателям. Но неясно, как это сделать, учитывая, что @Suspended предоставляет уже приостановленный объект AsyncResponse. Что мне делать, чтобы асинхронный отклик был приостановлен только после, добавляющего слушателя? Будет ли это называть метод suspend? Как я могу заставить это работать с клиентом асинхронной сети в Джерси, или я должен использовать другой клиент с длительным опросом?

Для решений я открыт для разных библиотек, таких как Атмосфера или Гуава. Я не открыт для добавления Thread.sleep() в свой тест, так как это прерывистый сбой, ожидающий своего появления.

+0

Не уверен, что я полностью понимаю, что вы пытаетесь сделать, но вы должны посмотреть на некоторые из примеров в проекте Джерси. Есть несколько различных асинхронных примеров. Я думаю, что один из примеров чата - это то, что вы пытаетесь сделать. –

+1

Я думаю [это один] (https://github.com/jersey/jersey/tree/master/examples/server-async-managed). Не уверен, что это именно то, что вы пытаетесь сделать, хотя –

+0

Я думаю, что блокирующая очередь определенно подходит, если я хочу, чтобы потребление предшествовало проверке потребления. В настоящее время у меня обновления всегда не блокируются. Если я добавлю флаг, который будет необязательно блокироваться при обновлении слушателя, он определенно поможет в тестировании. Круто, я думаю, я это сделаю. Спасибо. Добавьте это как «реальный ответ», и я передам вам некоторые интернет-точки. –

ответ

0

Я закончил с использованием RxJava, но не до того, как придумал решение «просто-как», используя BlockingQueue вместо List в Map. Это выглядит примерно так:

ConcurrentMap<String, BlockingQueue<AsyncResponse>> tagMap = new ConcurrentGoodStuff(); 

// This endpoint initiates a listener array for the tag. 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("initListen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag) { 
    tagMap.putIfAbsent(tag, new LinkedBlockingQueue<>()); 
} 

// This endpoint listens for notifications of the tag 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("listen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag, 
     @Suspended final AsyncResponse response) { 
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag); 

    if (responses != null) { 
     responses.add(response); 
    } 
} 

// This endpoint is for push-style notifications 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@PUT 
@Path("update/{tag}/{value}") 
public Response updateTag(
     @PathParam("tag") final String tag, 
     @PathParam("value") final String value) { 
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag); 

    if (responses == null) { 
     return Response.noContent().build(); 
    } 
    if (responses.isEmpty()) { 
     // Block-wait for an async listener 
     try { 
      AsyncResponse response = tagMap.poll(15, TimeUnit.SECONDS); 

      if (response == null) { 
       return Response.noContent().build(); 
      } 

      response.resume(value); 
     } catch (InterruptedException e) { 
      return Response.noContent().build(); 
     } 
    } else { 
     for (AsyncResponse response : responses) { 
      // Resumes all previously suspended responses 
      response.resume(value); 
     } 
    } 
    return Response.ok("cool whatever").build(); 
} 

Я не тестировал этот точный код, но использовал его в прошлом. Пока вы сначала вызываете конечную точку initListen, вы можете вызывать асинхронную конечную точку асинхронного listen, а затем синхронную конечную точку update и не будет никаких существенных условий гонки.

В конечной точке update есть незначительный намек на состояние гонки, но он незначительный. Остановка блокировки responses может быть пустой на итерации или может быть обновлена ​​несколькими источниками по-разному. Чтобы облегчить это, я использовал метод drainTo(Collection) для структуры данных, созданной по запросу. Это все еще не решает случай использования, когда несколько клиентов могут попробовать обновить один и тот же тег слушателей, но мне не нужен этот прецедент.