This post studies how to deploy a license server. The licensing system has two parts: a server and a client. The server keeps its records in a SQLite database (accessed through Python's sqlite3 module), and the server and client exchange AES-encrypted messages.
Crypto
Define an AES crypto method:
    class AESHelper():
        def __init__(self, password, iv):
            """Set the password (key) and offset (IV)."""

        def pkcs7padding(self, text):
            """Pad the text to a multiple of 16 bytes."""

        def pkcs7unpadding(self, text):
            """Remove the padding from the text."""

        def encrypt(self, content):
            """Encrypt content and return the result."""

        def decrypt(self, content):
            """Decrypt content and return the result."""
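The two padding methods are only outlined above. As a minimal sketch of what PKCS#7 padding to 16-byte blocks could look like, the standalone functions below work on bytes; the names `pkcs7_pad` and `pkcs7_unpad` are illustrative and are not the post's actual implementation.

```python
BLOCK_SIZE = 16  # AES block size in bytes


def pkcs7_pad(data: bytes, block_size: int = BLOCK_SIZE) -> bytes:
    """Append N bytes of value N so the length becomes a multiple of block_size."""
    pad_len = block_size - len(data) % block_size  # always in 1..block_size
    return data + bytes([pad_len]) * pad_len


def pkcs7_unpad(data: bytes, block_size: int = BLOCK_SIZE) -> bytes:
    """Strip PKCS#7 padding, rejecting input that is not validly padded."""
    if not data or len(data) % block_size != 0:
        raise ValueError("input length is not a positive multiple of the block size")
    pad_len = data[-1]
    if pad_len < 1 or pad_len > block_size:
        raise ValueError("invalid PKCS#7 padding")
    if data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("invalid PKCS#7 padding")
    return data[:-pad_len]
```

Input that is already a multiple of 16 bytes still receives a full extra block of padding, which is what lets the unpadding step always find a valid pad length in the last byte.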
Instantiate an AES crypto:
    def server_client_crypto():
        # Return an "AESHelper" instance with a fixed password and offset
        return AESHelper("f1231111111111gh", "f1231111111111gh")
Server
The server receives requests from clients and verifies whether each request is valid. If the client is entitled to a license, the server generates a license string and sends it back. The traffic is AES-encrypted, and every issued license is recorded in the database.
The server command line:
python main.py
Directory:
├───main.py          # main file
├───utility.py       # functions used in the main file
└───license_data.db  # database of license users
Receive requests from clients
The server is built with the Flask package. We first define a route that receives data from the client:
    import json

    from flask import Flask, Response, request

    app = Flask(__name__)

    @app.route('/sendjson', methods=['POST'])
    def get_data():
        # Get the encrypted request from the client
        # Decrypt the request
        # Call function "is_valid" to verify whether the license request is valid
        # Return a license if valid, otherwise return "false"
        aes_code = server_client_crypto()
        data_from_client = aes_code.decrypt(request.get_json())
        data_from_client = json.loads(data_from_client)  # get dict
        print(data_from_client)
        if not is_valid(data_from_client):
            return Response('false')
        return_data = generate_license(data_from_client)
        return Response(aes_code.encrypt(return_data))
Verify validation of requests
The verification workflow is as follows:
The data from the client must contain a fixed set of keys. Check that all of them are present.
Check that the server MAC address is correct.
Check that the client is requesting a license for a valid software product.
Check the database:
Check the username and password.
Check whether the client is requesting a license for an already registered device.
If the client is requesting a license for a new device, check whether the user is allowed one more device.
If a license is issued to the client, register the device in the database.
    import sqlite3

    def is_valid(data_from_client):
        # Check whether the request is valid for a license in the database
        keys_should_have = ['MAC', 'SoftwareName', 'ServerMac', 'UserName',
                            'Password', 'DeviceID', 'MachineName', 'MachineID']
        softwarenames = ['name0', 'name1']
        UserContents = ['MAC', 'MachineName', 'DeviceID']
        # every required key must be present
        if any(key not in data_from_client for key in keys_should_have):
            return False
        # check server mac
        if data_from_client['ServerMac'] != get_mac_address():
            return False
        # find the target software and record its column index in table LICENSE
        # (columns 0 and 1 are UserName and Password)
        if data_from_client['SoftwareName'] not in softwarenames:
            return False
        column_index = 2 + softwarenames.index(data_from_client['SoftwareName'])

        # check db
        db = sqlite3.connect("license_data.db")
        try:
            cursor = db.cursor()
            valid_user = False
            for row in cursor.execute("SELECT * FROM LICENSE"):
                if (row[0] == data_from_client['UserName']
                        and row[1] == data_from_client['Password']):
                    UserName = row[0]
                    num_devices_max = row[column_index]
                    valid_user = True
            if not valid_user:
                print("invalid user")
                return False

            # check whether the user is trying to register more devices than allowed
            # (a table name cannot be a SQL parameter; both parts were validated above)
            table = UserName + '_' + data_from_client['SoftwareName']
            registered_devices = 0
            for device in cursor.execute("SELECT * FROM " + table):
                registered_devices += 1
                if all(device[i] == data_from_client[uc]
                       for i, uc in enumerate(UserContents)):
                    return True  # this device is already registered
            print(registered_devices, num_devices_max)
            if registered_devices >= num_devices_max:  # registering more devices than max
                return False

            # register the new device; the values are passed as SQL parameters
            cmd = ("INSERT INTO " + table + " (" + ','.join(UserContents) + ") VALUES ("
                   + ','.join('?' * len(UserContents)) + ")")
            print(cmd)
            cursor.execute(cmd, [data_from_client[uc] for uc in UserContents])
            db.commit()
            return True
        finally:
            db.close()
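The device-quota step is the least obvious part of the workflow, so here is a minimal sketch that isolates it from the database and the network. The function name `device_decision` and its return values are hypothetical; the comparison fields are the same three that `is_valid` uses.

```python
DEVICE_FIELDS = ('MAC', 'MachineName', 'DeviceID')


def device_decision(registered, request, max_devices):
    """Return 'known', 'register', or 'reject' for a license request.

    registered  -- list of (MAC, MachineName, DeviceID) tuples already stored
    request     -- dict sent by the client
    max_devices -- number of devices the user may register for this product
    """
    candidate = tuple(request[field] for field in DEVICE_FIELDS)
    if candidate in registered:
        return 'known'      # the same device asks again: always allowed
    if len(registered) >= max_devices:
        return 'reject'     # the quota is used up by other devices
    return 'register'       # new device and quota left: store it
```

A device that is already registered is accepted even when the quota is full, which matches the order of the checks in `is_valid`.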
Database operations
Assume we have two software products, “name0” and “name1”. We first create a table that records each user's permissions. This main table stores the user's credentials and, for each product, the number of devices the user may register.
| UserName | Password | software_name0 | software_name1 | … |
| --- | --- | --- | --- | --- |
| VARCHAR(255) NOT NULL | VARCHAR(255) NOT NULL | INT | INT | INT |
    import sqlite3

    def add_database():
        # Specify the software names manually
        # Create an empty table. Note that the software-name columns record
        # the number of devices the client can register
        softwarenames = ['name0', 'name1']
        db = sqlite3.connect('license_data.db')
        cursor = db.cursor()
        cmd = 'CREATE TABLE LICENSE (UserName VARCHAR(255) NOT NULL,Password VARCHAR(255) NOT NULL'
        for sn in softwarenames:
            cmd += ',' + sn + ' INT NOT NULL'
        cmd += ')'
        print(cmd)
        cursor.execute(cmd)
        db.commit()
        db.close()
After creating the database, we can add users to the main table. In this demo we also create one table per user per software product, named UserName_SoftwareName; each such table records the devices the user has registered. The delete_user function shows the reverse operation, removing a user and the user's tables.
    import sqlite3

    def delete_user(UserName):
        # Delete UserName from the database, which includes:
        # 1. the UserName row in table LICENSE
        # 2. the UserName_SoftwareName tables
        softwarenames = ['name0', 'name1']
        db = sqlite3.connect("license_data.db")
        cursor = db.cursor()
        try:
            cursor.execute("DELETE FROM LICENSE WHERE UserName=?", (UserName,))
            print("Success delete from license table.")
        except sqlite3.Error:
            print("No specified user.")
        for sn in softwarenames:
            try:
                cmd = "DROP TABLE " + UserName + '_' + sn
                print(cmd)
                cursor.execute(cmd)
                print("Success")
            except sqlite3.Error:
                pass
        db.commit()
        db.close()
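Adding a user is described in the text but not shown. The sketch below is one way it could look, run against an in-memory database so it does not touch license_data.db. The function names `add_user` and `make_demo_db` are hypothetical, and the layout of the per-user tables (MAC, MachineName, DeviceID) is an assumption inferred from the columns that `is_valid` reads and writes.

```python
import sqlite3

SOFTWARE_NAMES = ['name0', 'name1']  # same product list as in the post


def add_user(db, user_name, password, max_devices):
    """Insert a user into LICENSE and create one device table per product.

    max_devices maps each product name to the number of devices allowed.
    """
    if not user_name.isidentifier():
        # Table names cannot be bound as SQL parameters, so restrict them.
        raise ValueError("user name must be a valid identifier")
    cursor = db.cursor()
    placeholders = ','.join('?' * (2 + len(SOFTWARE_NAMES)))
    cursor.execute(
        "INSERT INTO LICENSE VALUES (" + placeholders + ")",
        [user_name, password] + [max_devices[sn] for sn in SOFTWARE_NAMES],
    )
    for sn in SOFTWARE_NAMES:
        # Assumed column layout for the per-user device table.
        cursor.execute(
            "CREATE TABLE " + user_name + "_" + sn
            + " (MAC VARCHAR(255) NOT NULL, MachineName VARCHAR(255) NOT NULL,"
            + " DeviceID INT NOT NULL)"
        )
    db.commit()


def make_demo_db():
    """Build an in-memory database with the LICENSE table and one user."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE LICENSE (UserName VARCHAR(255) NOT NULL,"
        "Password VARCHAR(255) NOT NULL, name0 INT NOT NULL, name1 INT NOT NULL)"
    )
    add_user(db, "yma1", "pss", {"name0": 2, "name1": 2})
    return db
```

Restricting the user name to an identifier keeps the concatenated table name safe, since only the row values can be passed as SQL parameters.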
Client
The client reads a YAML file to get its configuration, then sends the configuration and its MAC address to the server to request a license for every software product. After that, it runs the update check every hour.
The client command line:
python main.py -U user.yml -M 1  # user config and machine ID
Directory:
├───main.py     # main file
├───utility.py  # functions used in the main file
└───user.yml    # yaml file of user configuration
Register for a software
The client registers a software product and obtains a license with the register function. It first reads the configuration from user.yml and collects device information such as the MAC address, then encrypts this information and sends it to the server to request a license.
    def register(SoftwareName, data):
        # Send data to the server and generate the license file
        # "data" comes from the yaml file
        # Generate data_for_server by calling function "GetInfo"
        # Send data_for_server to the server by calling function "send_to_server"
        # Write the license file by calling function "license_write_file" (see 2.2)
        data = GetInfo(SoftwareName, data)
        license_content = send_to_server(data)
        license_write_file(SoftwareName, license_content)
        print("Successfully get license for " + SoftwareName + ".")
    import socket

    def GetInfo(SoftwareName, data):
        # Return a dict containing "data", the MAC address, MachineName and SoftwareName
        mac = get_mac_address()
        data['MAC'] = mac
        MachineName = socket.gethostname()
        data['MachineName'] = MachineName
        data['SoftwareName'] = SoftwareName
        return data
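The helper `get_mac_address` is used on both the server and the client, but its body is not shown in this post. A possible stand-in, assuming the standard library's `uuid.getnode()` is acceptable as the source of the address, could look like this; the output format (12 lowercase hex digits) is also an assumption.

```python
import uuid


def get_mac_address():
    """Return this machine's MAC address as 12 lowercase hex digits.

    Hypothetical stand-in for the helper used in the post. uuid.getnode()
    may fall back to a random 48-bit number when no hardware address can
    be read, so the value is not guaranteed to be stable on every machine.
    """
    return format(uuid.getnode(), '012x')
```

Because the server compares `ServerMac` against its own `get_mac_address()` result, both sides would need to use the same formatting for the check to pass.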
    import json
    import sys

    import requests

    def send_to_server(data):
        # Send data to the server to request a license.
        # Exit if the server returns "false".
        # Return the license
        aes_code = server_client_crypto()
        data = json.dumps(data)
        data = aes_code.encrypt(data)
        headers2 = {'Content-Type': 'application/json'}
        response = requests.post(url="http://localhost:8085/sendjson", headers=headers2, json=data, verify=False)
        if response.content == b'false':
            print("Fail! Check configure yaml file or contact administrator.")
            sys.exit(1)  # non-zero exit code signals the failure
        return response.content
We also want the client to renew the license every month. The updater reads the license file to get the registration date; if 30 or more days have passed, the client deletes the license and registers again.
    import os
    from datetime import datetime

    def updater():
        # Compare the date in the license with the current date: d = current_date - license_date
        # Remove the license file and call function "register" if d >= 30.
        # "license_path" and "data" are expected to be defined at module level
        SoftwareName = license_path[:-4]  # strip the ".lic" extension
        current_time = datetime.now()
        license_content = license_parser(license_path)
        license_time = license_content['time']
        license_time = datetime.strptime(license_time, "%Y-%m-%d")
        days_past = (current_time - license_time).days
        if days_past >= 30:
            print("Update license.")
            os.remove(license_path)
            register(SoftwareName, data)
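The age check inside `updater` can be tried on its own. The sketch below pulls it into a small function; the name `needs_renewal` and the `now` parameter are illustrative additions that make the check easy to test with fixed dates.

```python
from datetime import datetime

RENEW_AFTER_DAYS = 30  # threshold used by updater


def needs_renewal(license_date, now, max_age_days=RENEW_AFTER_DAYS):
    """Return True when the license is max_age_days or more days old.

    license_date uses the "%Y-%m-%d" format stored in the license.
    """
    issued = datetime.strptime(license_date, "%Y-%m-%d")
    # timedelta.days counts whole days only, so 29 days and 23 hours is still 29
    return (now - issued).days >= max_age_days
```

Since the license stores only a date, the issue time is treated as midnight, and a license issued on 2024-01-01 becomes renewable at the start of 2024-01-31.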
Generate license file
After receiving the encrypted license from the server, the client generates a license file locally. The license file contains 10 lines. The first three lines are junk strings that contain “Y”, “E” and “S” respectively. The positions of “Y”, “E” and “S” in these three lines determine which line holds the encrypted license string:
[index(“Y” in line0) + index(“E” in line1) + index(“S” in line2)] % 7
For example, [1+1+1]%7=3, so the license string is stored in the seventh (3+3+1) line: the first “3” is the number of header lines, the second “3” is the result of the computation, and the “1” converts the zero-based index into a line number. The other six lines are junk strings containing “M”, “Y”, “Y”, “Y”, “D” and “S” respectively.
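The arithmetic can be checked with a few lines of code. This is a small sketch of the location rule only; the function names `payload_slot` and `payload_line_number` are hypothetical.

```python
HEADER_KEYS = ('Y', 'E', 'S')  # marker characters of the three header lines
PAYLOAD_SLOTS = 7              # number of lines that follow the header


def payload_slot(header_lines):
    """Return the zero-based slot (0..6) after the header that holds the license."""
    total = sum(line.find(key) for line, key in zip(header_lines, HEADER_KEYS))
    return total % PAYLOAD_SLOTS


def payload_line_number(header_lines):
    """Return the one-based line number of the license string in the file."""
    return len(HEADER_KEYS) + payload_slot(header_lines) + 1
```

With header lines such as "aYbc", "xEyz" and "qSrt", each marker sits at index 1, so the slot is 3 and the license string is on line 7 of the file.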
    def license_write_file(SoftwareName, content):
        # "content": the encrypted license string (bytes).
        # Generate the license file for SoftwareName: SoftwareName.lic
        length = len(content)
        header_keys = ['Y', 'E', 'S']
        junk_keys = ['M', 'Y', 'Y', 'Y', 'D', 'S']
        with open(SoftwareName + '.lic', 'w') as fw:
            # three header lines; the key positions select the license line
            digit = 0
            for key in header_keys:
                ll = generate_junk(length, key)
                fw.write(ll + '\n')
                digit += ll.find(key)
            insert_line = digit % 7
            # seven more lines: the license string plus six junk lines
            index = 0
            for i in range(7):
                if i == insert_line:
                    fw.write(content.decode() + '\n')
                else:
                    fw.write(generate_junk(length, junk_keys[index]) + '\n')
                    index += 1
    import random
    import string

    def generate_junk(length, key):
        # Generate a junk string of fixed length that includes key.
        # string.printable[:-6] drops the six whitespace characters.
        ll = ''.join([random.choice(string.printable[:-6]) for i in range(length)])
        if ll.find(key) == -1:
            ll = list(ll)
            ll[random.randint(0, length - 1)] = key
            ll = ''.join(ll)
        return ll
License parser
The parser reads the license content back from a license file. The decrypted content can be used to verify that the license file is valid; this demo does not implement that verification.
    import json

    def license_parser(license_path):
        # Read the first three lines and compute the license location (see 3.2).
        # Decrypt the license line.
        generate_list = ['Y', 'E', 'S']
        # read the license file and get the license content
        with open(license_path) as fr:
            digit = 0
            for i in generate_list:
                ll = fr.readline()
                digit += ll.find(i)
            insert_line = digit % 7
            for i in range(7):
                ll = fr.readline()
                if i == insert_line:
                    content = ll[:-1].encode()  # drop the trailing newline
        aes_code = server_client_crypto()
        license_content = aes_code.decrypt(content)
        return json.loads(license_content)
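To see that writing and parsing agree, the hiding scheme can be exercised without AES and without touching the disk. The following is a sketch under those simplifications: `hide` and `recover` are hypothetical names, the payload is any non-empty string, and the random generator is passed in so the run is reproducible.

```python
import random
import string

HEADER_KEYS = ['Y', 'E', 'S']
JUNK_KEYS = ['M', 'Y', 'Y', 'Y', 'D', 'S']
ALPHABET = string.printable[:-6]  # printable characters without whitespace


def junk(length, key, rng):
    """Return a random string of the given length that contains key."""
    chars = [rng.choice(ALPHABET) for _ in range(length)]
    if key not in chars:
        chars[rng.randrange(length)] = key
    return ''.join(chars)


def hide(payload, rng):
    """Return the 10 lines of a license file with payload hidden among junk."""
    header = [junk(len(payload), key, rng) for key in HEADER_KEYS]
    slot = sum(line.find(key) for line, key in zip(header, HEADER_KEYS)) % 7
    body = [junk(len(payload), key, rng) for key in JUNK_KEYS]
    body.insert(slot, payload)  # slot 6 appends after the six junk lines
    return header + body


def recover(lines):
    """Locate the payload again using only the three header lines."""
    slot = sum(line.find(key) for line, key in zip(lines, HEADER_KEYS)) % 7
    return lines[len(HEADER_KEYS) + slot]
```

Calling `recover(hide("c2FtcGxl", random.Random(0)))` returns the original string. The scheme only obscures where the license line sits; the confidentiality of the content still depends on the AES step.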