-
Notifications
You must be signed in to change notification settings - Fork 0
/
Untitled-1
147 lines (146 loc) · 4.15 KB
/
Untitled-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import threading,sys,librtmp,socket,os,time
server=socket.socket
src_buflist=[]
r=0
w=0
class Channel:
Src_rtmp=""
Src_Strm=""
Src_Url=""
num_of_Clnts=0
Clnt_list=[]
class Client:
Cl_rtmp=""
Cl_strm=""
Cl_Url=""
class Adver:
Ad_Url=""
Ad_rtmp=""
Ad_strm=""
Ad_clnt=""
class src_thrd(threading.Thread):
def __init__(self,chn):
threading.Thread.__init__(self)
self.chn=chn
def run(self):
src_buf_func(self.chn)
print "exiting src_thread"
return
class play_ad(threading.Thread):
def __init__(self,ad_url,new_clnt):
threading.Thread.__init__(self)
self.adv=ad_url
self.clnt=new_clnt
def run(self):
adv_buf(self.adv,self.clnt)
print "exiting play_ad thread"
def src_buf_func(chn):
print "in src_buf_func"
global src_buflist,r,w
err_wt=0
cnt=0
clnt_turn=1
while(chn.Src_rtmp.connected):
err_wt=0
while(chn.num_of_Clnts==0):
err_wt+=1
time.sleep(1)
if err_wt==20:
break
if err_wt==20:
break
r+=1
Src_Read=chn.Src_Strm.read(1024*64)
src_buflist.append(Src_Read)
if chn.num_of_Clnts==0:
continue
cnt=w
if chn.Clnt_list is not None:
time.sleep(.01)
for clnt_count in range(len(chn.Clnt_list)):
if not (chn.Clnt_list[clnt_count].Cl_rtmp.connected):
continue
if chn.Clnt_list[clnt_count].Cl_rtmp.connected:
print "live streaming and client %s connected at index %d:"%(chn.Clnt_list[clnt_count].Cl_Url,clnt_count)
chn.Clnt_list[clnt_count].Cl_strm.write(src_buflist[0])
src_buflist.pop(0)
if chn.num_of_Clnts !=0:
for clnts in range(len(chn.Clnt_list)):
chn.Clnt_list[clnts].Cl_strm.close()
del chn.Clnt_list[clnts]
chn.num_of_Clnts-=1
threadlock=threading.Lock()
def src_buf(src_url):
print "in src_buf method"
chn=Channel()
chn.Src_Url=src_url
chn.Src_rtmp=librtmp.RTMP(src_url)
chn.Src_rtmp.connect(None)
chn.Src_Strm=chn.Src_rtmp.create_stream(False)
chn.num_of_Clnts=0
src_thread=src_thrd(chn)
try:
src_thread.start()
except:
print "Enable to start a src_thread"
return chn
def adv_buf(ad_url,new_clnt):
ad_rtmp=librtmp.RTMP(ad_url)
ad_rtmp.connect(None)
ad_strm=ad_rtmp.create_stream(0,False)
if not ad_rtmp.connected:
print "Error in connected ad_rtmp"
return 1
while (ad_strm and new_clnt.Cl_rtmp.connected):
ad_data=ad_rtmp.read_packet()
new_clnt.Cl_rtmp.send_packet(ad_data)
time.sleep(.01)
ad_strm.close()
time.sleep(1)
return 1
def add_client(ad_url,clnt_url,chn):
print "in add_client method"
new_clnt=Client()
new_clnt.Cl_Url=clnt_url
new_clnt.Cl_rtmp=librtmp.RTMP(clnt_url)
new_clnt.Cl_rtmp.connect(None)
new_clnt.Cl_strm=new_clnt.Cl_rtmp.create_stream(0,True)
# adver=Adver()
# adver.Ad_Url=ad_url
# adver.Ad_clnt=new_clnt
adv_thread=play_ad(ad_url,new_clnt)
try:
print "adv_thread started"
adv_thread.start()
except:
print "error in adv_thread"
return 1
def main():
print" in main method"
src_url="rtmp://127.0.0.1:1935/live/src.sdp"
chn=src_buf(src_url)
if chn is None:
sys.exit(0)
time.sleep(.10)
Port=2077
host=""
sock=server(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind((host,Port))
print "Server started at Pid:",os.getpid()
recv_data=""
ad_url=""
clnt_url=""
while True:
sys.stdout.flush()
print "Enter arguments here:"
recv_data,adres=sock.recvfrom(1024)
print "received arguments are:",recv_data
ad_url,clnt_url=recv_data.split("+")
print "add_url is:",ad_url,"clnt_url is:",clnt_url
if chn is not None:
add_client(ad_url,clnt_url, chn)
else:
print"src is None"
sock.sendto("server started",adres)
sock.close()
if __name__=="__main__":main()