mirror of
				https://gitea.invidious.io/iv-org/invidious.git
				synced 2024-08-15 00:53:41 +00:00 
			
		
		
		
	Add POST /api/v1/auth/subscriptions
This commit is contained in:
		
							parent
							
								
									a675c64c2d
								
							
						
					
					
						commit
						b3e083d866
					
				
					 2 changed files with 121 additions and 110 deletions
				
			
		
							
								
								
									
										116
									
								
								src/invidious.cr
									
										
									
									
									
								
							
							
						
						
									
										116
									
								
								src/invidious.cr
									
										
									
									
									
								
							|  | @ -4414,121 +4414,17 @@ get "/api/v1/mixes/:rdid" do |env| | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| get "/api/v1/auth/notifications" do |env| | get "/api/v1/auth/notifications" do |env| | ||||||
|   locale = LOCALES[env.get("preferences").as(Preferences).locale]? |  | ||||||
| 
 |  | ||||||
|   env.response.content_type = "text/event-stream" |  | ||||||
| 
 |  | ||||||
|   topics = env.params.query["topics"]?.try &.split(",").uniq.first(1000) |   topics = env.params.query["topics"]?.try &.split(",").uniq.first(1000) | ||||||
|   topics ||= [] of String |   topics ||= [] of String | ||||||
| 
 | 
 | ||||||
|   since = env.params.query["since"]?.try &.to_i? |   create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics) | ||||||
|  | end | ||||||
| 
 | 
 | ||||||
|   begin | post "/api/v1/auth/notifications" do |env| | ||||||
|     id = 0 |   topics = env.params.body["topics"]?.try &.split(",").uniq.first(1000) | ||||||
|  |   topics ||= [] of String | ||||||
| 
 | 
 | ||||||
|     if topics.includes? "debug" |   create_notification_stream(env, proxies, config, Kemal.config, decrypt_function, topics) | ||||||
|       spawn do |  | ||||||
|         loop do |  | ||||||
|           time_span = [0, 0, 0, 0] |  | ||||||
|           time_span[rand(4)] = rand(30) + 5 |  | ||||||
|           published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3]) |  | ||||||
|           video_id = TEST_IDS[rand(TEST_IDS.size)] |  | ||||||
| 
 |  | ||||||
|           video = get_video(video_id, PG_DB, proxies) |  | ||||||
|           video.published = published |  | ||||||
|           response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function)) |  | ||||||
| 
 |  | ||||||
|           if fields_text = env.params.query["fields"]? |  | ||||||
|             begin |  | ||||||
|               JSONFilter.filter(response, fields_text) |  | ||||||
|             rescue ex |  | ||||||
|               env.response.status_code = 400 |  | ||||||
|               response = {"error" => ex.message} |  | ||||||
|             end |  | ||||||
|           end |  | ||||||
| 
 |  | ||||||
|           env.response.puts "id: #{id}" |  | ||||||
|           env.response.puts "data: #{response.to_json}" |  | ||||||
|           env.response.puts |  | ||||||
|           env.response.flush |  | ||||||
| 
 |  | ||||||
|           id += 1 |  | ||||||
| 
 |  | ||||||
|           sleep 1.minute |  | ||||||
|         end |  | ||||||
|       end |  | ||||||
|     end |  | ||||||
| 
 |  | ||||||
|     spawn do |  | ||||||
|       if since |  | ||||||
|         topics.try &.each do |topic| |  | ||||||
|           case topic |  | ||||||
|           when .match(/UC[A-Za-z0-9_-]{22}/) |  | ||||||
|             PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15", |  | ||||||
|               topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video| |  | ||||||
|               response = JSON.parse(video.to_json(locale, config, Kemal.config)) |  | ||||||
| 
 |  | ||||||
|               if fields_text = env.params.query["fields"]? |  | ||||||
|                 begin |  | ||||||
|                   JSONFilter.filter(response, fields_text) |  | ||||||
|                 rescue ex |  | ||||||
|                   env.response.status_code = 400 |  | ||||||
|                   response = {"error" => ex.message} |  | ||||||
|                 end |  | ||||||
|               end |  | ||||||
| 
 |  | ||||||
|               env.response.puts "id: #{id}" |  | ||||||
|               env.response.puts "data: #{response.to_json}" |  | ||||||
|               env.response.puts |  | ||||||
|               env.response.flush |  | ||||||
| 
 |  | ||||||
|               id += 1 |  | ||||||
|             end |  | ||||||
|           else |  | ||||||
|             # TODO |  | ||||||
|           end |  | ||||||
|         end |  | ||||||
|       end |  | ||||||
| 
 |  | ||||||
|       PG.connect_listen(PG_URL, "notifications") do |event| |  | ||||||
|         notification = JSON.parse(event.payload) |  | ||||||
|         topic = notification["topic"].as_s |  | ||||||
|         video_id = notification["videoId"].as_s |  | ||||||
|         published = notification["published"].as_i64 |  | ||||||
| 
 |  | ||||||
|         video = get_video(video_id, PG_DB, proxies) |  | ||||||
|         video.published = Time.unix(published) |  | ||||||
|         response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function)) |  | ||||||
| 
 |  | ||||||
|         if fields_text = env.params.query["fields"]? |  | ||||||
|           begin |  | ||||||
|             JSONFilter.filter(response, fields_text) |  | ||||||
|           rescue ex |  | ||||||
|             env.response.status_code = 400 |  | ||||||
|             response = {"error" => ex.message} |  | ||||||
|           end |  | ||||||
|         end |  | ||||||
| 
 |  | ||||||
|         if topics.try &.includes? topic |  | ||||||
|           env.response.puts "id: #{id}" |  | ||||||
|           env.response.puts "data: #{response.to_json}" |  | ||||||
|           env.response.puts |  | ||||||
|           env.response.flush |  | ||||||
| 
 |  | ||||||
|           id += 1 |  | ||||||
|         end |  | ||||||
|       end |  | ||||||
|     end |  | ||||||
| 
 |  | ||||||
|     # Send heartbeat |  | ||||||
|     loop do |  | ||||||
|       env.response.puts ":keepalive #{Time.now.to_unix}" |  | ||||||
|       env.response.puts |  | ||||||
|       env.response.flush |  | ||||||
|       sleep (20 + rand(11)).seconds |  | ||||||
|     end |  | ||||||
|   rescue |  | ||||||
|   end |  | ||||||
| end | end | ||||||
| 
 | 
 | ||||||
| get "/api/v1/auth/preferences" do |env| | get "/api/v1/auth/preferences" do |env| | ||||||
|  |  | ||||||
|  | @ -663,3 +663,118 @@ def copy_in_chunks(input, output, chunk_size = 4096) | ||||||
|     Fiber.yield |     Fiber.yield | ||||||
|   end |   end | ||||||
| end | end | ||||||
|  | 
 | ||||||
|  | def create_notification_stream(env, proxies, config, kemal_config, decrypt_function, topics) | ||||||
|  |   locale = LOCALES[env.get("preferences").as(Preferences).locale]? | ||||||
|  | 
 | ||||||
|  |   env.response.content_type = "text/event-stream" | ||||||
|  | 
 | ||||||
|  |   since = env.params.query["since"]?.try &.to_i? | ||||||
|  | 
 | ||||||
|  |   begin | ||||||
|  |     id = 0 | ||||||
|  | 
 | ||||||
|  |     if topics.includes? "debug" | ||||||
|  |       spawn do | ||||||
|  |         loop do | ||||||
|  |           time_span = [0, 0, 0, 0] | ||||||
|  |           time_span[rand(4)] = rand(30) + 5 | ||||||
|  |           published = Time.now - Time::Span.new(time_span[0], time_span[1], time_span[2], time_span[3]) | ||||||
|  |           video_id = TEST_IDS[rand(TEST_IDS.size)] | ||||||
|  | 
 | ||||||
|  |           video = get_video(video_id, PG_DB, proxies) | ||||||
|  |           video.published = published | ||||||
|  |           response = JSON.parse(video.to_json(locale, config, kemal_config, decrypt_function)) | ||||||
|  | 
 | ||||||
|  |           if fields_text = env.params.query["fields"]? | ||||||
|  |             begin | ||||||
|  |               JSONFilter.filter(response, fields_text) | ||||||
|  |             rescue ex | ||||||
|  |               env.response.status_code = 400 | ||||||
|  |               response = {"error" => ex.message} | ||||||
|  |             end | ||||||
|  |           end | ||||||
|  | 
 | ||||||
|  |           env.response.puts "id: #{id}" | ||||||
|  |           env.response.puts "data: #{response.to_json}" | ||||||
|  |           env.response.puts | ||||||
|  |           env.response.flush | ||||||
|  | 
 | ||||||
|  |           id += 1 | ||||||
|  | 
 | ||||||
|  |           sleep 1.minute | ||||||
|  |         end | ||||||
|  |       end | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     spawn do | ||||||
|  |       if since | ||||||
|  |         topics.try &.each do |topic| | ||||||
|  |           case topic | ||||||
|  |           when .match(/UC[A-Za-z0-9_-]{22}/) | ||||||
|  |             PG_DB.query_all("SELECT * FROM channel_videos WHERE ucid = $1 AND published > $2 ORDER BY published DESC LIMIT 15", | ||||||
|  |               topic, Time.unix(since.not_nil!), as: ChannelVideo).each do |video| | ||||||
|  |               response = JSON.parse(video.to_json(locale, config, Kemal.config)) | ||||||
|  | 
 | ||||||
|  |               if fields_text = env.params.query["fields"]? | ||||||
|  |                 begin | ||||||
|  |                   JSONFilter.filter(response, fields_text) | ||||||
|  |                 rescue ex | ||||||
|  |                   env.response.status_code = 400 | ||||||
|  |                   response = {"error" => ex.message} | ||||||
|  |                 end | ||||||
|  |               end | ||||||
|  | 
 | ||||||
|  |               env.response.puts "id: #{id}" | ||||||
|  |               env.response.puts "data: #{response.to_json}" | ||||||
|  |               env.response.puts | ||||||
|  |               env.response.flush | ||||||
|  | 
 | ||||||
|  |               id += 1 | ||||||
|  |             end | ||||||
|  |           else | ||||||
|  |             # TODO | ||||||
|  |           end | ||||||
|  |         end | ||||||
|  |       end | ||||||
|  | 
 | ||||||
|  |       PG.connect_listen(PG_URL, "notifications") do |event| | ||||||
|  |         notification = JSON.parse(event.payload) | ||||||
|  |         topic = notification["topic"].as_s | ||||||
|  |         video_id = notification["videoId"].as_s | ||||||
|  |         published = notification["published"].as_i64 | ||||||
|  | 
 | ||||||
|  |         video = get_video(video_id, PG_DB, proxies) | ||||||
|  |         video.published = Time.unix(published) | ||||||
|  |         response = JSON.parse(video.to_json(locale, config, Kemal.config, decrypt_function)) | ||||||
|  | 
 | ||||||
|  |         if fields_text = env.params.query["fields"]? | ||||||
|  |           begin | ||||||
|  |             JSONFilter.filter(response, fields_text) | ||||||
|  |           rescue ex | ||||||
|  |             env.response.status_code = 400 | ||||||
|  |             response = {"error" => ex.message} | ||||||
|  |           end | ||||||
|  |         end | ||||||
|  | 
 | ||||||
|  |         if topics.try &.includes? topic | ||||||
|  |           env.response.puts "id: #{id}" | ||||||
|  |           env.response.puts "data: #{response.to_json}" | ||||||
|  |           env.response.puts | ||||||
|  |           env.response.flush | ||||||
|  | 
 | ||||||
|  |           id += 1 | ||||||
|  |         end | ||||||
|  |       end | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     # Send heartbeat | ||||||
|  |     loop do | ||||||
|  |       env.response.puts ":keepalive #{Time.now.to_unix}" | ||||||
|  |       env.response.puts | ||||||
|  |       env.response.flush | ||||||
|  |       sleep (20 + rand(11)).seconds | ||||||
|  |     end | ||||||
|  |   rescue | ||||||
|  |   end | ||||||
|  | end | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue