ipfs stream

ipfs_stream at tip
Login

ipfs_stream at tip

File ipfs_stream from the latest check-in


#!/bin/env python3
#stream data via ipfs

#to stream: ffmpeg | ipfs_stream 
#to watch: ipfs_stream stream_id | ffplay

#copyright 2021 Russell Stickney
#
#modification and distribution of this program is allowed.

import base64
import json
import nacl.signing
import os
import re
import selectors
import subprocess
import sys
import time
import urllib.parse
import urllib.request

debug = 2

sel = selectors.DefaultSelector()

def default_stream():
	config = setup()
	stream(config['id'], config['sign_key'], sys.stdin.buffer, config['read_method'])

def decode_sign_message(verify_key, message):

	signature_bytes = base64.b64decode(message['sig'])
	data_bytes = message['data'].encode()
	data_verified = verify_key.verify(data_bytes, signature_bytes)
	debug and dprint('message ok', data_verified, message['data'])
	return data_verified and message['data']
	
def dprint(*plist, **kw):
	print(*plist, file=sys.stderr)

def get_linked_data(hash, output_flo):
	link_sequencer = sequence_event(sel, output_flo)
	def write_linked_data(segment):
		debug and dprint('fetching linked data', segment.data)
		message = json.loads(segment.data.decode())
		for index, link in enumerate(message['links']):
			segment_hash = link['Cid']['/']
			debug and dprint('getting segment:', index, segment_hash)
			link_sequencer.add_event('segment', str(index), segment_hash )
	debug and dprint('geting link node {}'.format(hash) )			
	link_segment = segment(sel, hash, write_linked_data)
	link_segment.start_segment()
	while link_segment.finished == False or link_sequencer.seq:
		events = sel.select(timeout=10)
		for key, mask in events:
			callback = key.data
			callback()
		debug and dprint('waiting on: link finished:', link_segment.finished , 'link sequence', len(link_sequencer.seq) )

		
	
def format_multipart_formdata(data):
	#unable to find multipartencoding in the stdlib
	#this is rough and ready but should do the trick
	#a working boundary is
	#--------------------------67f6eece09014f5e
	boundary = 'segment'
	name = 'segment'
	data_header = {
		'Content-Disposition': 'form-data; name="{}"; filename="{}"'.format(name, name),
		'Content-Type': 'application/octet-stream',
		}
	data_part = b''
	data_part += '--{}\r\n'.format(boundary).encode()
	for header in data_header:
		data_part += '{}: {}\r\n'.format(header, data_header[header]).encode()
	data_part += '\r\n'.encode()
	data_part += data
	data_part += '\r\n--{}--\r\n'.format(boundary).encode()

	multipart_headers = {
		'Content-Length': str(len(data_part)),
		'Content-Type': 'multipart/form-data; boundary={}'.format(boundary),
		}
	return data_part, multipart_headers
	
		
def ipfs_make_api(ipfs_loc, ipfs_port, api_prefix):
	ipfs_loc = '{}:{}'.format(ipfs_loc, ipfs_port)
	def ipfs_api(api_path, arg_map, data=None, callback=None):
		request_headers = None
		query = urllib.parse.urlencode(arg_map)
		url_parts = [
			'http',
			ipfs_loc,
			api_prefix + api_path,
			'',
			query,
			''
			]
		ipfs_url = urllib.parse.urlunparse(url_parts)
		debug > 1 and dprint('ipfs url:', ipfs_url)
		if data is not None:
			data, request_headers = format_multipart_formdata(data)
			debug > 1 and dprint('data:', data)
		ipfs_request = urllib.request.Request(url = ipfs_url, headers = request_headers, data = data)
		dprint('debug_request:', ipfs_request)
		with urllib.request.urlopen(ipfs_request) as ipfs_response:
			response_data = ipfs_response.read()
			debug > 1 and dprint(response_data)
		return response_data
	return ipfs_api
		
ipfs_api = ipfs_make_api(ipfs_loc = '127.0.0.1', ipfs_port = '5001', api_prefix = '/api/v0/')
		
def ipfs_publish(channel, id, sign_key, message):
	'''
		channel to publish message, usually video/id
		id username
		sign_key nacl signing key nacl.signing.SigningKey(key)
		message space seperated message
			segment index hash
			watching id
		'''
	publish_ok = False
	#sig = sign message bytes; base64 encode; decode as unidata string 
	data = {
		'id':id,
		'sig':base64.b64encode(sign_key.sign(message.encode()).signature).decode(),
		'data':message,
		}
	signed_json_message = json.dumps(data).encode() + b'\n'

	pub_result = ipfs_api('pubsub/pub', {'arg':channel}, data=signed_json_message)
	print('pub_result', pub_result)
	publish_ok = pub_results == b''
	ipfs_pub_err = pub_results or None
	return publish_ok, ipfs_pub_err
	
def load_ipfs_config():
	return json.load(open(os.path.join(os.environ['HOME'], '.ipfs', 'config'), 'r'))

def setup():
	config = {}
	if os.path.exists('config'):
		with open('config', 'r') as config_file:
			for line in config_file:
				data, sep, comment = line.partition('#')
				if data and ':' in data:
					key, sep, value = data.partition(':')
					config[key.strip()] = value.strip()
	ipfs_config = load_ipfs_config()
	id = ipfs_config['Identity']['PeerID']
	if not os.path.isdir('private'):
		os.mkdir('private')
	private_key_path = os.path.join('private', id)
	if not os.path.exists(private_key_path):
		with open(private_key_path, 'wb') as private_key_file:
			debug and dprint('generating new private key', private_key_path)
			sign_key = nacl.signing.SigningKey.generate()
			private_key_file.write(base64.b64encode(sign_key.encode()))
	else:
		with open(private_key_path, 'rb') as private_key_file:
			debug and dprint('loading private key', private_key_path)
			sign_key = nacl.signing.SigningKey(base64.b64decode(private_key_file.read()))
	
	print('using identity:', id)
	
	if not os.path.isdir('public'):
		os.mkdir('public')
	public_key_path = os.path.join('public', id)
	public_key = sign_key.verify_key
	b64_public_key = base64.b64encode(public_key.encode())
	if not os.path.exists(public_key_path):
		with open(public_key_path, 'wb') as public_key_file:
			public_key_file.write(b64_public_key)
	print('public key:', b64_public_key.decode())

	read_method_map = {
		'bytes': make_segment_size,
		'seconds': make_segment_time,
		}

	info = {
		'id': id,
		'sign_key': sign_key,
		'read_method': make_segment_size(2**18),
		}

	if 'segment every' in config:
		read_amount, sep, read_method = config['segment every'].partition(' ')
		read_amount = read_amount.strip()
		read_method = read_method.strip()
		if read_method in read_method_map and read_amount.isdigit():
			print('segmenting every', read_amount, read_method)
			info['read_method'] = read_method_map[read_method](int(read_amount))
		else:
			print('invalid "segment every": format is "segment every: amount type" where type is one of', ' '.join(read_method_map.keys()) )

	return info
		
class segment:
	def __init__(self, selector, address, finished_cb = None):
		self.selector = selector
		self.address = address
		self.data = b''
		self.finished = False
		self.finished_cb = finished_cb
		self.read_size = 100
		self.ipfs_proc = None

	def start_segment(self):
		ipfs_cmd = [
			'ipfs',
			'dag',
			'get',
			self.address,
			]
		self.ipfs_proc = subprocess.Popen(ipfs_cmd, stdout=subprocess.PIPE, encoding='utf-8')
		self.stream =  self.ipfs_proc.stdout.buffer
		os.set_blocking(self.stream.fileno(), False)
		self.selector.register(self.stream, selectors.EVENT_READ, self.read_data)

	def read_data(self):
		data = self.stream.read(self.read_size)
		if not data:
			self.stream.close()
			self.selector.unregister(self.stream)
			self.finished = True
			if self.finished_cb != None:
				self.finished_cb(self)
		else:
			while len(data) == self.read_size:
				self.data += data
				data = self.stream.read(self.read_size)
				if data == None:
					data = b''
			self.data += data
	def write(self, segment_file):
		message = json.loads(self.data.decode())
		if 'bytes' in message:
			based_data = message['bytes']
			if based_data[0] == 'm':
				bin_data = base64.b64decode(based_data[1:])
			else:
				dprint('error decoding data. not a known multibase type')
				bin_data = b''
			segment_file.write(bin_data)
		
class sequence_event:
	def __init__(self, selector, segment_file):
		self.selector = selector
		self.segment_file = segment_file
		self.seq = {}
		self.index = 0
		self.running_seq = {}
		self.max_run = 5
	def add_event(self, method, sequence, message):
		seq_ok = False
		if sequence.isdigit():
			seq_id = int(sequence)
			if method == 'segment':
				self.seq[seq_id] = segment(self.selector, message, self.cycle)	
				seq_ok = True
				if len(self.running_seq) < self.max_run:
					self.start_event(seq_id)
		return seq_ok
	def start_event(self, seq_id):
		self.running_seq[seq_id] = True
		self.seq[seq_id].start_segment()
	def cycle(self, overide = None):
		#note: this implamentation of cycle is not robustaly designed and should be reviewed
		debug and dprint('segment cycle')
		del_list = []
		#output all finished segments in order
		sorted_segment_list = sorted(self.seq)
		debug and dprint('segments active', ','.join(map(str, sorted_segment_list)) or 'None')
		#catch up to the current segment
		if sorted_segment_list and sorted_segment_list[0] > self.index:
			self.index = sorted_segment_list[0]
		#check all active segments in order, write the ones that are done to file and delete them
		#to do: some sort of timeout or skip so we do not get stuck on a missing segment
		#idea: have segment index be file offset, then we could write sparse files.
		for sindex in sorted_segment_list:
			debug and dprint('checking if segment is finished', sindex)
			if overide == True or (self.seq[sindex].finished and sindex == self.index):
				debug and dprint('segment ok', sindex)
				self.index += 1
				overide = False
			if sindex < self.index and self.seq[sindex].finished:
				debug and dprint('writing segment {}'.format(sindex) )
				self.seq[sindex].write(self.segment_file)
				del_list.append(sindex)
			elif (len(self.running_seq) - len(del_list) < self.max_run) and self.seq[sindex].ipfs_proc == None:
				self.start_event(sindex)
					
			else:
				debug and dprint('waiting for segment {} to finish'.format(self.index))
		for dindex in del_list:
			del self.seq[dindex]
			del self.running_seq[dindex]
			debug and dprint('removing segment', dindex )
			
class verify_message:
	''' message is a dictionary with the folling keys
		id: the id of the sender, must be present in 'public' directory
		data: string with message data
		sig: signature of data string
		'''
	def __init__(self):
		self.verify_key = {}
		self.stored_key = os.listdir('public')
		if debug:
			for key in self.stored_key:
				dprint('known key', key)
		self.valid_id_exp = re.compile(r'Qm[^ ]{40,50}$')
	def check(self, message):
		data_verified = False
		result = 'Never processed'
		message_id = message.get('id', 'unknown')
		if self.valid_id_exp.match(message_id):
			if message_id not in self.verify_key and message_id in self.stored_key:
				debug and dprint('loading key', message_id)
				with open(os.path.join('public', message_id), 'rb') as pub_file:
					self.verify_key[message_id] = nacl.signing.VerifyKey(base64.b64decode(pub_file.read()))
			if message_id in self.verify_key:
				signature_bytes = base64.b64decode(message['sig'])
				data_bytes = message['data'].encode()
				data_verified = self.verify_key[message_id].verify(data_bytes, signature_bytes)
				result = 'verified'
			else:
				result = 'No public key found for id "{}"'.format(message_id)
		else:
			result = 'message id failed to meet valid id requirments'
			
		return data_verified, result

class subscribe_stream:
	def __init__(self, stream_id, selector, segment_file, verify):
		self.stream_id = stream_id
		self.selector = selector
		self.segment_file = segment_file
		self.buffer = b''
		self.sequence = {
			self.stream_id: sequence_event(self.selector, self.segment_file),
			}
		self.verify = verify
		self.read_size = 100
		self.jd = json.JSONDecoder()
		user_info = setup()
		self.id = user_info['id']
		self.sign_key = user_info['sign_key']
		self.channel = 'video/{}'.format(stream_id)

		ipfs_sub_command = [
			'ipfs',
			'pubsub',
			'sub',
			self.channel,
			]
		self.ipfs_sub_proc = subprocess.Popen(ipfs_sub_command, stdout=subprocess.PIPE)
		self.proc_file = self.ipfs_sub_proc.stdout
		os.set_blocking(self.proc_file.fileno(), False)
		self.selector.register(self.proc_file, selectors.EVENT_READ, self.read_stream)
		self.send_message('watching {}'.format(stream_id))

	def read_stream(self):
		debug and dprint('reading stream data')
		read_data = self.proc_file.read(self.read_size)
		if not read_data:
			debug and dprint('stream {} over closing down'.format(self.stream_id) )
			self.proc_file.close()
			self.selector.unregister(self.proc_file)
		debug and dprint('read {} bytes'.format(len(read_data) ) )
		while len(read_data) == self.read_size:
			debug and dprint('reading more')
			self.buffer += read_data
			read_data = self.proc_file.read(self.read_size)
			if read_data == None:
				read_data = b''
			debug and dprint('read {} bytes'.format(len(read_data) ) )
		self.buffer += read_data

		check_failed = 0
		while check_failed < 1:
			try:
				message, break_index = self.jd.raw_decode(self.buffer.decode())
				self.buffer = self.buffer[break_index:]
				self.read_size = len(self.buffer) + break_index
				message_ok, message_ok_reason = self.verify.check(message)
				debug and dprint('message_ok', message_ok)
				if message_ok:
					sender = message['id']
					data_args = message['data'].split()
					if data_args:
						method = data_args[0]
						if method == 'segment':
							self.method_segment(sender = sender, sequence_id = data_args[1], segment_hash = data_args[2])
						else:
							dprint('unknown message:', sender, message['data'])
				else:
					debug and dprint('signature failure', message_ok_reason)
				check_failed = 0
			except json.decoder.JSONDecodeError:
				debug and dprint('no json', self.buffer)
				check_failed += 1
	def method_segment(self, sender, sequence_id, segment_hash):
		if sender == self.stream_id:
			self.sequence[sender].add_event('segment', sequence_id, segment_hash)
		else:
			debug and dprint('recieved segment from invalid sender:', sender, 'expected from:', self.stream_id, 'segment:', sequence_id, segment_hash)
			 
			
		
	def send_message(self, message):
		publish_ok, publish_err = ipfs_publish(self.channel, self.id, self.sign_key, message)
		if publish_ok == False:
			dprint('error publishing message:', publish_err, message)
		return publish_ok
		
					
			
def subscribe(stream_id):
	segment_file = sys.stdout.buffer
	verify = verify_message()
	subscription = subscribe_stream(stream_id, sel, segment_file, verify)

	while subscription.ipfs_sub_proc.poll() == None:
		events = sel.select(timeout=10)
		for key, mask in events:
			callback = key.data
			callback()

def make_segment_size(size):
	def segment_size(flo):
		data = flo.read(size)
		return data, len(data) == size
	return segment_size

def make_segment_time(seconds, read_size = None):
	if read_size == None:
		read_size = 1024
	def segment_time(flo):
		ts_start = time.time()
		ts_now = ts_start
		buffer = b''
		while ts_now - ts_start < seconds:
			data = flo.read(read_size)
			buffer += data
			if len(data) < read_size:
				return buffer, False
			ts_now = time.time()
		return buffer, True
	return segment_time
		


def stream(id, nacl_sign_key, flo, flo_read_method = None):
	'''Stream a file like object to ipfs pubsub id
	   id: channel to stream on
	   nacl_sign_key: bytes of data used to sign posts
	   flo: a file like object to stream
	   flo_read_method(optional): a function that returns bytes of flo to stream and a bool indicationg the end of the stream
	   	flo_read_method(flo) -> stream_bytes, stream_more
	   '''
	if flo_read_method == None:
		flo_read_method = make_segment_size(2 ** 18)
	link_list = []
	node = {
		'bytes':'',
		}
	ipfs_command = [
		'ipfs',
		'dag',
		'put',
		]
	publish_channel = 'video/{}'.format(id)
	
	output_exp = re.compile(r'([^\n ]+)\n')
	print('streaming to video/{}'.format(id))
	run_flag = True
	index = 0
	try:
		while run_flag:
			data, run_flag = flo_read_method(flo)
			node['bytes'] = 'm' + base64.b64encode(data).decode()
			json_node = json.dumps(node)
			debug > 1 and dprint('publishing segment:', len(data), 'bytes')
			ipfs_proc = subprocess.Popen(ipfs_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
			ipfs_out, ipfs_error = ipfs_proc.communicate(json_node.encode())
			
			output_match = output_exp.match(ipfs_out.decode())		
			if output_match:
				segment_id = output_match[1]
				link_data = {
					'Cid': {
						'/': segment_id,
						},
					'Name':'',
					'Size':len(data),
					}
				link_list.append(link_data)

				pub_message = 'segment {} {}'.format(index, segment_id)
				index += 1
				publish_ok, publish_err = ipfs_publish(publish_channel, id, nacl_sign_key, pub_message)
				
				if publish_ok:
					dprint('publish:',  pub_message)
				else:
					dprint('publish fail:', publish_err, pub_message)
			else:
				print('unable to publish message:', ipfs_out, ipfs_error)
	except KeyboardInterrupt:
		pass
	#quick note on the linked video file
	#ideally this would be compatible with ipfs get
	#unfortunately the get format is some sort of protobuf based thing
	#and the ipfs group is moving away from it to the ipld format
	#doubly unfortunatly there is no current file specification for the new ipld format
	#so this is all speculation... but it still beats trying to intigrate protobufs
	final_video = {
		'links':link_list,
		'data':''
		}
	ipfs_proc = subprocess.Popen(ipfs_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	ipfs_out, ipfs_error = ipfs_proc.communicate(json.dumps(final_video).encode())
	full_hash = ipfs_out.decode()
	print('full video is at {}'.format(full_hash) )
	publish_ok, publish_err = ipfs_publish(publish_channel, id, nacl_sign_key, 'full {}'.format(full_hash) )
	
	
			

if __name__ == '__main__':
	#
	if len(sys.argv) > 1:
		if sys.argv[1] == 'get' and len(sys.argv) > 2:
			get_linked_data(sys.argv[2], sys.stdout.buffer)
		else:
			watch_id = sys.argv[1]
			if os.path.exists(os.path.join('public', watch_id)):
				subscribe(sys.argv[1])
			else:
				print('no public key found for {}: unable to stream'.format(watch_id))
	else:
		default_stream()