1

Я застрял с изменением проблемы раздвижного окна!python раз скользящее окно изменение

Обычно мы устанавливаем число элементов для слайдов, но в моем случае я хочу сдвинуть время!

Целью, которую я хотел бы достичь, является функция (поток в этом случае) , который способен создавать «временные» окна за считанные секунды (заданные пользователем).

Начиная с первого элемента очереди в этом случае:

[datetime.time (7, 6, 14, 537370), 584 добавить 5 секунд -> 7: 6: 19,537370 (конечная точка) и просуммировать все элементы в этом интервале:

[datetime.time(7, 6, 14, 537370), 584] 
[datetime.time(7, 6, 18, 542798), 761] 

Итого: 584 + 761 = 1345

Затем создайте еще "окно" со вторыми элементами и идет дальше. ВАЖНО: Один элемент может быть частью нескольких окон. элемент создается тем временем, решение naif с функцией, спящей за секунду, а затем сбрасывает очередь, не подходит для моей проблемы.

Я думаю, что его изменение этого поста: Flexible sliding window (in Python)

Но до сих пор не может решить эту проблему! Любая помощь или предложения будут оценены. Спасибо!

Пример списка элементов:

[datetime.time(7, 6, 14, 537370), 584] 
[datetime.time(7, 6, 18, 542798), 761] 
[datetime.time(7, 6, 20, 546007), 848] 
[datetime.time(7, 6, 24, 550969), 20] 
[datetime.time(7, 6, 27, 554370), 478] 
[datetime.time(7, 6, 27, 554628), 12] 
[datetime.time(7, 6, 31, 558919), 29] 
[datetime.time(7, 6, 31, 559562), 227] 
[datetime.time(7, 6, 32, 560863), 379] 
[datetime.time(7, 6, 35, 564863), 132] 
[datetime.time(7, 6, 37, 567276), 651] 
[datetime.time(7, 6, 38, 568652), 68] 
[datetime.time(7, 6, 40, 569861), 100] 
[datetime.time(7, 6, 41, 571459), 722] 
[datetime.time(7, 6, 44, 574802), 560] 

...

Код:

import random 
import time 
import threading 
import datetime 
from multiprocessing import Queue 

q = Queue() 

#this is a producer that put elements in queue 

def t1(): 
    element = [0,0] 
    while True: 
     time.sleep(random.randint(0, 5)) 
     element[0] = datetime.datetime.now().time() 
     element[1] = random.randint(0, 1000) 
     q.put(element) 


#this is a consumer that sum elements inside a window of n seconds 
#Ineed something a sliding window time of ten seconds that sum all elements for n seconds 

def t2(): 
    windowsize = 5 #size of the window 5 seconds 
    while not queue.empty(): 
     e = q.get() 
     start = e[0] #the first element is the beginning point 
     end = start + datetime.timedelta(seconds=windowsize) #ending point 
     sum += e[1] 
     #some code that solve the problem :) 



a = threading.Thread(target=t1) 
a.start() 

b = threading.Thread(target=t2) 
b.start() 

while True: 
    time.sleep(1) 
+0

Может ли элемент быть частью нескольких ваших «окон» или вы хотите использовать элемент из очереди, как только он суммируется с окном? Если элемент может быть частью нескольких окон, каков механизм, который в конечном итоге удаляет элементы из очереди («этот элемент определенно больше не нужен, давайте удалим его, чтобы избежать заполнения памяти»)? Это определенно разрешимо в любом случае, но просто нужно знать, что именно вы хотите достичь. – Hannu

+0

Один элемент может быть частью нескольких окон, добавьте оригинальное сообщение, спасибо. – egariM

+0

Итак, правильно ли я понимаю, что, как только ваша «очередь» имеет разницу во времени первого и последнего элементов более 10 секунд, тогда будет нормально истекать самые старые записи, пока все не вступит снова в 10-секундный кадр (или что бы мы ни решили использовать как максимум?), а затем ваш «t2» можно использовать для запроса окна размером 0-10 секунд, начиная с первого элемента? – Hannu

ответ

0

ли это делать? Вот как я понял вашу проблему. Что это делает, он создает класс, который отслеживает все. Вы либо добавляете к этому значение tw.insert(), либо суммируете с помощью tw.sum_window (в секундах).

Когда вы инициализируете TimeWindow, вы можете дать ему параметр максимального размера, по умолчанию - 10 секунд. Когда вы добавляете элементы или вычисляете суммы, он выполняет очистку, так что перед каждой операцией вставки или суммирования время первого элемента e [0] [0] и время последнего элемента e [n] [0] находятся в пределах 10 секунд друг от друга , Предыдущие записи исключаются. Поток «poller» предназначен для отслеживания ваших запросов.

Я добавил две очереди, так как не знаю, что вы намерены делать с результатами. Теперь, если вы хотите запросить данные, начиная с этого момента, до 5 секунд в будущем, вы создаете запрос и помещаете его в очередь. Запрос имеет случайный идентификатор, чтобы вы могли сопоставить его с результатами. Ваш основной поток должен контролировать очередь результатов, и через пять секунд каждый запрос, отправленный в очередь, возвращается с тем же идентификатором и суммой.

Если это не то, что вы хотите сделать, то я просто не понимаю, чего вы пытаетесь достичь здесь. Даже это уже довольно сложно, и может быть гораздо более простой способ добиться того, что вы намереваетесь сделать.

import random 
import time 
import threading 
import datetime 
import Queue 
import uuid 

from collections import deque 

q_lock = threading.RLock() 


class TimeWindow(object): 
    def __init__(self, max_size=10): 
     self.max_size = max_size 
     self.q = deque() 

    def expire(self): 
     time_now = datetime.datetime.now() 
     while True: 
      try: 
       oldest_element = self.q.popleft() 
       oe_time = oldest_element[0] 
       if oe_time + datetime.timedelta(seconds=self.max_size) > time_now: 
        self.q.appendleft(oldest_element) 
        break 

      except IndexError: 
       break 

    def insert(self,elm): 
     self.expire() 
     self.q.append(elm) 

    def sum_window(self, start, end): 
     self.expire() 
     try: 
      _ = self.q[0] 
     except IndexError: 
      return 0 
     result=0 
     for f in self.q: 
      if start < f[0] < end: 
       result += f[1] 
      else: 
       pass 
     return result 


tw = TimeWindow() 


def t1(): 
    while True: 
     time.sleep(random.randint(0, 3)) 
     element = [datetime.datetime.now(), random.randint(0,1000)] 
     with q_lock: 
      tw.insert(element) 


def poller(in_q, out_q): 
    pending = [] 
    while True: 
     try: 
      new_request = in_q.get(0.1) 
      new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"]) 
      pending.append(new_request) 
     except Queue.Empty: 
      pass 

     new_pending = [] 
     for a in pending: 
      if a["end"] < datetime.datetime.now(): 
       with q_lock: 
        r_sum = tw.sum_window(a["start"], a["end"]) 
       r_structure = {"id": a["id"], "result": r_sum} 
       out_q.put(r_structure) 
      else: 
       new_pending.append(a) 
     pending = new_pending 


a = threading.Thread(target=t1) 
a.daemon = True 
a.start() 
in_queue = Queue.Queue() 
result_queue = Queue.Queue() 

po = threading.Thread(target=poller, args=(in_queue, result_queue,)) 
po.daemon = True 
po.start() 

while True: 
    time.sleep(1) 
    newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()} 
    in_queue.put(newr) 
    try: 
     ready = result_queue.get(0) 
     print ready 
    except Queue.Empty: 
     pass 
+0

Прежде всего большое спасибо за помощь! Mmm проблема в том, что tw.sum_window (5) должен быть выполнен в конце 5 секунд, когда закончились оконные окна! – egariM

+0

Я немного озадачен тем, как вы хотите, чтобы это было рассчитано. Вы имеете в виду, что вам просто хотелось бы «заглянуть в историю», чтобы, если вы сейчас выполнили sum_window, выберете время, которое было последним вставлено, и пересчитайте его на пять секунд и суммируйте их? Так что в основном то же самое, что я сделал, но наоборот? – Hannu

+0

Примером может быть более самоисследовательным), первый элемент достигает значения 12: 02: 50.164829 8, чем другие ... ... функция «сумма» должна выполняться в 12: 02: 55.164829 (t0 + 5 секунд) и то iter продолжает базироваться на datetime элемента-преемника. – egariM

0
[email protected]:~$ python solution.py 
1 t1 produce element: 16:09:30.472497 1 
2 t1 produce element: 16:09:33.475714 9 
3 t1 produce element: 16:09:34.476922 10 
4 t1 produce element: 16:09:37.480100 7 
solution: 16:09:37.481171 {'id': UUID('adff334f-a97a-459d-8dcc-f28309e25574'), 'result': 19} 
5 t1 produce element: 16:09:38.481352 10 
solution: 16:09:38.482687 {'id': UUID('0a7481e5-e993-439a-9f7e-2c5aeef86155'), 'result': 19} 

Это еще doent работает :(Я добавить счетчик для каждого элемента вставляет с функцией t1. Цель сделать сумму (result_queue.получить) в это время:

16: 09: 35,472497 ---> 16: 09: 30.472497 + 5 секунд

не раньше. Только тогда элемент гаснет. В следующий раз, когда сумма будет сделана в:

16: 09: 35,475714 ---> 16: 09: 33.475714 + 5 секунд

Я понимаю, что это трудно объяснить .. С обоего вашего раствором так что я могу рассмотреть проблему решена :) Я постараюсь улучшить, когда будет выполняться сумма функции, этот триггер времени важен. Я получаю много полезных знаний. Спасибо за помощь.

 Смежные вопросы

  • Нет связанных вопросов^_^