Skip to content

Commit

Permalink
add stream_id
Browse files Browse the repository at this point in the history
  • Loading branch information
thisbejim committed Nov 10, 2016
1 parent ec78787 commit b719994
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pyrebase/pyrebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ def remove(self, token=None):
raise_detailed_error(request_object)
return request_object.json()

def stream(self, stream_handler, token=None):
def stream(self, stream_handler, token=None, stream_id=None):
request_ref = self.build_request_url(token)
return Stream(request_ref, stream_handler, self.build_headers)
return Stream(request_ref, stream_handler, self.build_headers, stream_id)

def check_token(self, database_url, path, token):
if token:
Expand Down Expand Up @@ -529,10 +529,11 @@ def close(self):


class Stream:
def __init__(self, url, stream_handler, build_headers):
def __init__(self, url, stream_handler, build_headers, stream_id):
self.build_headers = build_headers
self.url = url
self.stream_handler = stream_handler
self.stream_id = stream_id
self.sse = None
self.thread = None
self.start()
Expand All @@ -545,19 +546,20 @@ def make_session(self):
return session

def start(self):
self.thread = threading.Thread(target=self.start_stream,
args=(self.url, self.stream_handler))
self.thread = threading.Thread(target=self.start_stream)
self.thread.start()
return self

def start_stream(self, url, stream_handler):
self.sse = ClosableSSEClient(url, session=self.make_session(), build_headers=self.build_headers)
def start_stream(self):
self.sse = ClosableSSEClient(self.url, session=self.make_session(), build_headers=self.build_headers)
for msg in self.sse:
msg_data = json.loads(msg.data)
# don't return initial data
if msg_data:
msg_data["event"] = msg.event
stream_handler(msg_data)
if self.stream_id:
msg_data["stream_id"] = self.stream_id
self.stream_handler(msg_data)

def close(self):
while not self.sse and not hasattr(self.sse, 'resp'):
Expand Down

0 comments on commit b719994

Please sign in to comment.