License server

License DOC

Overview

This is a study of license server deployment. The licensing system has two parts: a server and a client. The server maintains a SQLite database through Python's sqlite3 module. The server and client communicate using AES encryption.

Crypto

Define an AES helper class:

class AESHelper:
    def __init__(self, password, iv):
        """Set the password (key) and the offset (IV)."""

    def pkcs7padding(self, text):
        """Pad the text to a multiple of 16 bytes."""

    def pkcs7unpadding(self, text):
        """Remove the padding from the text."""

    def encrypt(self, content):
        """Encrypt content and return the result."""

    def decrypt(self, content):
        """Decrypt content and return the result."""
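
The bodies of pkcs7padding and pkcs7unpadding are not shown in this document. As a rough sketch of what PKCS#7 padding does, assuming the helper works on bytes and uses the 16-byte AES block size, the two steps might look like the following. The function names pkcs7_pad and pkcs7_unpad are hypothetical and are not part of AESHelper.

```python
BLOCK_SIZE = 16  # AES block size in bytes


def pkcs7_pad(data: bytes, block_size: int = BLOCK_SIZE) -> bytes:
    """Append N bytes of value N so the length becomes a multiple of block_size."""
    pad_len = block_size - (len(data) % block_size)
    return data + bytes([pad_len]) * pad_len


def pkcs7_unpad(data: bytes, block_size: int = BLOCK_SIZE) -> bytes:
    """Strip PKCS#7 padding, raising ValueError if the padding is malformed."""
    if not data or len(data) % block_size != 0:
        raise ValueError("input is not a whole number of blocks")
    pad_len = data[-1]
    if pad_len < 1 or pad_len > block_size:
        raise ValueError("invalid padding length")
    if data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("invalid padding bytes")
    return data[:-pad_len]


padded = pkcs7_pad(b"license")
print(len(padded), pkcs7_unpad(padded))
```

Note that an input that is already a multiple of 16 bytes still receives a full extra block of padding, which is what lets the unpadding step stay unambiguous.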

Instantiate the AES helper:

def server_client_crypto():
    # Return an "AESHelper" instance with a fixed password and offset
    return AESHelper("f1231111111111gh", "f1231111111111gh")

Server

The server receives requests from clients and verifies whether each request is valid. If the client is entitled to a license, the server generates a license string and sends it to the client. The communication is encrypted with AES, and every license that is issued is recorded in the database.

The server command line:

python main.py

Directory:

├───main.py # main file
├───utility.py # functions used in the main file
└───license_data.db # database of license users

Receive requests from clients

The server is built with the Flask package. We first define the route that receives data from the client:

import json

from flask import Flask, Response, request

app = Flask(__name__)

@app.route('/sendjson', methods=['POST'])
def get_data():
    # Get the encrypted request from the client
    # Decrypt the request
    # Call function "is_valid" to verify whether the license request is valid
    # Return a license if valid; otherwise return "false"
    aes_code = server_client_crypto()
    data_from_client = aes_code.decrypt(request.get_json())
    data_from_client = json.loads(data_from_client)  # get dict
    print(data_from_client)
    response = Response(request.get_json())
    if not is_valid(data_from_client):
        response.data = 'false'
        return response
    return_data = generate_license(data_from_client)
    response.data = aes_code.encrypt(return_data)
    return response

Verify the validity of requests

The verification workflow is as follows:

  • The data from the client must contain a fixed set of keys. Check that all of them are present.
  • Check that the server MAC address is correct.
  • Check that the client is requesting a license for a valid software product.
  • Check the database:
    • Check the username and password.
    • Check whether the client is requesting a license for a device that is already registered.
    • If the client is requesting a license for a new device, check whether the user is allowed one more device.
  • If a license is issued to the client, register the device in the database.

import sqlite3

def is_valid(data_from_client):
    # Check in the database whether the request is valid for a license
    keys_should_have = ['MAC', 'SoftwareName', 'ServerMac', 'UserName', 'Password', 'DeviceID', 'MachineName', 'MachineID']
    softwarenames = ['name0', 'name1']
    UserContents = ['MAC', 'MachineName', 'DeviceID']
    # check that every required key is present
    for key in keys_should_have:
        if key not in data_from_client:
            return False
    # check server mac
    if data_from_client['ServerMac'] != get_mac_address():
        return False
    # find target software and record the column index
    column_index = 1
    find_target_software = False
    for i in softwarenames:
        column_index += 1
        if i == data_from_client['SoftwareName']:
            find_target_software = True
            break
    if not find_target_software:
        return False

    # check db
    valid_user = False
    db = sqlite3.connect("license_data.db")
    cursor = db.cursor()

    users = cursor.execute("SELECT * FROM LICENSE")
    for row in users:
        if row[0] == data_from_client['UserName']:
            UserName = row[0]
            if row[1] == data_from_client['Password']:
                num_devices_max = row[column_index]
                valid_user = True
    db.close()
    if not valid_user:
        print("invalid user")
        return False
    # check whether the user is trying to register more devices than allowed
    db = sqlite3.connect("license_data.db")
    cursor = db.cursor()
    devices = cursor.execute("SELECT * FROM " + UserName + '_' + data_from_client['SoftwareName'])
    registered_devices = 0
    for device in devices:
        registered_devices += 1
        same_device = True
        for i in range(len(UserContents)):
            if device[i] != data_from_client[UserContents[i]]:
                print(device, data_from_client)
                same_device = False
                break
        if same_device:
            db.close()
            return True
    print(registered_devices, num_devices_max)
    if registered_devices >= num_devices_max:  # registering more devices than max
        db.close()
        return False
    else:
        # register in database; client values are bound as parameters
        cmd = "INSERT INTO " + UserName + '_' + data_from_client['SoftwareName'] + ' ('
        cmd += ','.join(UserContents) + ') VALUES ('
        cmd += ','.join('?' for _ in UserContents) + ')'
        print(cmd)
        cursor.execute(cmd, [data_from_client[uc] for uc in UserContents])
        db.commit()
        db.close()
        return True
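
The username and password check above scans every row of LICENSE. As an alternative sketch, the same lookup could be a single parameterized query, which also keeps client input out of the SQL text. The function device_limit and the in-memory database are hypothetical and only serve the illustration; the table layout and the software names follow this document.

```python
import sqlite3


def device_limit(db, user_name, password, software_column):
    """Return the user's device limit for one software column, or None."""
    # Column names cannot be bound as parameters, so only accept known ones.
    if software_column not in ("name0", "name1"):
        return None
    row = db.execute(
        "SELECT " + software_column + " FROM LICENSE WHERE UserName=? AND Password=?",
        (user_name, password),
    ).fetchone()
    return row[0] if row else None


# In-memory database with one user, used only for this example.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE LICENSE (UserName VARCHAR(255) NOT NULL, "
           "Password VARCHAR(255) NOT NULL, name0 INT NOT NULL, name1 INT NOT NULL)")
db.execute("INSERT INTO LICENSE VALUES (?, ?, ?, ?)", ("yma1", "pss", 2, 2))

print(device_limit(db, "yma1", "pss", "name0"))
print(device_limit(db, "yma1", "wrong", "name0"))
print(device_limit(db, "x' OR '1'='1", "pss", "name0"))
```

Only the first call returns a limit; a wrong password or an injection-style username finds no row and returns None.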

Database operations

Assume we have two software products: “name0” and “name1”. We first create a table that records each user's permissions. This main table stores the user and, for each software product, the number of devices the user may register.

UserName                 Password                 software_name0   software_name1
VARCHAR(255) NOT NULL    VARCHAR(255) NOT NULL    INT              INT

def add_database():
    # Specify the software names manually.
    # Create an empty table. Note that the software-name columns record the
    # number of devices the client can register.
    softwarenames = ['name0', 'name1']
    db = sqlite3.connect('license_data.db')
    cursor = db.cursor()
    cmd = 'CREATE TABLE LICENSE (UserName VARCHAR(255) NOT NULL,Password VARCHAR(255) NOT NULL'
    for sn in softwarenames:
        cmd += ',' + sn + ' INT NOT NULL'
    cmd += ')'
    print(cmd)
    cursor.execute(cmd)
    db.commit()
    db.close()

After creating the database, we can add users to the main table. In this demo we also create one table per user per software product, named UserName_SoftwareName. Each of these tables records the devices that the user has registered.

MAC                      MachineName              DeviceID
VARCHAR(255) NOT NULL    VARCHAR(255) NOT NULL    INT

def add_user():
    # Add a user to the main table.
    # Create an empty table for the user for each software product,
    # e.g. two empty tables: UserName_softwarename0, UserName_softwarename1
    softwarenames = ['name0', 'name1']
    UserContents = ['MAC', 'MachineName', 'DeviceID']
    NewUser = {'name0': 2, 'name1': 2, 'UserName': 'yma1', 'Password': 'pss'}
    db = sqlite3.connect("license_data.db")
    cursor = db.cursor()
    users = cursor.execute("SELECT * FROM license")
    for user in users:
        if user[0] == NewUser['UserName']:
            db.close()
            raise RuntimeError("Adding existed user.")
    #cursor.execute("DROP TABLE yma1_name0")
    #cursor.execute("DROP TABLE yma1_name1")
    cmd = "INSERT INTO LICENSE (UserName,Password"
    for sn in softwarenames:
        cmd += ',' + sn
    cmd += ") VALUES (\'" + NewUser['UserName'] + '\',\'' + NewUser['Password'] + '\''
    for sn in softwarenames:
        cmd += ',' + str(NewUser[sn])
    cmd += ')'
    print(cmd)
    cursor.execute(cmd)
    for sn in softwarenames:
        if NewUser[sn] > 0:
            cmd = "CREATE TABLE " + NewUser['UserName'] + '_' + sn + ' ('
            for uc in UserContents:
                cmd += uc
                if uc.find('ID') == -1:
                    cmd += " VARCHAR(255)" + ','
                else:
                    cmd += " INT" + ','
            cmd = cmd[:-1] + ')'
            print(cmd)
            cursor.execute(cmd)
    db.commit()
    db.close()
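
Because the per-user table names are built by string concatenation, a username containing spaces or SQL syntax would break the statement. A small sketch of one way to guard against that, assuming names should be restricted to letters, digits and underscores; per_user_table and the sample device row are hypothetical.

```python
import re
import sqlite3

# Plain SQL identifier: letters, digits and underscores, not starting with a digit.
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def per_user_table(user_name, software_name):
    """Build the UserName_SoftwareName table name, rejecting unsafe characters."""
    name = user_name + "_" + software_name
    if not IDENTIFIER.fullmatch(name):
        raise ValueError("unsafe table name: " + repr(name))
    return name


# In-memory database used only for this example.
db = sqlite3.connect(":memory:")
table = per_user_table("yma1", "name0")
db.execute("CREATE TABLE " + table + " (MAC VARCHAR(255), MachineName VARCHAR(255), DeviceID INT)")
db.execute("INSERT INTO " + table + " VALUES (?, ?, ?)", ("00:11:22:33:44:55", "host-a", 1))
rows = db.execute("SELECT * FROM " + table).fetchall()
print(table, rows)
```

The table name is validated once and then concatenated, while the row values go through placeholders, since SQLite cannot bind identifiers as parameters.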

Similarly, to delete a user, we drop the tables created for that user and delete the user's row from the LICENSE table.

def delete_user(UserName):
    # Deleting UserName from the database includes:
    # 1. the UserName row in table LICENSE
    # 2. the UserName_SoftwareName tables in the database
    softwarenames = ['name0', 'name1']
    db = sqlite3.connect("license_data.db")
    cursor = db.cursor()
    cursor.execute("DELETE FROM LICENSE WHERE UserName=?", (UserName,))
    if cursor.rowcount > 0:
        print("Success delete from license table.")
    else:
        print("No specified user.")
    for sn in softwarenames:
        try:
            cmd = "DROP TABLE " + UserName + '_' + sn
            print(cmd)
            cursor.execute(cmd)
            print("Success")
        except sqlite3.OperationalError:
            # the user has no table for this software
            pass
    db.commit()
    db.close()

Client

The client reads a YAML file to get its configuration, then sends the configuration and its MAC address to the server to request a license for each software product. After that, it runs the update check every hour.

The client command line:

python main.py -U user.yml -M 1  # user config and MachineID
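
The argument handling in main.py is not shown. A minimal sketch of how the two flags might be parsed with argparse follows; the long option names, the help texts, and the assumption that the machine ID is an integer are all hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the -U (user config) and -M (machine ID) command-line flags."""
    parser = argparse.ArgumentParser(description="License client")
    parser.add_argument("-U", "--user-config", required=True,
                        help="path to the user YAML file")
    parser.add_argument("-M", "--machine-id", type=int, required=True,
                        help="machine ID (assumed to be an integer)")
    return parser.parse_args(argv)


# Passing a list instead of reading sys.argv keeps the example self-contained.
args = parse_args(["-U", "user.yml", "-M", "1"])
print(args.user_config, args.machine_id)
```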

Directory:

├───main.py # main file
├───utility.py # functions used in main file
└───user.yml # yaml file of user configuration

Register for a software

The client registers a software product and obtains a license with the register function. The client first reads the configuration from the file user.yml and collects device information, e.g. the MAC address. It then encrypts this information and sends it to the server to request a license.

import json
import socket

import requests

def register(SoftwareName, data):
    # Send data to the server and generate the license file.
    # "data" comes from the YAML file.
    # Generate data_for_server by calling function "GetInfo".
    # Send data_for_server to the server by calling function "send_to_server".
    # Write the license file by calling function "license_write_file"
    # (see "Generate license file").
    data = GetInfo(SoftwareName, data)
    license_content = send_to_server(data)
    license_write_file(SoftwareName, license_content)
    print("Successfully get license for " + SoftwareName + ".")

def GetInfo(SoftwareName, data):
    # Return a dict containing "data", MAC address, MachineName and SoftwareName
    mac = get_mac_address()
    data['MAC'] = mac
    MachineName = socket.gethostname()
    data['MachineName'] = MachineName
    data['SoftwareName'] = SoftwareName
    return data

def send_to_server(data):
    # Send data to the server to request a license.
    # Exit if the server returns "false".
    # Return the license.
    aes_code = server_client_crypto()
    data = json.dumps(data)
    data = aes_code.encrypt(data)
    headers2 = {'Content-Type': 'application/json'}
    response = requests.post(url="http://localhost:8085/sendjson", headers=headers2, json=data, verify=False)
    if response.content == b'false':
        print("Fail! Check configure yaml file or contact administrator.")
        exit(0)
    return response.content
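
The helper get_mac_address is called by both the client and the server but is not shown in this document. One possible implementation, assuming the standard library's uuid.getnode() is an acceptable source for the address and that a colon-separated string is the expected format, might be sketched as follows. The function format_mac is hypothetical.

```python
import uuid


def format_mac(node: int) -> str:
    """Format a 48-bit integer as a colon-separated MAC address."""
    raw = "{:012x}".format(node)
    return ":".join(raw[i:i + 2] for i in range(0, 12, 2))


def get_mac_address() -> str:
    # uuid.getnode() returns the hardware address as a 48-bit integer; if no
    # address can be read it falls back to a random value.
    return format_mac(uuid.getnode())


print(format_mac(0x001122334455))
```

Because of the random fallback, a production check would probably want to detect that case instead of trusting the value blindly.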

We also want the client to renew the license every month. We read the registration date from the license file; if 30 or more days have passed, the client deletes the license and registers again.

import os
from datetime import datetime

def updater():
    # Compare the date in the license with the current date:
    # d = current_date - license_date
    # Remove the license file and call function "register" if d >= 30.
    # license_path and data are not defined here; they must exist at module level.
    SoftwareName = license_path[:-4]
    current_time = datetime.now()
    license_content = license_parser(license_path)
    license_time = license_content['time']
    license_time = datetime.strptime(license_time, "%Y-%m-%d")
    days_past = (current_time - license_time).days
    if days_past >= 30:
        print("Update license.")
        os.remove(license_path)
        register(SoftwareName, data)
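
The date comparison can be isolated from the file handling, which makes the 30-day boundary easy to check. This is only a sketch: needs_renewal and RENEW_AFTER_DAYS are hypothetical names, and the date format is the "%Y-%m-%d" format used above.

```python
from datetime import datetime

RENEW_AFTER_DAYS = 30


def needs_renewal(license_date: str, now: datetime,
                  max_age_days: int = RENEW_AFTER_DAYS) -> bool:
    """Return True when the license is at least max_age_days old."""
    issued = datetime.strptime(license_date, "%Y-%m-%d")
    return (now - issued).days >= max_age_days


print(needs_renewal("2024-01-01", datetime(2024, 1, 30)))  # 29 days old
print(needs_renewal("2024-01-01", datetime(2024, 1, 31)))  # 30 days old
```

Passing the current time in as an argument, rather than calling datetime.now() inside, is what allows the boundary to be exercised with fixed dates.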

Generate license file

After receiving the encrypted license from the server, the client generates a license file locally. The license file contains 10 lines. The first three lines are junk strings that contain “Y”, “E” and “S” respectively. The positions of “Y”, “E” and “S” in those three lines determine which line holds the encrypted license string:

[index(“Y” in line0) + index(“E” in line1) + index(“S” in line2)] % 7

For example, [1+1+1]%7=3. The license string is then stored in the seventh (3+3+1) line: the first “3” is the base index (the three header lines), the second “3” is the result of the computation, and the “1” converts the zero-based index to a line number. The other six lines are also junk strings, containing “M”, “Y”, “Y”, “Y”, “D”, “S” respectively.
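
The worked example can be reproduced in a few lines. This sketch assumes that “index” means the position of the first occurrence, as returned by str.find; the function name license_line_index and the sample header strings are hypothetical.

```python
def license_line_index(line0: str, line1: str, line2: str) -> int:
    """Return the zero-based index, within the whole file, of the license line."""
    offset = (line0.find("Y") + line1.find("E") + line2.find("S")) % 7
    return 3 + offset  # the three header lines come first


# Each key letter sits at index 1, so the offset is (1 + 1 + 1) % 7 = 3.
header = ["aYbc", "xEyz", "qSrt"]
print(license_line_index(*header))
```

The result is the zero-based index 6, which is the seventh line of the file.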

import random
import string

def license_write_file(SoftwareName, content):
    # "content": the encrypted license string (bytes).
    # Generate the license file for SoftwareName: SoftwareName.lic
    fw = open(SoftwareName + '.lic', 'w')
    length = len(content)
    generate_list = ['Y', 'E', 'S']
    digit = 0
    for i in generate_list:
        ll = generate_junk(length, i)
        fw.write(ll + '\n')
        digit += ll.find(i)
    insert_line = digit % 7
    generate_list += ['M', 'Y', 'Y', 'Y', 'D', 'S']
    index = 3  # skip 'Y', 'E', 'S', which were used for the first three lines
    for i in range(7):
        if i == insert_line:
            fw.write(str(content)[2:-1] + '\n')  # drop the b'...' wrapper
        else:
            fw.write(generate_junk(length, generate_list[index]) + '\n')
            index += 1
    fw.close()

def generate_junk(length, key):
    # Generate a junk string of fixed length that includes key.
    ll = ''.join([random.choice(string.printable[:-6]) for i in range(length)])
    if ll.find(key) == -1:
        ll = list(ll)
        ll[random.randint(0, length - 1)] = key
        ll = ''.join(ll)
    return ll

License parser

The parser reads the license content from a license file. The decrypted string can be used to verify that the license file is valid. This demo does not implement that verification.

import json

def license_parser(license_path):
    # Read the first three lines and compute the license location
    # (see "Generate license file").
    # Decrypt the license line.
    generate_list = ['Y', 'E', 'S']
    # read the license file and get the license content
    fr = open(license_path)
    digit = 0
    for i in generate_list:
        ll = fr.readline()
        digit += ll.find(i)
    insert_line = digit % 7
    for i in range(7):
        ll = fr.readline()
        if i == insert_line:
            content = ll[:-1].encode()
    fr.close()
    aes_code = server_client_crypto()
    license_content = aes_code.decrypt(content)
    return json.loads(license_content)
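
A quick way to gain confidence in the file layout is a round trip: build the ten lines in memory, then check that the locating rule finds the payload again. The sketch below leaves out AES and file I/O and uses a placeholder payload. The names junk, build_lines and find_payload are hypothetical, and the junk alphabet is deliberately limited to lowercase letters and digits so that each key letter appears exactly once per line.

```python
import random
import string

ALPHABET = string.ascii_lowercase + string.digits  # filler without key letters


def junk(length, key, rng):
    """Return a random string of the given length containing key exactly once."""
    chars = [rng.choice(ALPHABET) for _ in range(length)]
    chars[rng.randrange(length)] = key
    return "".join(chars)


def build_lines(payload, rng):
    """Lay out 3 header lines plus 7 body lines, one of which is the payload."""
    header = [junk(len(payload), k, rng) for k in "YES"]
    offset = sum(line.find(k) for line, k in zip(header, "YES")) % 7
    fillers = iter("MYYYDS")
    body = [payload if i == offset else junk(len(payload), next(fillers), rng)
            for i in range(7)]
    return header + body


def find_payload(lines):
    """Recompute the offset from the header lines and return the payload line."""
    offset = sum(line.find(k) for line, k in zip(lines[:3], "YES")) % 7
    return lines[3 + offset]


rng = random.Random(0)
lines = build_lines("ENCRYPTED-LICENSE-PLACEHOLDER", rng)
print(len(lines), find_payload(lines) == "ENCRYPTED-LICENSE-PLACEHOLDER")
```

Seeding the generator makes the layout reproducible, which is convenient when the same check is repeated over several seeds.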